This is a pretty fundamental change to the way bitbake operates. It splits out the task execution part of runqueue into a completely separately exec'd process called bitbake-worker. This means that the separate process has to build its own datastore and that configuration needs to be passed from the cooker over to the bitbake worker process. Known issues: * Hob is broken with this patch since it writes to the configuration and that configuration isn't preserved in bitbake-worker. * We create a worker for setscene, then a new worker for the main task execution. This is wasteful but shouldn't be hard to fix. * We probably send too much data over to bitbake-worker, need to see if we can streamline it. These are issues which will be followed up in subsequent patches. This patch sets the groundwork for the removal of the double bitbake execution for psuedo which will be in a follow on patch. (Bitbake rev: b2e26f1db28d74f2dd9df8ab4ed3b472503b9a5c) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
12 KiB
Executable File
#!/usr/bin/env python
import os import sys import warnings sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) from bb import fetch2 import logging import bb import select import errno
Users shouldn't be running this code directly
if len(sys.argv) != 2 or sys.argv[1] != "decafbad": print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") sys.exit(1)
logger = logging.getLogger("BitBake")
try: import cPickle as pickle except ImportError: import pickle bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
worker_pipe = sys.stdout.fileno() bb.utils.nonblockingfd(worker_pipe)
handler = bb.event.LogHandler() logger.addHandler(handler)
if 0: # Code to write out a log file of all events passing through the worker logfilename = "/tmp/workerlogfile" format_str = "%(levelname)s: %(message)s" conlogformat = bb.msg.BBLogFormatter(format_str) consolelog = logging.FileHandler(logfilename) bb.msg.addDefaultlogFilter(consolelog) consolelog.setFormatter(conlogformat) logger.addHandler(consolelog)
worker_queue = ""
def worker_fire(event, d): data = "" + pickle.dumps(event) + "" worker_fire_prepickled(data)
def worker_fire_prepickled(event): global worker_queue
worker_queue = worker_queue + event
worker_flush()
def worker_flush(): global worker_queue, worker_pipe
if not worker_queue:
return
try:
written = os.write(worker_pipe, worker_queue)
worker_queue = worker_queue[written:]
except (IOError, OSError) as e:
if e.errno != errno.EAGAIN:
raise
def worker_child_fire(event, d): global worker_pipe
data = "<event>" + pickle.dumps(event) + "</event>"
worker_pipe.write(data)
bb.event.worker_fire = worker_fire
lf = None #lf = open("/tmp/workercommandlog", "w+") def workerlog_write(msg): if lf: lf.write(msg) lf.flush()
def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, quieterrors=False): # We need to setup the environment BEFORE the fork, since # a fork() or exec*() activates PSEUDO...
envbackup = {}
fakeenv = {}
umask = None
taskdep = workerdata["taskdeps"][fn]
if 'umask' in taskdep and taskname in taskdep['umask']:
# umask might come in as a number or text string..
try:
umask = int(taskdep['umask'][taskname],8)
except TypeError:
umask = taskdep['umask'][taskname]
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
envvars = (workerdata["fakerootenv"][fn] or "").split()
for key, value in (var.split('=') for var in envvars):
envbackup[key] = os.environ.get(key)
os.environ[key] = value
fakeenv[key] = value
fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
for p in fakedirs:
bb.utils.mkdirhier(p)
logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
(fn, taskname, ', '.join(fakedirs)))
else:
envvars = (workerdata["fakerootnoenv"][fn] or "").split()
for key, value in (var.split('=') for var in envvars):
envbackup[key] = os.environ.get(key)
os.environ[key] = value
fakeenv[key] = value
sys.stdout.flush()
sys.stderr.flush()
try:
pipein, pipeout = os.pipe()
pipein = os.fdopen(pipein, 'rb', 4096)
pipeout = os.fdopen(pipeout, 'wb', 0)
pid = os.fork()
except OSError as e:
bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
if pid == 0:
global worker_pipe
pipein.close()
# Save out the PID so that the event can include it the
# events
bb.event.worker_pid = os.getpid()
bb.event.worker_fire = worker_child_fire
worker_pipe = pipeout
# Make the child the process group leader
os.setpgid(0, 0)
# No stdin
newsi = os.open(os.devnull, os.O_RDWR)
os.dup2(newsi, sys.stdin.fileno())
if umask:
os.umask(umask)
data.setVar("BB_WORKERCONTEXT", "1")
bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
ret = 0
try:
the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
for h in workerdata["hashes"]:
the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
for h in workerdata["hash_deps"]:
the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])
# exported_vars() returns a generator which *cannot* be passed to os.environ.update()
# successfully. We also need to unset anything from the environment which shouldn't be there
exports = bb.data.exported_vars(the_data)
bb.utils.empty_environment()
for e, v in exports:
os.environ[e] = v
for e in fakeenv:
os.environ[e] = fakeenv[e]
the_data.setVar(e, fakeenv[e])
the_data.setVarFlag(e, 'export', "1")
if quieterrors:
the_data.setVarFlag(taskname, "quieterrors", "1")
except Exception as exc:
if not quieterrors:
logger.critical(str(exc))
os._exit(1)
try:
if not cfg.dry_run:
ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
os._exit(ret)
except:
os._exit(1)
else:
for key, value in envbackup.iteritems():
if value is None:
del os.environ[key]
else:
os.environ[key] = value
return pid, pipein, pipeout
class runQueueWorkerPipe(): """ Abstraction for a pipe between a worker thread and the worker server """ def init(self, pipein, pipeout): self.input = pipein if pipeout: pipeout.close() bb.utils.nonblockingfd(self.input) self.queue = ""
def read(self):
start = len(self.queue)
try:
self.queue = self.queue + self.input.read(102400)
except (OSError, IOError) as e:
if e.errno != errno.EAGAIN:
raise
end = len(self.queue)
index = self.queue.find("</event>")
while index != -1:
worker_fire_prepickled(self.queue[:index+8])
self.queue = self.queue[index+8:]
index = self.queue.find("</event>")
return (end > start)
def close(self):
while self.read():
continue
if len(self.queue) > 0:
print("Warning, worker child left partial message: %s" % self.queue)
self.input.close()
normalexit = False
class BitbakeWorker(object): def init(self, din): self.input = din bb.utils.nonblockingfd(self.input) self.queue = "" self.cookercfg = None self.databuilder = None self.data = None self.build_pids = {} self.build_pipes = {}
def serve(self):
while True:
(ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
if self.input in ready or len(self.queue):
start = len(self.queue)
try:
self.queue = self.queue + self.input.read()
except (OSError, IOError):
pass
end = len(self.queue)
self.handle_item("cookerconfig", self.handle_cookercfg)
self.handle_item("workerdata", self.handle_workerdata)
self.handle_item("runtask", self.handle_runtask)
self.handle_item("finishnow", self.handle_finishnow)
self.handle_item("ping", self.handle_ping)
self.handle_item("quit", self.handle_quit)
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
if len(self.build_pids):
self.process_waitpid()
worker_flush()
def handle_item(self, item, func):
if self.queue.startswith("<" + item + ">"):
index = self.queue.find("</" + item + ">")
while index != -1:
func(self.queue[(len(item) + 2):index])
self.queue = self.queue[(index + len(item) + 3):]
index = self.queue.find("</" + item + ">")
def handle_cookercfg(self, data):
self.cookercfg = pickle.loads(data)
self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
self.databuilder.parseBaseConfiguration()
self.data = self.databuilder.data
def handle_workerdata(self, data):
self.workerdata = pickle.loads(data)
bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
def handle_ping(self, _):
workerlog_write("Handling ping\n")
logger.warn("Pong from bitbake-worker!")
def handle_quit(self, data):
workerlog_write("Handling quit\n")
global normalexit
normalexit = True
sys.exit(0)
def handle_runtask(self, data):
fn, task, taskname, quieterrors, appends = pickle.loads(data)
workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, quieterrors)
self.build_pids[pid] = task
self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
def process_waitpid(self):
"""
Return none is there are no processes awaiting result collection, otherwise
collect the process exit codes and close the information pipe.
"""
try:
pid, status = os.waitpid(-1, os.WNOHANG)
if pid == 0 or os.WIFSTOPPED(status):
return None
except OSError:
return None
workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
if os.WIFEXITED(status):
status = os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
# Per shell conventions for $?, when a process exits due to
# a signal, we return an exit code of 128 + SIGNUM
status = 128 + os.WTERMSIG(status)
task = self.build_pids[pid]
del self.build_pids[pid]
self.build_pipes[pid].close()
del self.build_pipes[pid]
worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")
def handle_finishnow(self, _):
if self.build_pids:
logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
for k, v in self.build_pids.iteritems():
try:
os.kill(-k, signal.SIGTERM)
os.waitpid(-1, 0)
except:
pass
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
try: worker = BitbakeWorker(sys.stdin) worker.serve() except BaseException as e: if not normalexit: import traceback sys.stderr.write(traceback.format_exc()) sys.stderr.write(str(e)) while len(worker_queue): worker_flush() workerlog_write("exitting") sys.exit(0)