Files
poky/bitbake/bin/bitbake-worker
Ross Burton a5e95c2a85 bitbake: bitbake: be more explicit when warning about locale choice
(Bitbake rev: b3f7a75aeac31bc0afb7288fc54eb3929a8e1bae)

Signed-off-by: Ross Burton <ross.burton@intel.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
(cherry picked from commit 286dce008d6e0bd3121393b28ca02de1385519fb)
Signed-off-by: Armin Kuster <akuster808@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2017-12-04 17:24:01 +00:00

18 KiB
Executable File

#!/usr/bin/env python3

import os import sys import warnings sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) from bb import fetch2 import logging import bb import select import errno import signal import pickle import traceback import queue from multiprocessing import Lock from threading import Thread

if sys.getfilesystemencoding() != "utf-8": sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")

Users shouldn't be running this code directly

if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") sys.exit(1)

profiling = False if sys.argv[1].startswith("decafbadbad"): profiling = True try: import cProfile as profile except: import profile

Unbuffer stdout to avoid log truncation in the event

of an unorderly exit as well as to provide timely

updates to log files for use with tail

try: if sys.stdout.name == '': import fcntl fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL) fl |= os.O_SYNC fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl) #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) except: pass

logger = logging.getLogger("BitBake")

worker_pipe = sys.stdout.fileno() bb.utils.nonblockingfd(worker_pipe)

Need to guard against multiprocessing being used in child processes

and multiple processes trying to write to the parent at the same time

worker_pipe_lock = None

handler = bb.event.LogHandler() logger.addHandler(handler)

if 0: # Code to write out a log file of all events passing through the worker logfilename = "/tmp/workerlogfile" format_str = "%(levelname)s: %(message)s" conlogformat = bb.msg.BBLogFormatter(format_str) consolelog = logging.FileHandler(logfilename) bb.msg.addDefaultlogFilter(consolelog) consolelog.setFormatter(conlogformat) logger.addHandler(consolelog)

worker_queue = queue.Queue()

def worker_fire(event, d): data = b"" + pickle.dumps(event) + b"" worker_fire_prepickled(data)

def worker_fire_prepickled(event): global worker_queue

worker_queue.put(event)

We can end up with write contention with the cooker, it can be trying to send commands

and we can be trying to send event data back. Therefore use a separate thread for writing

back data to cooker.

worker_thread_exit = False

def worker_flush(worker_queue): worker_queue_int = b"" global worker_pipe, worker_thread_exit

while True:
    try:
        worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
    except queue.Empty:
        pass
    while (worker_queue_int or not worker_queue.empty()):
        try:
            (_, ready, _) = select.select([], [worker_pipe], [], 1)
            if not worker_queue.empty():
                worker_queue_int = worker_queue_int + worker_queue.get()
            written = os.write(worker_pipe, worker_queue_int)
            worker_queue_int = worker_queue_int[written:]
        except (IOError, OSError) as e:
            if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                raise
    if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
        return

worker_thread = Thread(target=worker_flush, args=(worker_queue,)) worker_thread.start()

def worker_child_fire(event, d): global worker_pipe global worker_pipe_lock

data = b"<event>" + pickle.dumps(event) + b"</event>"
try:
    worker_pipe_lock.acquire()
    worker_pipe.write(data)
    worker_pipe_lock.release()
except IOError:
    sigterm_handler(None, None)
    raise

bb.event.worker_fire = worker_fire

lf = None #lf = open("/tmp/workercommandlog", "w+") def workerlog_write(msg): if lf: lf.write(msg) lf.flush()

def sigterm_handler(signum, frame): signal.signal(signal.SIGTERM, signal.SIG_DFL) os.killpg(0, signal.SIGTERM) sys.exit()

def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False): # We need to setup the environment BEFORE the fork, since # a fork() or exec*() activates PSEUDO...

envbackup = {}
fakeenv = {}
umask = None

taskdep = workerdata["taskdeps"][fn]
if 'umask' in taskdep and taskname in taskdep['umask']:
    # umask might come in as a number or text string..
    try:
         umask = int(taskdep['umask'][taskname],8)
    except TypeError:
         umask = taskdep['umask'][taskname]

dry_run = cfg.dry_run or dry_run_exec

# We can't use the fakeroot environment in a dry run as it possibly hasn't been built
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
    envvars = (workerdata["fakerootenv"][fn] or "").split()
    for key, value in (var.split('=') for var in envvars):
        envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value

    fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
    for p in fakedirs:
        bb.utils.mkdirhier(p)
    logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                    (fn, taskname, ', '.join(fakedirs)))
