bitbake: runqueue: Enable dynamic task adjustment to hash equivalency

There is a compelling usecase for tasks being able to notify runqueue
that their "unihash" has changed. When this is received, the hashes of
all subsequent tasks should be recomputed and their new hashes checked
against existing setscene validity. Any newly available setscene tasks
should then be executed.

Making this work effectively needs several pieces. An event is added
which the cooker listens for. If a new hash becomes available it can
send an event to notify of this.

When such an event is seen, hash recomputations are made. A setscene
task can't be run until all the tasks it "covers" are stopped. The
notion of "holdoff" tasks is therefore added, these are removed from
the buildable list with the assumption that some setscene task will
run and cover them.

The workers need to be notified when taskhashes change to update their
own internal siggen data stores. A new worker command is added to do this
which will affect all newly spawned worker processes from that worker.

An example workflow which tests this code is:

Configuration:
BB_SIGNATURE_HANDLER = "OEEquivHash"
SSTATE_HASHEQUIV_SERVER = "http://localhost:8686"

$ bitbake-hashserv &
$ bitbake automake-native
$ bitbake autoconf-native automake-native -c clean
$ bitbake m4-native -c install -f
$ bitbake automake-native

with the test being whether automake-native is installed from sstate.

(Bitbake rev: 1f630fdf0260db08541d3ca9f25f852931c19905)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Richard Purdie
2019-07-23 22:51:15 +01:00
parent 40eb5b344b
commit 7df31ff368
2 changed files with 161 additions and 10 deletions

View File

@@ -234,6 +234,8 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
the_data.setVar(varname, value)
bb.parse.siggen.set_taskdata(workerdata["sigdata"])
if "newhashes" in workerdata:
bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
ret = 0
the_data = bb_cache.loadDataFull(fn, appends)
@@ -377,6 +379,7 @@ class BitbakeWorker(object):
self.handle_item(b"cookerconfig", self.handle_cookercfg)
self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
self.handle_item(b"workerdata", self.handle_workerdata)
self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
self.handle_item(b"runtask", self.handle_runtask)
self.handle_item(b"finishnow", self.handle_finishnow)
self.handle_item(b"ping", self.handle_ping)
@@ -416,6 +419,9 @@ class BitbakeWorker(object):
for mc in self.databuilder.mcdata:
self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
def handle_newtaskhashes(self, data):
    # Handle the "<newtaskhashes>" command from the server: unpickle the
    # updated taskhash store and stash it in workerdata so every task
    # process forked after this point picks up the new hashes (see the
    # "newhashes" check in fork_off_task).
    self.workerdata["newhashes"] = pickle.loads(data)
def handle_ping(self, _):
workerlog_write("Handling ping\n")

View File

