Files
poky/bitbake/bin/bitbake-worker
Richard Purdie 218b81acb6 bitbake: bitbake: Initial multi-config support
This patch adds the notion of supporting multiple configurations within
a single build. To enable it, set a line in local.conf like:

BBMULTICONFIG = "configA configB configC"

This would tell bitbake that before it parses the base configuration,
it should load conf/configA.conf and so on for each different
configuration. These would contain lines like:

MACHINE = "A"

or other variables which can be set which can be built in the same
build directory (or change TMPDIR not to conflict).

One downside I've already discovered is that if we want to inherit this
file right at the start of parsing, the only place you can put the
configurations is in "cwd", since BBPATH isn't constructed until the
layers are parsed and therefore using it as a preconf file isn't
possible unless it's located there.

Execution of these targets takes the form "bitbake
multiconfig:configA:core-image-minimal core-image-sato" so similar to
our virtclass approach for native/nativesdk/multilib using BBCLASSEXTEND.

Implementation-wise, the implication is that instead of tasks being
uniquely referenced with "recipename/fn:task" they now need to be
referenced as "configuration:recipename:task".

We already started using "virtual" filenames for recipes when we
implemented BBCLASSEXTEND and this patch adds a new prefix to
these, "multiconfig:<configname>:" and hence avoid changes to a large
part of the codebase thanks to this. databuilder has an internal array
of data stores and uses the right one depending on the supplied virtual
filename.

That trick allows us to use the existing parsing code including the
multithreading mostly unchanged as well as most of the cache code.

For recipecache, we end up with a dict of these accessed by
multiconfig (mc). taskdata and runqueue can only cope with one recipecache
so for taskdata, we pass in each recipecache and have it compute the result
and end up with an array of taskdatas. We can only have one runqueue so there
are extensive changes there.

This initial implementation has some drawbacks:

a) There are no inter-multi-configuration dependencies as yet

b) There are no sstate optimisations. This means if the build uses the
same object twice in say two different TMPDIRs, it will either load from
an existing sstate cache at the start or build it twice. We can then in
due course look at ways in which it would only build it once and then
reuse it. This will likely need significant changes to the way sstate
currently works to make that possible.

(Bitbake rev: 5287991691578825c847bac2368e9b51c0ede3f0)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2016-08-18 10:06:27 +01:00

16 KiB
Executable File

#!/usr/bin/env python3

import os import sys import warnings sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) from bb import fetch2 import logging import bb import select import errno import signal import pickle from multiprocessing import Lock

# Refuse to start unless the filesystem encoding is UTF-8; the message below
# explains why this cannot be corrected after the interpreter has started.
if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# Profiling is requested through the longer "decafbadbad" argument prefix.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except ImportError:
        # Only fall back to the pure-Python profiler when cProfile is
        # genuinely unavailable; any other failure should surface.
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    # The real standard output stream reports its name as '<stdout>';
    # comparing against an empty string would never match and would leave
    # the O_SYNC setup below as dead code.
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except Exception:
    # Best effort only: failing to set O_SYNC must not stop the worker,
    # but SystemExit/KeyboardInterrupt should not be swallowed here.
    pass

# Logger shared by this script.
logger = logging.getLogger("BitBake")

# File descriptor of stdout: everything queued in worker_queue is written
# to it by worker_flush().  It is switched to non-blocking mode.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Bytes waiting to be written to worker_pipe.
worker_queue = b""

def worker_fire(event, d):
    """
    Pickle an event and queue it for delivery over the worker pipe.

    The payload is wrapped in <event>...</event> markers, the same framing
    worker_child_fire() produces and runQueueWorkerPipe.read() scans for.
    The d argument is accepted but not used.
    """
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    """
    Append already-serialised bytes to the outgoing queue and try to send.

    event is expected to be a complete, framed message.
    """
    global worker_queue

    worker_queue += event
    worker_flush()

def worker_flush():
    """
    Write as much of the pending queue as the pipe will currently accept.

    A partial write leaves the unwritten tail queued for the next call.
    EAGAIN (pipe full) is ignored so the data is retried later.  EPIPE means
    no reader remains, so the queued data can never be delivered; it is
    dropped so that loops which wait for the queue to empty can finish.
    Any other OS error is re-raised.
    """
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        written = os.write(worker_pipe, worker_queue)
    except (IOError, OSError) as e:
        if e.errno == errno.EPIPE:
            # Nobody is reading any more; discard what cannot be sent.
            worker_queue = b""
        elif e.errno != errno.EAGAIN:
            raise
    else:
        worker_queue = worker_queue[written:]

def worker_child_fire(event, d):
    """
    Send an event from a forked task process up its pipe.

    The event is pickled, framed in <event>...</event> and written while
    holding worker_pipe_lock.  The lock is taken with a context manager so
    it is released even when the write fails.  On IOError sigterm_handler()
    is invoked and the error is re-raised.  The d argument is not used.
    """
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        with worker_pipe_lock:
            worker_pipe.write(data)
    except IOError:
        sigterm_handler(None, None)
        raise

# Install worker_fire as bb.event's worker_fire hook for this process.
# fork_off_task() replaces it with worker_child_fire inside each forked child.
bb.event.worker_fire = worker_fire

# Optional debug log of the commands handled by the worker; disabled while
# lf is None.  Uncomment the open() below to enable it.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Write msg to the debug command log and flush it; no-op when lf is unset."""
    if lf:
        lf.write(msg)
        lf.flush()