else:
    envvars = (workerdata["fakerootnoenv"][fn] or "").split()
    for key, value in (var.split('=') for var in envvars):
        envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value

sys.stdout.flush()
sys.stderr.flush()

try:
    pipein, pipeout = os.pipe()
    pipein = os.fdopen(pipein, 'rb', 4096)
    pipeout = os.fdopen(pipeout, 'wb', 0)
    pid = os.fork()
except OSError as e:
    logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
    sys.exit(1)

if pid == 0:
    def child():
        global worker_pipe
        global worker_pipe_lock
        pipein.close()

        signal.signal(signal.SIGTERM, sigterm_handler)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, sigterm_handler)
        bb.utils.signal_on_parent_exit("SIGTERM")

        # Save out the PID so that the event can include it the
        # events
        bb.event.worker_pid = os.getpid()
        bb.event.worker_fire = worker_child_fire
        worker_pipe = pipeout
        worker_pipe_lock = Lock()

        # Make the child the process group leader and ensure no
        # child process will be controlled by the current terminal
        # This ensures signals sent to the controlling terminal like Ctrl+C
        # don't stop the child processes.
        os.setsid()
        # No stdin
        newsi = os.open(os.devnull, os.O_RDWR)
        os.dup2(newsi, sys.stdin.fileno())

        if umask:
            os.umask(umask)

        try:
            bb_cache = bb.cache.NoCache(databuilder)
            (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
            the_data = databuilder.mcdata[mc]
            the_data.setVar("BB_WORKERCONTEXT", "1")
            the_data.setVar("BB_TASKDEPDATA", taskdepdata)
            if cfg.limited_deps:
                the_data.setVar("BB_LIMITEDDEPS", "1")
            the_data.setVar("BUILDNAME", workerdata["buildname"])
            the_data.setVar("DATE", workerdata["date"])
            the_data.setVar("TIME", workerdata["time"])
            for varname, value in extraconfigdata.items():
                the_data.setVar(varname, value)

            bb.parse.siggen.set_taskdata(workerdata["sigdata"])
            ret = 0

            the_data = bb_cache.loadDataFull(fn, appends)
            the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

            bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

            # exported_vars() returns a generator which *cannot* be passed to os.environ.update() 
            # successfully. We also need to unset anything from the environment which shouldn't be there 
            exports = bb.data.exported_vars(the_data)

            bb.utils.empty_environment()
            for e, v in exports:
                os.environ[e] = v

            for e in fakeenv:
                os.environ[e] = fakeenv[e]
                the_data.setVar(e, fakeenv[e])
                the_data.setVarFlag(e, 'export', "1")

            task_exports = the_data.getVarFlag(taskname, 'exports')
            if task_exports:
                for e in task_exports.split():
                    the_data.setVarFlag(e, 'export', '1')
                    v = the_data.getVar(e)
                    if v is not None:
                        os.environ[e] = v

            if quieterrors:
                the_data.setVarFlag(taskname, "quieterrors", "1")

        except Exception:
            if not quieterrors:
                logger.critical(traceback.format_exc())
            os._exit(1)
        try:
            if dry_run:
                return 0
            return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
        except:
            os._exit(1)
    if not profiling:
        os._exit(child())
    else:
        profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
        prof = profile.Profile()
        try: 
            ret = profile.Profile.runcall(prof, child)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
            os._exit(ret)
else:
    for key, value in iter(envbackup.items()):
        if value is None:
            del os.environ[key]
        else:
            os.environ[key] = value

return pid, pipein, pipeout

class runQueueWorkerPipe(): """ Abstraction for a pipe between a worker thread and the worker server """ def init(self, pipein, pipeout): self.input = pipein if pipeout: pipeout.close() bb.utils.nonblockingfd(self.input) self.queue = b""

def read(self):
    start = len(self.queue)
    try:
        self.queue = self.queue + (self.input.read(102400) or b"")
    except (OSError, IOError) as e:
        if e.errno != errno.EAGAIN:
            raise

    end = len(self.queue)
    index = self.queue.find(b"</event>")
    while index != -1:
        worker_fire_prepickled(self.queue[:index+8])
        self.queue = self.queue[index+8:]
        index = self.queue.find(b"</event>")
    return (end > start)

def close(self):
    while self.read():
        continue
    if len(self.queue) > 0:
        print("Warning, worker child left partial message: %s" % self.queue)
    self.input.close()

normalexit = False

class BitbakeWorker(object): def init(self, din): self.input = din bb.utils.nonblockingfd(self.input) self.queue = b"" self.cookercfg = None self.databuilder = None self.data = None self.extraconfigdata = None self.build_pids = {} self.build_pipes = {}

    signal.signal(signal.SIGTERM, self.sigterm_exception)
    # Let SIGHUP exit as SIGTERM
    signal.signal(signal.SIGHUP, self.sigterm_exception)
    if "beef" in sys.argv[1]:
        bb.utils.set_process_name("Worker (Fakeroot)")
    else:
        bb.utils.set_process_name("Worker")

def sigterm_exception(self, signum, stackframe):
    if signum == signal.SIGTERM:
        bb.warn("Worker received SIGTERM, shutting down...")
    elif signum == signal.SIGHUP:
        bb.warn("Worker received SIGHUP, shutting down...")
    self.handle_finishnow(None)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.kill(os.getpid(), signal.SIGTERM)

def serve(self):        
    while True:
        (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
        if self.input in ready:
            try:
                r = self.input.read()
                if len(r) == 0:
                    # EOF on pipe, server must have terminated
                    self.sigterm_exception(signal.SIGTERM, None)
                self.queue = self.queue + r
            except (OSError, IOError):
                pass
        if len(self.queue):
            self.handle_item(b"cookerconfig", self.handle_cookercfg)
            self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
            self.handle_item(b"workerdata", self.handle_workerdata)
            self.handle_item(b"runtask", self.handle_runtask)
            self.handle_item(b"finishnow", self.handle_finishnow)
            self.handle_item(b"ping", self.handle_ping)
            self.handle_item(b"quit", self.handle_quit)

        for pipe in self.build_pipes:
            if self.build_pipes[pipe].input in ready:
                self.build_pipes[pipe].read()
        if len(self.build_pids):
            while self.process_waitpid():
                continue


def handle_item(self, item, func):
    if self.queue.startswith(b"<" + item + b">"):
        index = self.queue.find(b"</" + item + b">")
        while index != -1:
            func(self.queue[(len(item) + 2):index])
            self.queue = self.queue[(index + len(item) + 3):]
            index = self.queue.find(b"</" + item + b">")

def handle_cookercfg(self, data):
    self.cookercfg = pickle.loads(data)
    self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
    self.databuilder.parseBaseConfiguration()
    self.data = self.databuilder.data

def handle_extraconfigdata(self, data):
    self.extraconfigdata = pickle.loads(data)

def handle_workerdata(self, data):
    self.workerdata = pickle.loads(data)
    bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
    bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
    bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
    bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
    for mc in self.databuilder.mcdata:
        self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])