@@ -149,7 +149,7 @@ class RunQueueScheduler(object):
Return the id of the first task we find that is buildable
"""
self.buildable = [x for x in self.buildable if x not in self.rq.runq_running]
buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)]
buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered) and x not in self.rq.holdoff_tasks]
if not buildable:
return None
@@ -206,6 +206,9 @@ class RunQueueScheduler(object):
def newbuildable(self, task):
    # Mark a task as buildable by adding it to the scheduler's candidate list.
    self.buildable.append(task)
def removebuildable(self, task):
    # Withdraw a task from the scheduler's buildable list (e.g. when a
    # revalidated setscene task is now expected to cover it).
    self.buildable.remove(task)
def describe_task(self, taskid):
result = 'ID %s' % taskid
if self.rev_prio_map:
@@ -1719,6 +1722,8 @@ class RunQueueExecute:
self.sq_running = set()
self.sq_live = set()
self.changed_setscene = set()
self.runq_buildable = set()
self.runq_running = set()
self.runq_complete = set()
@@ -1730,6 +1735,7 @@ class RunQueueExecute:
self.stampcache = {}
self.holdoff_tasks = set()
self.sqdone = False
self.stats = RunQueueStats(len(self.rqdata.runtaskentries))
@@ -1925,6 +1931,7 @@ class RunQueueExecute:
"""
self.rq.read_workers()
self.process_possible_migrations()
task = None
if not self.sqdone and self.can_start_task():
@@ -2007,7 +2014,7 @@ class RunQueueExecute:
if self.can_start_task():
return True
if not self.sq_live and not self.sqdone and not self.sq_deferred:
if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks:
logger.info("Setscene tasks completed")
logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
@@ -2167,6 +2174,131 @@ class RunQueueExecute:
#bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
return taskdepdata
def updated_taskhash(self, tid, unihash):
    # Called (via runQueuePipe) when a taskUniHashUpdate event reports that
    # a task's unihash changed. Propagates new taskhashes/unihashes through
    # all reverse dependencies, pushes the updated hash store to the
    # workers, and re-checks setscene validity for any changed setscene
    # tasks so they can be (re)run.
    changed = set()
    if unihash != self.rqdata.runtaskentries[tid].unihash:
        logger.info("Task %s unihash changed to %s" % (tid, unihash))
        self.rqdata.runtaskentries[tid].unihash = unihash
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        bb.parse.siggen.set_unihash(taskfn + "." + taskname, unihash)

        # Work out all tasks which depend on this one
        total = set()
        next = set(self.rqdata.runtaskentries[tid].revdeps)
        while next:
            current = next.copy()
            total = total |next
            next = set()
            for ntid in current:
                next |= self.rqdata.runtaskentries[ntid].revdeps
                next.difference_update(total)

        # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
        done = set()
        next = set(self.rqdata.runtaskentries[tid].revdeps)
        while next:
            current = next.copy()
            next = set()
            # NOTE(review): this loop rebinds 'tid', shadowing the function
            # argument for the rest of the function body.
            for tid in current:
                # Skip tasks that still have unprocessed dependencies in
                # 'total'; they are revisited later via those dependencies'
                # revdeps, guaranteeing dependency-order processing.
                if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
                    continue
                procdep = []
                for dep in self.rqdata.runtaskentries[tid].depends:
                    procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep))
                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
                orighash = self.rqdata.runtaskentries[tid].hash
                self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.rqdata.dataCaches[mc])
                origuni = self.rqdata.runtaskentries[tid].unihash
                self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname)
                logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash))
                next |= self.rqdata.runtaskentries[tid].revdeps
                changed.add(tid)
                total.remove(tid)
                next.intersection_update(total)

    if changed:
        # Send the full updated taskhash store to every worker (and
        # fakeworker) so task processes forked from now on use the new
        # hashes (handled by handle_newtaskhashes on the worker side).
        for mc in self.rq.worker:
            self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
        for mc in self.rq.fakeworker:
            self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")

    logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed)))

    for tid in changed:
        # Only setscene tasks can become newly runnable from a hash change
        if tid not in self.rqdata.runq_setscene_tids:
            continue
        # Re-test setscene availability against the new hash
        valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False)
        if not valid:
            continue
        self.changed_setscene.add(tid)

    if changed:
        self.update_holdofftasks()
def update_holdofftasks(self):
    # Recompute the set of tasks to withhold from the buildable list on the
    # assumption that some setscene task will run and cover them.
    # Start from the setscene tasks whose hashes just changed...
    self.holdoff_tasks = set(self.changed_setscene)

    # ...add every setscene task whose covered/notcovered status is still
    # undecided...
    for tid in self.rqdata.runq_setscene_tids:
        if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
            self.holdoff_tasks.add(tid)

    # ...then hold off all incomplete tasks each of those would cover.
    for tid in self.holdoff_tasks.copy():
        for dep in self.sqdata.sq_covered_tasks[tid]:
            if dep not in self.runq_complete:
                self.holdoff_tasks.add(dep)

    logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks))
