Files
poky/bitbake/bin/bitbake-worker
Richard Purdie 5ebd9bfff1 bitbake: prserv: Adapt autostart to bitbake-worker
With the change to bitbake-worker we need to ensure the workers know
how to contact the PR service, the magic 0 port and singleton is
no longer enough.

(Bitbake rev: c761751e259bb8e940552a28794b45887b5a72d9)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2013-06-14 12:52:57 +01:00

12 KiB
Executable File

#!/usr/bin/env python

import os import sys import warnings sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) from bb import fetch2 import logging import bb import select import errno

Users shouldn't be running this code directly

if len(sys.argv) != 2 or sys.argv[1] != "decafbad": print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") sys.exit(1)

logger = logging.getLogger("BitBake")

try: import cPickle as pickle except ImportError: import pickle bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")

worker_pipe = sys.stdout.fileno() bb.utils.nonblockingfd(worker_pipe)

handler = bb.event.LogHandler() logger.addHandler(handler)

if 0: # Code to write out a log file of all events passing through the worker logfilename = "/tmp/workerlogfile" format_str = "%(levelname)s: %(message)s" conlogformat = bb.msg.BBLogFormatter(format_str) consolelog = logging.FileHandler(logfilename) bb.msg.addDefaultlogFilter(consolelog) consolelog.setFormatter(conlogformat) logger.addHandler(consolelog)

worker_queue = ""

def worker_fire(event, d): data = "" + pickle.dumps(event) + "" worker_fire_prepickled(data)

def worker_fire_prepickled(event): global worker_queue

worker_queue = worker_queue + event
worker_flush()

def worker_flush(): global worker_queue, worker_pipe

if not worker_queue:
    return

try:
    written = os.write(worker_pipe, worker_queue)
    worker_queue = worker_queue[written:]
except (IOError, OSError) as e:
    if e.errno != errno.EAGAIN:
        raise

def worker_child_fire(event, d): global worker_pipe

data = "<event>" + pickle.dumps(event) + "</event>"
worker_pipe.write(data)

bb.event.worker_fire = worker_fire

lf = None #lf = open("/tmp/workercommandlog", "w+") def workerlog_write(msg): if lf: lf.write(msg) lf.flush()

def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, quieterrors=False): # We need to setup the environment BEFORE the fork, since # a fork() or exec*() activates PSEUDO...

envbackup = {}
fakeenv = {}
umask = None

taskdep = workerdata["taskdeps"][fn]
if 'umask' in taskdep and taskname in taskdep['umask']:
    # umask might come in as a number or text string..
    try:
         umask = int(taskdep['umask'][taskname],8)
    except TypeError:
         umask = taskdep['umask'][taskname]

if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
    envvars = (workerdata["fakerootenv"][fn] or "").split()
    for key, value in (var.split('=') for var in envvars):
        envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value

    fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
    for p in fakedirs:
        bb.utils.mkdirhier(p)
    logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                    (fn, taskname, ', '.join(fakedirs)))
else:
    envvars = (workerdata["fakerootnoenv"][fn] or "").split()
    for key, value in (var.split('=') for var in envvars):
        envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value

sys.stdout.flush()
sys.stderr.flush()

try:
    pipein, pipeout = os.pipe()
    pipein = os.fdopen(pipein, 'rb', 4096)
    pipeout = os.fdopen(pipeout, 'wb', 0)
    pid = os.fork()
except OSError as e:
    bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

if pid == 0:
        global worker_pipe
        pipein.close()

        # Save out the PID so that the event can include it the
        # events
        bb.event.worker_pid = os.getpid()
        bb.event.worker_fire = worker_child_fire
        worker_pipe = pipeout

        # Make the child the process group leader
        os.setpgid(0, 0)
        # No stdin
        newsi = os.open(os.devnull, os.O_RDWR)
        os.dup2(newsi, sys.stdin.fileno())

        if umask:
            os.umask(umask)

        data.setVar("BB_WORKERCONTEXT", "1")
        bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
        ret = 0
        try:
            the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
            the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
            for h in workerdata["hashes"]:
                the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
            for h in workerdata["hash_deps"]:
                the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])

            # exported_vars() returns a generator which *cannot* be passed to os.environ.update() 
            # successfully. We also need to unset anything from the environment which shouldn't be there 
            exports = bb.data.exported_vars(the_data)
            bb.utils.empty_environment()
            for e, v in exports:
                os.environ[e] = v
            for e in fakeenv:
                os.environ[e] = fakeenv[e]
                the_data.setVar(e, fakeenv[e])
                the_data.setVarFlag(e, 'export', "1")

            if quieterrors:
                the_data.setVarFlag(taskname, "quieterrors", "1")

        except Exception as exc:
            if not quieterrors:
                logger.critical(str(exc))
            os._exit(1)
        try:
            if not cfg.dry_run:
                ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            os._exit(ret)
        except:
            os._exit(1)