def handle_ping(self, _):
    workerlog_write("Handling ping\n")

    logger.warning("Pong from bitbake-worker!")

def handle_quit(self, data):
    workerlog_write("Handling quit\n")

    global normalexit
    normalexit = True
    sys.exit(0)

def handle_runtask(self, data):
    fn, task, taskname, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
    workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

    pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)

    self.build_pids[pid] = task
    self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

def process_waitpid(self):
    """
    Return none is there are no processes awaiting result collection, otherwise
    collect the process exit codes and close the information pipe.
    """
    try:
        pid, status = os.waitpid(-1, os.WNOHANG)
        if pid == 0 or os.WIFSTOPPED(status):
            return False
    except OSError:
        return False

    workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

    if os.WIFEXITED(status):
        status = os.WEXITSTATUS(status)
    elif os.WIFSIGNALED(status):
        # Per shell conventions for $?, when a process exits due to
        # a signal, we return an exit code of 128 + SIGNUM
        status = 128 + os.WTERMSIG(status)

    task = self.build_pids[pid]
    del self.build_pids[pid]

    self.build_pipes[pid].close()
    del self.build_pipes[pid]

    worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

    return True

def handle_finishnow(self, _):
    if self.build_pids:
        logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
        for k, v in iter(self.build_pids.items()):
            try:
                os.kill(-k, signal.SIGTERM)
                os.waitpid(-1, 0)
            except:
                pass
    for pipe in self.build_pipes:
        self.build_pipes[pipe].read()

try: worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb')) if not profiling: worker.serve() else: profname = "profile-worker.log" prof = profile.Profile() try: profile.Profile.runcall(prof, worker.serve) finally: prof.dump_stats(profname) bb.utils.process_profilelog(profname) except BaseException as e: if not normalexit: import traceback sys.stderr.write(traceback.format_exc()) sys.stderr.write(str(e))

worker_thread_exit = True worker_thread.join()

workerlog_write("exitting") sys.exit(0)