In the forked child, we may use multiprocessing. There is only one event pipe to the worker controlling process, and if we're unlucky, multiple processes can write to it at once, corrupting the data by intermixing it. We don't see this often, but when we do, it's quite puzzling. I suspect it only happens in tasks which use multiprocessing (do_rootfs, do_package) and is much more likely to happen when we have long messages, usually many times PAGE_SIZE, since PAGE_SIZE writes are atomic. This makes it much more likely within do_rootfs, when for example a subprocess lists the contents of a rootfs. To fix this, we give each child a Lock() object and use it to serialise writes to the controlling worker. (Bitbake rev: 3cb55bdf06148943960e438291f9a562857340a3) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
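A minimal standalone sketch of the failure mode and fix described above, not part of bitbake-worker itself: two forked writers share one pipe and send framed messages far larger than PIPE_BUF (4096 bytes on Linux, matching the PAGE_SIZE observation above), serialised by a shared multiprocessing Lock. It assumes POSIX fork semantics; the writer helper and byte counts are illustrative only.

import os
from multiprocessing import Lock, Process

def writer(wfd, tag, lock):
    # One framed message many times PIPE_BUF; without the lock, two such
    # writes from different processes can interleave mid-message.
    msg = b"<event>" + tag * 20000 + b"</event>"
    lock.acquire()
    try:
        os.write(wfd, msg)
    finally:
        lock.release()

if __name__ == "__main__":
    rfd, wfd = os.pipe()
    lock = Lock()  # inherited across fork, so all writers share it
    procs = [Process(target=writer, args=(wfd, tag, lock)) for tag in (b"a", b"b")]
    for p in procs:
        p.start()
    os.close(wfd)  # parent keeps only the read end
    data = b""
    while True:
        chunk = os.read(rfd, 65536)
        if not chunk:  # EOF once both children have exited
            break
        data += chunk
    for p in procs:
        p.join()
    # With the lock held around each write, both frames arrive intact
    print(data.count(b"<event>"))  # expected: 2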
442 lines
15 KiB
Python
Executable File
#!/usr/bin/env python

import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
from multiprocessing import Lock

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

profiling = False
if sys.argv[1] == "decafbadbad":
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of a disorderly exit, as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass

logger = logging.getLogger("BitBake")

try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")


worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

worker_queue = ""

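# Events are pickled and framed as <event>...</event> so the server can find
# message boundaries in the byte stream it reads back from this worker.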
def worker_fire(event, d):
    data = "<event>" + pickle.dumps(event) + "</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    global worker_queue

    worker_queue = worker_queue + event
    worker_flush()

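# The pipe to the server is non-blocking, so a large queue may only be
# partially written; whatever remains stays in worker_queue for a later flush.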
def worker_flush():
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        written = os.write(worker_pipe, worker_queue)
        worker_queue = worker_queue[written:]
    except (IOError, OSError) as e:
        if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
            raise

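# Used in place of worker_fire in forked children: events go straight down the
# child's pipe, serialised by worker_pipe_lock in case the task itself uses
# multiprocessing and several processes fire events at once.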
def worker_child_fire(event, d):
    global worker_pipe
    global worker_pipe_lock

    data = "<event>" + pickle.dumps(event) + "</event>"
    try:
        worker_pipe_lock.acquire()
        worker_pipe.write(data)
        worker_pipe_lock.release()
    except IOError:
        sigterm_handler(None, None)
        raise

bb.event.worker_fire = worker_fire

lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    if lf:
        lf.write(msg)
        lf.flush()

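# Reset SIGTERM to the default handler, then terminate the whole current
# process group so any children spawned by running tasks are cleaned up too.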
def sigterm_handler(signum, frame):
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()

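# Fork a child to execute one task. Returns (pid, pipein, pipeout); pipein
# carries the child's event stream back to this worker.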
def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

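    # Flush buffered output now so the child doesn't inherit (and later
    # re-emit) a copy of the parent's unwritten stdio buffers.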
    sys.stdout.flush()
    sys.stderr.flush()

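    # A dedicated event pipe for this one child: the child writes, the worker
    # reads and forwards to the server.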
    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that events can include it
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
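            # Create the lock before the task runs so that any processes the
            # task forks (e.g. via multiprocessing) inherit and share it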
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            if umask:
                os.umask(umask)

            data.setVar("BB_WORKERCONTEXT", "1")
            data.setVar("BB_TASKDEPDATA", taskdepdata)
            data.setVar("BUILDNAME", workerdata["buildname"])
            data.setVar("DATE", workerdata["date"])
            data.setVar("TIME", workerdata["time"])
            bb.parse.siggen.set_taskdata(workerdata["sigdata"])
            ret = 0
            try:
                the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)
                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)
            try:
                if cfg.dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                os._exit(1)

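        # Run the task; os._exit() makes child()'s return value the exit
        # status of this forked process without running inherited cleanup.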
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
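        # Parent process: restore the environment variables changed before the fork.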
        for key, value in envbackup.iteritems():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout

class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

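    # Drain whatever data is available and forward each complete
    # <event>...</event> frame to the server; a partial frame stays buffered
    # in self.queue until more data arrives.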
    def read(self):
        start = len(self.queue)
        try:
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find("</event>")
        while index != -1:
            worker_fire_prepickled(self.queue[:index+8])
            self.queue = self.queue[index+8:]
            index = self.queue.find("</event>")
        return (end > start)

    def close(self):
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

normalexit = False

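# Main server-facing loop: reads framed commands from the bitbake server on
# stdin and streams events and task exit codes back on stdout.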
class BitbakeWorker(object):
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)

    def sigterm_exception(self, signum, stackframe):
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

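    # Poll (with a one second timeout) the server pipe and every running
    # child's event pipe, dispatching complete command frames and forwarding
    # child events and exit codes upstream.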
    def serve(self):
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [], [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item("cookerconfig", self.handle_cookercfg)
                self.handle_item("workerdata", self.handle_workerdata)
                self.handle_item("runtask", self.handle_runtask)
                self.handle_item("finishnow", self.handle_finishnow)
                self.handle_item("ping", self.handle_ping)
                self.handle_item("quit", self.handle_quit)

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()

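    # Commands arrive framed as <item>payload</item>; strip the tags and pass
    # each complete payload to the matching handler.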
    def handle_item(self, item, func):
        if self.queue.startswith("<" + item + ">"):
            index = self.queue.find("</" + item + ">")
            while index != -1:
                func(self.queue[(len(item) + 2):index])
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find("</" + item + ">")

    def handle_cookercfg(self, data):
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warn("Pong from bitbake-worker!")

    def handle_quit(self, data):
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection; otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in self.build_pids.iteritems():
                try:
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

try:
    worker = BitbakeWorker(sys.stdin)
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
while len(worker_queue):
    worker_flush()
workerlog_write("exiting")
sys.exit(0)