else:
    for key, value in envbackup.iteritems():
        if value is None:
            del os.environ[key]
        else:
            os.environ[key] = value

return pid, pipein, pipeout

class runQueueWorkerPipe(): """ Abstraction for a pipe between a worker thread and the worker server """ def init(self, pipein, pipeout): self.input = pipein if pipeout: pipeout.close() bb.utils.nonblockingfd(self.input) self.queue = ""

def read(self):
    start = len(self.queue)
    try:
        self.queue = self.queue + self.input.read(102400)
    except (OSError, IOError) as e:
        if e.errno != errno.EAGAIN:
            raise

    end = len(self.queue)
    index = self.queue.find("</event>")
    while index != -1:
        worker_fire_prepickled(self.queue[:index+8])
        self.queue = self.queue[index+8:]
        index = self.queue.find("</event>")
    return (end > start)

def close(self):
    while self.read():
        continue
    if len(self.queue) > 0:
        print("Warning, worker child left partial message: %s" % self.queue)
    self.input.close()

normalexit = False

class BitbakeWorker(object): def init(self, din): self.input = din bb.utils.nonblockingfd(self.input) self.queue = "" self.cookercfg = None self.databuilder = None self.data = None self.build_pids = {} self.build_pipes = {}

def serve(self):        
    while True:
        (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
        if self.input in ready or len(self.queue):
            start = len(self.queue)
            try:
                self.queue = self.queue + self.input.read()
            except (OSError, IOError):
                pass
            end = len(self.queue)
            self.handle_item("cookerconfig", self.handle_cookercfg)
            self.handle_item("workerdata", self.handle_workerdata)
            self.handle_item("runtask", self.handle_runtask)
            self.handle_item("finishnow", self.handle_finishnow)
            self.handle_item("ping", self.handle_ping)
            self.handle_item("quit", self.handle_quit)

        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
        if len(self.build_pids):
            self.process_waitpid()
        worker_flush()


def handle_item(self, item, func):
    if self.queue.startswith("<" + item + ">"):
        index = self.queue.find("</" + item + ">")
        while index != -1:
            func(self.queue[(len(item) + 2):index])
            self.queue = self.queue[(index + len(item) + 3):]
            index = self.queue.find("</" + item + ">")

def handle_cookercfg(self, data):
    self.cookercfg = pickle.loads(data)
    self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
    self.databuilder.parseBaseConfiguration()
    self.data = self.databuilder.data

def handle_workerdata(self, data):
    self.workerdata = pickle.loads(data)
    bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
    bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
    bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
    bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
    self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

def handle_ping(self, _):
    workerlog_write("Handling ping\n")

    logger.warn("Pong from bitbake-worker!")

def handle_quit(self, data):
    workerlog_write("Handling quit\n")

    global normalexit
    normalexit = True
    sys.exit(0)

def handle_runtask(self, data):
    fn, task, taskname, quieterrors, appends = pickle.loads(data)
    workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

    pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, quieterrors)

    self.build_pids[pid] = task
    self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

def process_waitpid(self):
    """
    Return none is there are no processes awaiting result collection, otherwise
    collect the process exit codes and close the information pipe.
    """
    try:
        pid, status = os.waitpid(-1, os.WNOHANG)
        if pid == 0 or os.WIFSTOPPED(status):
            return None
    except OSError:
        return None

    workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

    if os.WIFEXITED(status):
        status = os.WEXITSTATUS(status)
    elif os.WIFSIGNALED(status):
        # Per shell conventions for $?, when a process exits due to
        # a signal, we return an exit code of 128 + SIGNUM
        status = 128 + os.WTERMSIG(status)

    task = self.build_pids[pid]
    del self.build_pids[pid]

    self.build_pipes[pid].close()
    del self.build_pipes[pid]

    worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

def handle_finishnow(self, _):
    if self.build_pids:
        logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
        for k, v in self.build_pids.iteritems():
            try:
                os.kill(-k, signal.SIGTERM)
                os.waitpid(-1, 0)
            except:
                pass
    for pipe in self.build_pipes:
        self.build_pipes[pipe].read()

try: worker = BitbakeWorker(sys.stdin) worker.serve() except BaseException as e: if not normalexit: import traceback sys.stderr.write(traceback.format_exc()) sys.stderr.write(str(e)) while len(worker_queue): worker_flush() workerlog_write("exitting") sys.exit(0)