Files
poky/bitbake/bin/bitbake-worker
Richard Purdie d061692372 bitbake: bitbake: Share BB_TASKDEPDATA with tasks
Currently tasks have no knowledge of which other tasks they depend
upon. This makes it impossible to do at least two things which would be
desirable/interesting:

a) Have the ability to create per recipe sysroots
b) Allow the aclocal files to be present only for the entries in
   DEPENDS (directly and indirectly)

By exporting task data through this new variable, tasks can inspect
their dependencies and then take actions based upon this.

(Bitbake rev: 84f1dde717dac22435005b79d03ee0b80a3e8e62)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2013-11-26 23:01:33 +00:00

12 KiB
Executable File

#!/usr/bin/env python

import os
import sys
import warnings
# The bitbake 'lib' directory sits next to the directory holding this script;
# it has to be on sys.path before any bb module can be imported.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal

# Users shouldn't be running this code directly
if not (len(sys.argv) == 2 and sys.argv[1] == "decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

logger = logging.getLogger("BitBake")

# Prefer the C pickle implementation when it is available.
try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")

# Events are written to the stdout file descriptor; it is made non-blocking
# so that worker_flush() can send whatever fits and retry later.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Data waiting to be written to worker_pipe.
worker_queue = ""

def worker_fire(event, d):
    """
    Pickle an event, frame it and queue it for sending on worker_pipe.

    The payload is wrapped in <event>...</event>, the same framing that
    worker_child_fire() produces and that runQueueWorkerPipe.read() looks
    for when forwarding child events onto the same queue.

    event -- the event object to pickle
    d     -- unused here; kept because this function is installed as
             bb.event.worker_fire and is called with two arguments
    """
    data = "<event>" + pickle.dumps(event) + "</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    """
    Queue an already pickled and framed message and try to send it.

    event -- the complete message text to append to worker_queue
    """
    global worker_queue

    worker_queue += event
    worker_flush()

def worker_flush():
    """
    Write as much of worker_queue to worker_pipe as the pipe accepts.

    Whatever was written is dropped from the front of the queue. EAGAIN is
    ignored so that the remainder is retried on a later call; any other
    I/O error is re-raised.
    """
    global worker_queue, worker_pipe

    if worker_queue:
        try:
            sent = os.write(worker_pipe, worker_queue)
        except (IOError, OSError) as err:
            if err.errno != errno.EAGAIN:
                raise
        else:
            worker_queue = worker_queue[sent:]

def worker_child_fire(event, d):
    """
    Send an event from a forked task process.

    The pickled event is wrapped in <event>...</event> and written directly
    to worker_pipe, which fork_off_task() rebinds in the child to the write
    end of the pipe it created.

    event -- the event object to pickle
    d     -- unused
    """
    global worker_pipe

    worker_pipe.write("<event>" + pickle.dumps(event) + "</event>")

# Install worker_fire as bb.event's worker_fire hook for this process. A forked
# task child replaces it with worker_child_fire (see fork_off_task).
bb.event.worker_fire = worker_fire

# Debug command log. It is disabled while lf is None; swap in the open() line
# below to record the commands this worker handles.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """
    Write msg to the debug command log and flush it.

    Does nothing when no log file is open (lf is None).
    """
    if lf:
        lf.write(msg)
        lf.flush()

def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    """
    Fork a child process that executes one task and return handles to it.

    cfg          -- configuration object; its dry_run and profile attributes
                    are read
    data         -- base datastore; BB_WORKERCONTEXT, BB_TASKDEPDATA,
                    BUILDNAME, DATE and TIME are set on it in the child
    workerdata   -- dict of per-build tables ("taskdeps", "fakerootenv",
                    "fakerootdirs", "fakerootnoenv", "buildname", "date",
                    "time", "hashes", "hash_deps", "sigchecksums",
                    "runq_hash")
    fn           -- key into the per-file tables of workerdata; also handed
                    to bb.cache.Cache.loadDataFull and bb.build.exec_task
    task         -- key into workerdata["runq_hash"]
    taskname     -- name of the task to run
    appends      -- passed through to bb.cache.Cache.loadDataFull
    taskdepdata  -- stored in the BB_TASKDEPDATA variable for the task
    quieterrors  -- when true, set-up failures are not logged and the task
                    gets its "quieterrors" flag set

    In the parent, returns (pid, pipein, pipeout): the child's pid and both
    ends of the event pipe as file objects. The child never returns; it
    leaves through os._exit() with the task's result, or 1 on failure.
    """
    global worker_pipe

    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    use_fakeroot = 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run
    if use_fakeroot:
        envvars = (workerdata["fakerootenv"][fn] or "").split()
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()

    for var in envvars:
        # Split on the first '=' only, so values may themselves contain '='
        key, value = var.split('=', 1)
        envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value

    if use_fakeroot:
        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))

    # Flush before forking so buffered output is not emitted twice
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        # NOTE(review): the code below assumes bb.msg.fatal() does not return - confirm
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        pipein.close()

        # Save out the PID so that events can include it
        bb.event.worker_pid = os.getpid()
        bb.event.worker_fire = worker_child_fire
        worker_pipe = pipeout

        # Make the child the process group leader
        os.setpgid(0, 0)
        # No stdin
        newsi = os.open(os.devnull, os.O_RDWR)
        os.dup2(newsi, sys.stdin.fileno())

        # A umask of 0 is a valid request, so test against None rather than truthiness
        if umask is not None:
            os.umask(umask)

        data.setVar("BB_WORKERCONTEXT", "1")
        data.setVar("BB_TASKDEPDATA", taskdepdata)
        data.setVar("BUILDNAME", workerdata["buildname"])
        data.setVar("DATE", workerdata["date"])
        data.setVar("TIME", workerdata["time"])
        bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
        ret = 0
        try:
            the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
            the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
            for h in workerdata["hashes"]:
                the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
            for h in workerdata["hash_deps"]:
                the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])

            # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
            # successfully. We also need to unset anything from the environment which shouldn't be there
            exports = bb.data.exported_vars(the_data)
            bb.utils.empty_environment()
            for e, v in exports:
                os.environ[e] = v
            for e in fakeenv:
                os.environ[e] = fakeenv[e]
                the_data.setVar(e, fakeenv[e])
                the_data.setVarFlag(e, 'export', "1")

            if quieterrors:
                the_data.setVarFlag(taskname, "quieterrors", "1")

        except Exception as exc:
            if not quieterrors:
                logger.critical(str(exc))
            os._exit(1)
        try:
            if not cfg.dry_run:
                ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            os._exit(ret)
        except:
            # Deliberately catch everything: the child must never fall
            # through into the parent's code path below.
            os._exit(1)
    else:
        # Parent: undo the environment changes made for the child
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout

class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        """
        pipein  -- file object for the read end of the pipe
        pipeout -- file object for the write end; closed here when given
        """
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Data read from the pipe that is not yet a complete message
        self.queue = ""

    def read(self):
        """
        Read what is available and forward every complete event.

        Each chunk ending in "</event>" is handed to worker_fire_prepickled();
        an incomplete tail stays in self.queue. Returns True when this call
        read any new data, False otherwise.
        """
        start = len(self.queue)
        try:
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError) as e:
            # EAGAIN just means nothing is available on the pipe right now
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find("</event>")
        while index != -1:
            # 8 == len("</event>"): forward the message including its closing tag
            worker_fire_prepickled(self.queue[:index+8])
            self.queue = self.queue[index+8:]
            index = self.queue.find("</event>")
        return (end > start)

    def close(self):
        """
        Drain the pipe, warn about any incomplete message, close the read end.
        """
        while self.read():
            continue
        if len(self.queue) > 0:
            # NOTE(review): print() writes to stdout, and this file also uses the
            # stdout descriptor as worker_pipe - confirm this cannot corrupt the
            # event stream.
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

# Set to True by BitbakeWorker.handle_quit() just before it calls sys.exit(), so
# the top-level exception handler can tell a requested shutdown from a failure.
normalexit = False

class BitbakeWorker(object):
    """
    Command loop of the worker process.

    Reads "<name>payload</name>" messages from the input stream, dispatches
    them to the handle_* methods, forwards events from running task children
    and reports their exit codes.
    """
    def __init__(self, din):
        """
        din -- file object the commands are read from; made non-blocking
        """
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.workerdata = None
        # Both dicts are keyed by child pid
        self.build_pids = {}
        self.build_pipes = {}

    def serve(self):
        """
        Run the command loop. Never returns normally; handle_quit() ends it
        by calling sys.exit().
        """
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready or len(self.queue):
                try:
                    self.queue = self.queue + self.input.read()
                except (OSError, IOError):
                    # Best effort: presumably the non-blocking read had nothing to return
                    pass
                self.handle_item("cookerconfig", self.handle_cookercfg)
                self.handle_item("workerdata", self.handle_workerdata)
                self.handle_item("runtask", self.handle_runtask)
                self.handle_item("finishnow", self.handle_finishnow)
                self.handle_item("ping", self.handle_ping)
                self.handle_item("quit", self.handle_quit)

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()

    def handle_item(self, item, func):
        """
        Dispatch every complete <item>...</item> message at the head of the queue.

        func is called with the text between the tags and the message is then
        removed from the queue. Processing stops once the queue no longer
        starts with <item> or the closing tag has not arrived yet, so a
        message of another type is never consumed as part of a payload.
        """
        opening = "<" + item + ">"
        closing = "</" + item + ">"
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                # Message not completely received yet
                break
            func(self.queue[len(opening):index])
            self.queue = self.queue[index + len(closing):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and build the base datastore from it."""
        # pickle is only safe on trusted input; presumably this stream comes
        # from the bitbake process that launched the worker.
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        """Unpickle the per-build worker data and apply its logging settings."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a ping with a log message."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the shutdown as requested and leave via sys.exit(0)."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task request, fork the task and remember its pid and event pipe."""
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection, otherwise
        collect the process exit code, close the information pipe and queue an
        <exitcode> message holding the pickled (task, status) pair.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        """Send SIGTERM to every remaining task process group, then drain the task pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in self.build_pids:
                try:
                    # A negative pid signals the whole process group
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the process may already be gone
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

# Script entry point: run the command loop until it ends with an exception
# (handle_quit() ends it on purpose through sys.exit()).
try:
    worker = BitbakeWorker(sys.stdin)
    worker.serve()
except BaseException as e:
    # Only report the failure when the shutdown was not requested
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))

# Push out whatever is still queued before exiting
while worker_queue:
    worker_flush()
workerlog_write("exitting")
sys.exit(0)