def sigterm_handler(signum, frame):
    """
    Signal handler: terminate the caller's whole process group and exit.

    Both arguments are ignored, so it may also be called directly with
    (None, None).
    """
    term = signal.SIGTERM
    # Put the default action back before signalling the group this process
    # belongs to (process group 0 means "the caller's group").
    signal.signal(term, signal.SIG_DFL)
    os.killpg(0, term)
    sys.exit()

def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    """
    Fork a child process that runs a single task.

    Parameters:
        cfg          -- cooker configuration; dry_run and profile are read.
        data         -- not used by this function (kept for the caller).
        databuilder  -- passed to bb.cache.NoCache; its mcdata mapping is
                        indexed by the multiconfig name derived from fn.
        workerdata   -- dict; the keys read here are "taskdeps",
                        "fakerootenv", "fakerootdirs", "fakerootnoenv",
                        "buildname", "date", "time", "sigdata", "runq_hash".
        fn           -- (virtual) recipe filename.
        task         -- key into workerdata["runq_hash"].
        taskname     -- name of the task to execute.
        appends      -- passed through to loadDataFull().
        taskdepdata  -- stored in BB_TASKDEPDATA.
        quieterrors  -- when true, setup errors are not logged and the
                        "quieterrors" flag is set on the task.

    Returns (pid, pipein, pipeout) in the parent: the child's pid and both
    ends of the pipe created for it.  The child never returns from here;
    it leaves via os._exit().
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    use_fakeroot = 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run
    envsource = "fakerootenv" if use_fakeroot else "fakerootnoenv"

    for var in (workerdata[envsource][fn] or "").split():
        # Split on the first '=' only so values may themselves contain '='.
        key, value = var.split('=', 1)
        envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value

    if use_fakeroot:
        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Compare against None so that an explicitly requested umask
            # of 0 is applied rather than silently skipped.
            if umask is not None:
                os.umask(umask)

            try:
                bb_cache = bb.cache.NoCache(databuilder)
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                bb.parse.siggen.set_taskdata(workerdata["sigdata"])

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN", True), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                # Re-apply the overrides chosen before the fork and mark
                # them as exported in the datastore as well.
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports', True)
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e, True)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)
            try:
                if cfg.dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                # Deliberately broad: whatever escapes the task, this
                # forked child exits with status 1.
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Default to failure so the finally clause always has an exit
            # status, even if runcall() raises before returning one.
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made before the fork.
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout

class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        """
        Keep the read end of the pipe, in non-blocking mode.

        pipeout, when given, is closed here.
        """
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = b""

    def read(self):
        """
        Pull available data from the pipe and forward complete events.

        Every chunk ending in </event> is handed to worker_fire_prepickled();
        an incomplete tail stays buffered in self.queue.  Returns True when
        this call received any new bytes, otherwise False.
        """
        start = len(self.queue)
        try:
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            # EAGAIN only means there is nothing to read right now.
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find(b"</event>")
        while index != -1:
            # 8 == len(b"</event>"): forward up to and including the tag.
            worker_fire_prepickled(self.queue[:index+8])
            self.queue = self.queue[index+8:]
            index = self.queue.find(b"</event>")
        return (end > start)

    def close(self):
        """Drain the pipe, warn about an incomplete trailing message and close it."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

# Set to True by BitbakeWorker.handle_quit(); the top-level exception handler
# at the end of the script only prints a traceback while this is still False.
normalexit = False

class BitbakeWorker(object):
    """
    Command loop of the worker process.

    Reads tagged messages (<cookerconfig>, <workerdata>, <runtask>,
    <finishnow>, <ping>, <quit>) from its input stream, forks tasks on
    request, relays their events and reports their exit codes through
    worker_fire_prepickled().
    """
    def __init__(self, din):
        """Store the (non-blocking) input stream and install signal handlers."""
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        # Both dicts are keyed by child pid.
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Warn, terminate running tasks, then re-deliver SIGTERM with the default action."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: wait for input, dispatch commands, relay child output."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes.values():
                pipe.read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()

    def handle_item(self, item, func):
        """
        Dispatch complete <item>...</item> messages at the head of the queue.

        func is called with the payload of each message, which is then
        removed from the queue.  Processing stops as soon as the queue no
        longer starts with this item's opening tag, or the closing tag has
        not arrived yet, so messages of another type that follow are never
        misread as part of this item's payload.
        """
        opening = b"<" + item + b">"
        closing = b"</" + item + b">"
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                # Message incomplete; wait for more data.
                break
            func(self.queue[len(opening):index])
            self.queue = self.queue[index + len(closing):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base configuration."""
        # NOTE(review): pickle.loads() on pipe data - the sender is assumed
        # to be the trusted parent bitbake process; confirm.
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        """Unpickle the worker data, apply its logging settings and PRSERV_HOST."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a ping by logging a warning-level "pong" message."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the exit as intentional and leave via sys.exit(0)."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task request, fork it and remember its pid and pipe."""
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection, otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

    def handle_finishnow(self, _):
        """Send SIGTERM to every running task's process group and drain their pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in self.build_pids:
                try:
                    # A negative pid addresses the whole process group.
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the group or child may already be gone.
                    pass
        for pipe in self.build_pipes.values():
            pipe.read()

# Script entry point: run the worker on stdin until it exits, optionally
# under the profiler selected at start-up.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # BaseException also covers the SystemExit raised by handle_quit();
    # normalexit tells that expected exit apart from real failures.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
# Keep flushing until everything queued for the parent has been written.
# NOTE(review): this polls worker_flush() in a tight loop and only ends once
# the queue is empty - confirm that is acceptable if the pipe stays full.
while len(worker_queue):
    worker_flush()
workerlog_write("exitting")
sys.exit(0)