Files
poky/bitbake/bin/bitbake-worker
Richard Purdie 1371bb4c43 bitbake: bitbake-worker: Handle cooker/worker IO deadlocking
I noiced builds where tasks seemed to be taking a surprisingly long time.
When I looked at the output of top/pstree, these tasks were no longer
running despite being listed in knotty. Some were in D/Z state waiting for
their exit code to be collected, others were simply not present at all.

strace showed communication problems between the worker and cooker, each
was trying to write to the other and nearly deadlocking. Eventually, timeouts
would allow them to echange 64kb of data but this was only happening every
few seconds.

Whilst this particularly affected builds on machines with large numbers
of cores (and hence highly parallal task execution) and in cases where
I had a lot of debug enabled, this situation is clearly bad in general.

This patch introduces a thread to the worker which is used to write data
back to cooker. This means that the deadlock can't occur and data flows
much more freely and effectively.

(Bitbake rev: f48befe1163147b02a9926ee38af0f7258a477e0)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2017-02-03 09:52:19 +00:00

486 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
import pickle
import traceback
import queue
from multiprocessing import Lock
from threading import Thread
if sys.getfilesystemencoding() != "utf-8":
sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")
# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
sys.exit(1)
profiling = False
if sys.argv[1].startswith("decafbadbad"):
profiling = True
try:
import cProfile as profile
except:
import profile
# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
if sys.stdout.name == '<stdout>':
import fcntl
fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
fl |= os.O_SYNC
fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
#sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
pass
logger = logging.getLogger("BitBake")
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None
handler = bb.event.LogHandler()
logger.addHandler(handler)
if 0:
# Code to write out a log file of all events passing through the worker
logfilename = "/tmp/workerlogfile"
format_str = "%(levelname)s: %(message)s"
conlogformat = bb.msg.BBLogFormatter(format_str)
consolelog = logging.FileHandler(logfilename)
bb.msg.addDefaultlogFilter(consolelog)
consolelog.setFormatter(conlogformat)
logger.addHandler(consolelog)
worker_queue = queue.Queue()
def worker_fire(event, d):
data = b"<event>" + pickle.dumps(event) + b"</event>"
worker_fire_prepickled(data)
def worker_fire_prepickled(event):
global worker_queue
worker_queue.put(event)
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
worker_thread_exit = False
def worker_flush(worker_queue):
worker_queue_int = b""
global worker_pipe, worker_thread_exit
while True:
try:
worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
except queue.Empty:
pass
while (worker_queue_int or not worker_queue.empty()):
try:
if not worker_queue.empty():
worker_queue_int = worker_queue_int + worker_queue.get()
written = os.write(worker_pipe, worker_queue_int)
worker_queue_int = worker_queue_int[written:]
except (IOError, OSError) as e:
if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
raise
if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
return
worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
def worker_child_fire(event, d):
global worker_pipe
global worker_pipe_lock
data = b"<event>" + pickle.dumps(event) + b"</event>"
try:
worker_pipe_lock.acquire()
worker_pipe.write(data)
worker_pipe_lock.release()
except IOError:
sigterm_handler(None, None)
raise
bb.event.worker_fire = worker_fire
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
if lf:
lf.write(msg)
lf.flush()
def sigterm_handler(signum, frame):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
os.killpg(0, signal.SIGTERM)
sys.exit()
def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
# We need to setup the environment BEFORE the fork, since
# a fork() or exec*() activates PSEUDO...
envbackup = {}
fakeenv = {}
umask = None
taskdep = workerdata["taskdeps"][fn]
if 'umask' in taskdep and taskname in taskdep['umask']:
# umask might come in as a number or text string..
try:
umask = int(taskdep['umask'][taskname],8)
except TypeError:
umask = taskdep['umask'][taskname]
# We can't use the fakeroot environment in a dry run as it possibly hasn't been built
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
envvars = (workerdata["fakerootenv"][fn] or "").split()
for key, value in (var.split('=') for var in envvars):
envbackup[key] = os.environ.get(key)
os.environ[key] = value
fakeenv[key] = value
fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
for p in fakedirs:
bb.utils.mkdirhier(p)
logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
(fn, taskname, ', '.join(fakedirs)))
else:
envvars = (workerdata["fakerootnoenv"][fn] or "").split()
for key, value in (var.split('=') for var in envvars):
envbackup[key] = os.environ.get(key)
os.environ[key] = value
fakeenv[key] = value
sys.stdout.flush()
sys.stderr.flush()
try:
pipein, pipeout = os.pipe()
pipein = os.fdopen(pipein, 'rb', 4096)
pipeout = os.fdopen(pipeout, 'wb', 0)
pid = os.fork()
except OSError as e:
logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
sys.exit(1)
if pid == 0:
def child():
global worker_pipe
global worker_pipe_lock
pipein.close()
signal.signal(signal.SIGTERM, sigterm_handler)
# Let SIGHUP exit as SIGTERM
signal.signal(signal.SIGHUP, sigterm_handler)
bb.utils.signal_on_parent_exit("SIGTERM")
# Save out the PID so that the event can include it the
# events
bb.event.worker_pid = os.getpid()
bb.event.worker_fire = worker_child_fire
worker_pipe = pipeout
worker_pipe_lock = Lock()
# Make the child the process group leader and ensure no
# child process will be controlled by the current terminal
# This ensures signals sent to the controlling terminal like Ctrl+C
# don't stop the child processes.
os.setsid()
# No stdin
newsi = os.open(os.devnull, os.O_RDWR)
os.dup2(newsi, sys.stdin.fileno())
if umask:
os.umask(umask)
try:
bb_cache = bb.cache.NoCache(databuilder)
(realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
the_data = databuilder.mcdata[mc]
the_data.setVar("BB_WORKERCONTEXT", "1")
the_data.setVar("BB_TASKDEPDATA", taskdepdata)
the_data.setVar("BUILDNAME", workerdata["buildname"])
the_data.setVar("DATE", workerdata["date"])
the_data.setVar("TIME", workerdata["time"])
bb.parse.siggen.set_taskdata(workerdata["sigdata"])
ret = 0
the_data = bb_cache.loadDataFull(fn, appends)
the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN", True), taskname.replace("do_", "")))
# exported_vars() returns a generator which *cannot* be passed to os.environ.update()
# successfully. We also need to unset anything from the environment which shouldn't be there
exports = bb.data.exported_vars(the_data)
bb.utils.empty_environment()
for e, v in exports:
os.environ[e] = v
for e in fakeenv:
os.environ[e] = fakeenv[e]
the_data.setVar(e, fakeenv[e])
the_data.setVarFlag(e, 'export', "1")
task_exports = the_data.getVarFlag(taskname, 'exports', True)
if task_exports:
for e in task_exports.split():
the_data.setVarFlag(e, 'export', '1')
v = the_data.getVar(e, True)
if v is not None:
os.environ[e] = v
if quieterrors:
the_data.setVarFlag(taskname, "quieterrors", "1")
except Exception:
if not quieterrors:
logger.critical(traceback.format_exc())
os._exit(1)
try:
if cfg.dry_run:
return 0
return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
except:
os._exit(1)
if not profiling:
os._exit(child())
else:
profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
prof = profile.Profile()
try:
ret = profile.Profile.runcall(prof, child)
finally:
prof.dump_stats(profname)
bb.utils.process_profilelog(profname)
os._exit(ret)
else:
for key, value in iter(envbackup.items()):
if value is None:
del os.environ[key]
else:
os.environ[key] = value
return pid, pipein, pipeout
class runQueueWorkerPipe():
"""
Abstraction for a pipe between a worker thread and the worker server
"""
def __init__(self, pipein, pipeout):
self.input = pipein
if pipeout:
pipeout.close()
bb.utils.nonblockingfd(self.input)
self.queue = b""
def read(self):
start = len(self.queue)
try:
self.queue = self.queue + (self.input.read(102400) or b"")
except (OSError, IOError) as e:
if e.errno != errno.EAGAIN:
raise
end = len(self.queue)
index = self.queue.find(b"</event>")
while index != -1:
worker_fire_prepickled(self.queue[:index+8])
self.queue = self.queue[index+8:]
index = self.queue.find(b"</event>")
return (end > start)
def close(self):
while self.read():
continue
if len(self.queue) > 0:
print("Warning, worker child left partial message: %s" % self.queue)
self.input.close()
normalexit = False
class BitbakeWorker(object):
def __init__(self, din):
self.input = din
bb.utils.nonblockingfd(self.input)
self.queue = b""
self.cookercfg = None
self.databuilder = None
self.data = None
self.build_pids = {}
self.build_pipes = {}
signal.signal(signal.SIGTERM, self.sigterm_exception)
# Let SIGHUP exit as SIGTERM
signal.signal(signal.SIGHUP, self.sigterm_exception)
if "beef" in sys.argv[1]:
bb.utils.set_process_name("Worker (Fakeroot)")
else:
bb.utils.set_process_name("Worker")
def sigterm_exception(self, signum, stackframe):
if signum == signal.SIGTERM:
bb.warn("Worker received SIGTERM, shutting down...")
elif signum == signal.SIGHUP:
bb.warn("Worker received SIGHUP, shutting down...")
self.handle_finishnow(None)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
os.kill(os.getpid(), signal.SIGTERM)
def serve(self):
while True:
(ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
if self.input in ready:
try:
r = self.input.read()
if len(r) == 0:
# EOF on pipe, server must have terminated
self.sigterm_exception(signal.SIGTERM, None)
self.queue = self.queue + r
except (OSError, IOError):
pass
if len(self.queue):
self.handle_item(b"cookerconfig", self.handle_cookercfg)
self.handle_item(b"workerdata", self.handle_workerdata)
self.handle_item(b"runtask", self.handle_runtask)
self.handle_item(b"finishnow", self.handle_finishnow)
self.handle_item(b"ping", self.handle_ping)
self.handle_item(b"quit", self.handle_quit)
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
if len(self.build_pids):
self.process_waitpid()
def handle_item(self, item, func):
if self.queue.startswith(b"<" + item + b">"):
index = self.queue.find(b"</" + item + b">")
while index != -1:
func(self.queue[(len(item) + 2):index])
self.queue = self.queue[(index + len(item) + 3):]
index = self.queue.find(b"</" + item + b">")
def handle_cookercfg(self, data):
self.cookercfg = pickle.loads(data)
self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
self.databuilder.parseBaseConfiguration()
self.data = self.databuilder.data
def handle_workerdata(self, data):
self.workerdata = pickle.loads(data)
bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
for mc in self.databuilder.mcdata:
self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
def handle_ping(self, _):
workerlog_write("Handling ping\n")
logger.warning("Pong from bitbake-worker!")
def handle_quit(self, data):
workerlog_write("Handling quit\n")
global normalexit
normalexit = True
sys.exit(0)
def handle_runtask(self, data):
fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)
self.build_pids[pid] = task
self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
def process_waitpid(self):
"""
Return none is there are no processes awaiting result collection, otherwise
collect the process exit codes and close the information pipe.
"""
try:
pid, status = os.waitpid(-1, os.WNOHANG)
if pid == 0 or os.WIFSTOPPED(status):
return None
except OSError:
return None
workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
if os.WIFEXITED(status):
status = os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
# Per shell conventions for $?, when a process exits due to
# a signal, we return an exit code of 128 + SIGNUM
status = 128 + os.WTERMSIG(status)
task = self.build_pids[pid]
del self.build_pids[pid]
self.build_pipes[pid].close()
del self.build_pipes[pid]
worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")
def handle_finishnow(self, _):
if self.build_pids:
logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
for k, v in iter(self.build_pids.items()):
try:
os.kill(-k, signal.SIGTERM)
os.waitpid(-1, 0)
except:
pass
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
try:
worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
if not profiling:
worker.serve()
else:
profname = "profile-worker.log"
prof = profile.Profile()
try:
profile.Profile.runcall(prof, worker.serve)
finally:
prof.dump_stats(profname)
bb.utils.process_profilelog(profname)
except BaseException as e:
if not normalexit:
import traceback
sys.stderr.write(traceback.format_exc())
sys.stderr.write(str(e))
worker_thread_exit = True
worker_thread.join()
workerlog_write("exitting")
sys.exit(0)