def process_possible_migrations(self):
    # For each setscene task whose hash change made it newly valid
    # (self.changed_setscene), try to requeue it on the scenequeue. A
    # setscene task can only be rerun once none of the tasks it covers are
    # executing; until then it stays in changed_setscene for a later pass.
    changes = False
    for tid in self.changed_setscene.copy():
        if tid in self.runq_running:
            # Too late: the real task already started, drop the migration.
            self.changed_setscene.remove(tid)
            continue

        valid = True
        # Check no tasks this covers are running
        for dep in self.sqdata.sq_covered_tasks[tid]:
            if dep in self.runq_running and dep not in self.runq_complete:
                logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid))
                valid = False
                break
        if not valid:
            continue

        # Reset scenequeue bookkeeping for the covered tasks so they are
        # treated as pending again.
        for dep in self.sqdata.sq_covered_tasks[tid]:
            if dep not in self.runq_complete:
                if dep in self.tasks_scenequeue_done:
                    self.tasks_scenequeue_done.remove(dep)
                if dep in self.tasks_notcovered:
                    self.tasks_notcovered.remove(dep)

        if tid in self.sq_buildable:
            self.sq_buildable.remove(tid)
        if tid in self.sq_running:
            self.sq_running.remove(tid)
        # Requeue as buildable once all its scenequeue dependencies are
        # resolved (covered or notcovered).
        if self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
            if tid not in self.sq_buildable:
                self.sq_buildable.add(tid)

        if tid in self.sqdata.outrightfail:
            self.sqdata.outrightfail.remove(tid)
        if tid in self.scenequeue_notcovered:
            self.scenequeue_notcovered.remove(tid)

        # Refresh the setscene stamp path and clear any stale build stamp
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True)

        if tid in self.build_stamps:
            del self.build_stamps[tid]

        logger.info("Setscene task %s now valid and being rerun" % tid)
        self.sqdone = False
        self.changed_setscene.remove(tid)
        changes = True

    if changes:
        self.update_holdofftasks()
def scenequeue_process_notcovered(self, task):
if len(self.rqdata.runtaskentries[task].depends) == 0:
self.setbuildable(task)
@@ -2194,7 +2326,7 @@ class RunQueueExecute:
for deptask in self.rqdata.runtaskentries[t].revdeps:
if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids:
continue
if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done):
if deptask in self.sqdata.unskippable:
new.add(deptask)
self.tasks_scenequeue_done.add(deptask)
self.tasks_notcovered.add(deptask)
@@ -2254,8 +2386,9 @@ class RunQueueExecute:
self.tasks_covered.update(covered)
self.coveredtopocess.remove(task)
for tid in covered:
if len(self.rqdata.runtaskentries[tid].depends) == 0:
if self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
self.setbuildable(tid)
self.update_holdofftasks()
def sq_task_completeoutright(self, task):
"""
@@ -2454,8 +2587,8 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
rqdata.init_progress_reporter.next_stage()
# Build a list of setscene tasks which are "unskippable"
# These are direct endpoints referenced by the build
# Build a list of tasks which are "unskippable"
# These are direct endpoints referenced by the build upto and including setscene tasks
# Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
new = True
for tid in rqdata.runtaskentries:
@@ -2463,18 +2596,19 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
sqdata.unskippable.add(tid)
while new:
new = False
for tid in sqdata.unskippable.copy():
orig = sqdata.unskippable.copy()
for tid in orig:
if tid in rqdata.runq_setscene_tids:
continue
sqdata.unskippable.remove(tid)
if len(rqdata.runtaskentries[tid].depends) == 0:
# These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
sqrq.tasks_notcovered.add(tid)
sqrq.tasks_scenequeue_done.add(tid)
sqrq.setbuildable(tid)
sqrq.scenequeue_process_unskippable(tid)
sqrq.scenequeue_process_unskippable(tid)
sqdata.unskippable |= rqdata.runtaskentries[tid].depends
new = True
if sqdata.unskippable != orig:
new = True
rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))
@@ -2710,6 +2844,15 @@ class runQueueTaskSkipped(runQueueEvent):
runQueueEvent.__init__(self, task, stats, rq)
self.reason = reason
class taskUniHashUpdate(bb.event.Event):
    """
    Event signalling that a task's unihash has changed. Fired from a task
    and picked up by runQueuePipe, which forwards it to
    RunQueueExecute.updated_taskhash() to recompute dependent task hashes.
    """
    def __init__(self, task, unihash):
        # The tid of the task whose hash changed
        self.taskid = task
        # The new unified hash for that task
        self.unihash = unihash
        bb.event.Event.__init__(self)
class runQueuePipe():
"""
Abstraction for a pipe between a worker thread and the server
@@ -2752,6 +2895,8 @@ class runQueuePipe():
except ValueError as e:
bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
bb.event.fire_from_worker(event, self.d)
if isinstance(event, taskUniHashUpdate):
self.rqexec.updated_taskhash(event.taskid, event.unihash)
found = True
self.queue = self.queue[index+8:]
index = self.queue.find(b"</event>")