Files
poky/bitbake/bin/bitbake-worker
Richard Purdie 654eadfa30 bitbake: bitbake: Update logger.warn() -> logger.warning()
python deprecated logger.warn() in favour of logger.warning(). This is only
used in bitbake code so we may as well just translate everything to avoid
warnings under python 3. Its safe for python 2.7.

(Bitbake rev: 676a5f592e8507e81b8f748d58acfea7572f8796)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2016-05-11 10:34:30 +01:00

448 lines
15 KiB
Python
Executable File

#!/usr/bin/env python
import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
from multiprocessing import Lock
# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
sys.exit(1)
profiling = False
if sys.argv[1].startswith("decafbadbad"):
profiling = True
try:
import cProfile as profile
except:
import profile
# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
if sys.stdout.name == '<stdout>':
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
pass
logger = logging.getLogger("BitBake")
try:
import cPickle as pickle
except ImportError:
import pickle
bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None
handler = bb.event.LogHandler()
logger.addHandler(handler)
if 0:
# Code to write out a log file of all events passing through the worker
logfilename = "/tmp/workerlogfile"
format_str = "%(levelname)s: %(message)s"
conlogformat = bb.msg.BBLogFormatter(format_str)
consolelog = logging.FileHandler(logfilename)
bb.msg.addDefaultlogFilter(consolelog)
consolelog.setFormatter(conlogformat)
logger.addHandler(consolelog)
worker_queue = ""
def worker_fire(event, d):
data = "<event>" + pickle.dumps(event) + "</event>"
worker_fire_prepickled(data)
def worker_fire_prepickled(event):
global worker_queue
worker_queue = worker_queue + event
worker_flush()
def worker_flush():
global worker_queue, worker_pipe
if not worker_queue:
return
try:
written = os.write(worker_pipe, worker_queue)
worker_queue = worker_queue[written:]
except (IOError, OSError) as e:
if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
raise
def worker_child_fire(event, d):
global worker_pipe
global worker_pipe_lock
data = "<event>" + pickle.dumps(event) + "</event>"
try:
worker_pipe_lock.acquire()
worker_pipe.write(data)
worker_pipe_lock.release()
except IOError:
sigterm_handler(None, None)
raise
bb.event.worker_fire = worker_fire
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
if lf:
lf.write(msg)
lf.flush()
def sigterm_handler(signum, frame):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
os.killpg(0, signal.SIGTERM)
sys.exit()
def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
# We need to setup the environment BEFORE the fork, since
# a fork() or exec*() activates PSEUDO...
envbackup = {}
fakeenv = {}
umask = None
taskdep = workerdata["taskdeps"][fn]
if 'umask' in taskdep and taskname in taskdep['umask']:
# umask might come in as a number or text string..
try:
umask = int(taskdep['umask'][taskname],8)
except TypeError:
umask = taskdep['umask'][taskname]
# We can't use the fakeroot environment in a dry run as it possibly hasn't been built
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
envvars = (workerdata["fakerootenv"][fn] or "").split()
for key, value in (var.split('=') for var in envvars):
envbackup[key] = os.environ.get(key)
os.environ[key] = value
fakeenv[key] = value
fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
for p in fakedirs:
bb.utils.mkdirhier(p)
logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
(fn, taskname, ', '.join(fakedirs)))
else:
envvars = (workerdata["fakerootnoenv"][fn] or "").split()
for key, value in (var.split('=') for var in envvars):
envbackup[key] = os.environ.get(key)
os.environ[key] = value
fakeenv[key] = value
sys.stdout.flush()
sys.stderr.flush()
try:
pipein, pipeout = os.pipe()
pipein = os.fdopen(pipein, 'rb', 4096)
pipeout = os.fdopen(pipeout, 'wb', 0)
pid = os.fork()
except OSError as e:
bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
if pid == 0:
def child():
global worker_pipe
global worker_pipe_lock
pipein.close()
signal.signal(signal.SIGTERM, sigterm_handler)
# Let SIGHUP exit as SIGTERM
signal.signal(signal.SIGHUP, sigterm_handler)
bb.utils.signal_on_parent_exit("SIGTERM")
# Save out the PID so that the event can include it the
# events
bb.event.worker_pid = os.getpid()
bb.event.worker_fire = worker_child_fire
worker_pipe = pipeout
worker_pipe_lock = Lock()
# Make the child the process group leader and ensure no
# child process will be controlled by the current terminal
# This ensures signals sent to the controlling terminal like Ctrl+C
# don't stop the child processes.
os.setsid()
# No stdin
newsi = os.open(os.devnull, os.O_RDWR)
os.dup2(newsi, sys.stdin.fileno())
if umask:
os.umask(umask)
data.setVar("BB_WORKERCONTEXT", "1")
data.setVar("BB_TASKDEPDATA", taskdepdata)
data.setVar("BUILDNAME", workerdata["buildname"])
data.setVar("DATE", workerdata["date"])
data.setVar("TIME", workerdata["time"])
bb.parse.siggen.set_taskdata(workerdata["sigdata"])
ret = 0
try:
the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN", True), taskname.replace("do_", "")))
# exported_vars() returns a generator which *cannot* be passed to os.environ.update()
# successfully. We also need to unset anything from the environment which shouldn't be there
exports = bb.data.exported_vars(the_data)
bb.utils.empty_environment()
for e, v in exports:
os.environ[e] = v
for e in fakeenv:
os.environ[e] = fakeenv[e]
the_data.setVar(e, fakeenv[e])
the_data.setVarFlag(e, 'export', "1")
if quieterrors:
the_data.setVarFlag(taskname, "quieterrors", "1")
except Exception as exc:
if not quieterrors:
logger.critical(str(exc))
os._exit(1)
try:
if cfg.dry_run:
return 0
return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
except:
os._exit(1)
if not profiling:
os._exit(child())
else:
profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
prof = profile.Profile()
try:
ret = profile.Profile.runcall(prof, child)
finally:
prof.dump_stats(profname)
bb.utils.process_profilelog(profname)
os._exit(ret)
else:
for key, value in envbackup.iteritems():
if value is None:
del os.environ[key]
else:
os.environ[key] = value
return pid, pipein, pipeout
class runQueueWorkerPipe():
"""
Abstraction for a pipe between a worker thread and the worker server
"""
def __init__(self, pipein, pipeout):
self.input = pipein
if pipeout:
pipeout.close()
bb.utils.nonblockingfd(self.input)
self.queue = ""
def read(self):
start = len(self.queue)
try:
self.queue = self.queue + self.input.read(102400)
except (OSError, IOError) as e:
if e.errno != errno.EAGAIN:
raise
end = len(self.queue)
index = self.queue.find("</event>")
while index != -1:
worker_fire_prepickled(self.queue[:index+8])
self.queue = self.queue[index+8:]
index = self.queue.find("</event>")
return (end > start)
def close(self):
while self.read():
continue
if len(self.queue) > 0:
print("Warning, worker child left partial message: %s" % self.queue)
self.input.close()
normalexit = False
class BitbakeWorker(object):
def __init__(self, din):
self.input = din
bb.utils.nonblockingfd(self.input)
self.queue = ""
self.cookercfg = None
self.databuilder = None
self.data = None
self.build_pids = {}
self.build_pipes = {}
signal.signal(signal.SIGTERM, self.sigterm_exception)
# Let SIGHUP exit as SIGTERM
signal.signal(signal.SIGHUP, self.sigterm_exception)
if "beef" in sys.argv[1]:
bb.utils.set_process_name("Worker (Fakeroot)")
else:
bb.utils.set_process_name("Worker")
def sigterm_exception(self, signum, stackframe):
if signum == signal.SIGTERM:
bb.warn("Worker received SIGTERM, shutting down...")
elif signum == signal.SIGHUP:
bb.warn("Worker received SIGHUP, shutting down...")
self.handle_finishnow(None)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
os.kill(os.getpid(), signal.SIGTERM)
def serve(self):
while True:
(ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
if self.input in ready:
try:
r = self.input.read()
if len(r) == 0:
# EOF on pipe, server must have terminated
self.sigterm_exception(signal.SIGTERM, None)
self.queue = self.queue + r
except (OSError, IOError):
pass
if len(self.queue):
self.handle_item("cookerconfig", self.handle_cookercfg)
self.handle_item("workerdata", self.handle_workerdata)
self.handle_item("runtask", self.handle_runtask)
self.handle_item("finishnow", self.handle_finishnow)
self.handle_item("ping", self.handle_ping)
self.handle_item("quit", self.handle_quit)
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
if len(self.build_pids):
self.process_waitpid()
worker_flush()
def handle_item(self, item, func):
if self.queue.startswith("<" + item + ">"):
index = self.queue.find("</" + item + ">")
while index != -1:
func(self.queue[(len(item) + 2):index])
self.queue = self.queue[(index + len(item) + 3):]
index = self.queue.find("</" + item + ">")
def handle_cookercfg(self, data):
self.cookercfg = pickle.loads(data)
self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
self.databuilder.parseBaseConfiguration()
self.data = self.databuilder.data
def handle_workerdata(self, data):
self.workerdata = pickle.loads(data)
bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])
def handle_ping(self, _):
workerlog_write("Handling ping\n")
logger.warning("Pong from bitbake-worker!")
def handle_quit(self, data):
workerlog_write("Handling quit\n")
global normalexit
normalexit = True
sys.exit(0)
def handle_runtask(self, data):
fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)
self.build_pids[pid] = task
self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
def process_waitpid(self):
"""
Return none is there are no processes awaiting result collection, otherwise
collect the process exit codes and close the information pipe.
"""
try:
pid, status = os.waitpid(-1, os.WNOHANG)
if pid == 0 or os.WIFSTOPPED(status):
return None
except OSError:
return None
workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
if os.WIFEXITED(status):
status = os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
# Per shell conventions for $?, when a process exits due to
# a signal, we return an exit code of 128 + SIGNUM
status = 128 + os.WTERMSIG(status)
task = self.build_pids[pid]
del self.build_pids[pid]
self.build_pipes[pid].close()
del self.build_pipes[pid]
worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")
def handle_finishnow(self, _):
if self.build_pids:
logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
for k, v in self.build_pids.iteritems():
try:
os.kill(-k, signal.SIGTERM)
os.waitpid(-1, 0)
except:
pass
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
try:
worker = BitbakeWorker(sys.stdin)
if not profiling:
worker.serve()
else:
profname = "profile-worker.log"
prof = profile.Profile()
try:
profile.Profile.runcall(prof, worker.serve)
finally:
prof.dump_stats(profname)
bb.utils.process_profilelog(profname)
except BaseException as e:
if not normalexit:
import traceback
sys.stderr.write(traceback.format_exc())
sys.stderr.write(str(e))
while len(worker_queue):
worker_flush()
workerlog_write("exitting")
sys.exit(0)