mirror of
https://git.yoctoproject.org/poky
synced 2026-04-17 00:32:13 +02:00
bitbake: runqueue: Fix event timing race
The event from the task notifiing of hash equivalency should only be processed when the task completes. This can otherwise result in a race where a dependent task may run before the original task completes causing various failures. To make this work reliably, the code had to be restructured quite a bit. (Bitbake rev: 1bf5be46f92f125193638cf41ff207d68f592259) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
@@ -1696,7 +1696,8 @@ class RunQueueExecute:
|
||||
self.sq_running = set()
|
||||
self.sq_live = set()
|
||||
|
||||
self.changed_setscene = set()
|
||||
self.updated_taskhash_queue = []
|
||||
self.pending_migrations = set()
|
||||
|
||||
self.runq_buildable = set()
|
||||
self.runq_running = set()
|
||||
@@ -1910,8 +1911,8 @@ class RunQueueExecute:
|
||||
if self.sq_deferred:
|
||||
logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred))
|
||||
err = True
|
||||
if self.changed_setscene:
|
||||
logger.error("Scenequeue had unprocessed changed entries: %s" % pprint.pformat(self.changed_setscene))
|
||||
if self.updated_taskhash_queue:
|
||||
logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue))
|
||||
err = True
|
||||
if self.holdoff_tasks:
|
||||
logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks))
|
||||
@@ -2023,7 +2024,7 @@ class RunQueueExecute:
|
||||
if self.can_start_task():
|
||||
return True
|
||||
|
||||
if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks:
|
||||
if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks:
|
||||
logger.info("Setscene tasks completed")
|
||||
|
||||
err = self.summarise_scenequeue_errors()
|
||||
@@ -2177,65 +2178,6 @@ class RunQueueExecute:
|
||||
#bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
|
||||
return taskdepdata
|
||||
|
||||
def updated_taskhash(self, tid, unihash):
|
||||
changed = set()
|
||||
if unihash != self.rqdata.runtaskentries[tid].unihash:
|
||||
logger.info("Task %s unihash changed to %s" % (tid, unihash))
|
||||
self.rqdata.runtaskentries[tid].unihash = unihash
|
||||
bb.parse.siggen.set_unihash(tid, unihash)
|
||||
|
||||
# Work out all tasks which depend on this one
|
||||
total = set()
|
||||
next = set(self.rqdata.runtaskentries[tid].revdeps)
|
||||
while next:
|
||||
current = next.copy()
|
||||
total = total |next
|
||||
next = set()
|
||||
for ntid in current:
|
||||
next |= self.rqdata.runtaskentries[ntid].revdeps
|
||||
next.difference_update(total)
|
||||
|
||||
# Now iterate those tasks in dependency order to regenerate their taskhash/unihash
|
||||
done = set()
|
||||
next = set(self.rqdata.runtaskentries[tid].revdeps)
|
||||
while next:
|
||||
current = next.copy()
|
||||
next = set()
|
||||
for tid in current:
|
||||
if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
|
||||
continue
|
||||
procdep = []
|
||||
for dep in self.rqdata.runtaskentries[tid].depends:
|
||||
procdep.append(dep)
|
||||
orighash = self.rqdata.runtaskentries[tid].hash
|
||||
self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, procdep, self.rqdata.dataCaches[mc_from_tid(tid)])
|
||||
origuni = self.rqdata.runtaskentries[tid].unihash
|
||||
self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
|
||||
logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash))
|
||||
next |= self.rqdata.runtaskentries[tid].revdeps
|
||||
changed.add(tid)
|
||||
total.remove(tid)
|
||||
next.intersection_update(total)
|
||||
|
||||
if changed:
|
||||
for mc in self.rq.worker:
|
||||
self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
|
||||
for mc in self.rq.fakeworker:
|
||||
self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
|
||||
|
||||
logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed)))
|
||||
|
||||
for tid in changed:
|
||||
if tid not in self.rqdata.runq_setscene_tids:
|
||||
continue
|
||||
valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False)
|
||||
if not valid:
|
||||
continue
|
||||
self.changed_setscene.add(tid)
|
||||
|
||||
if changed:
|
||||
self.update_holdofftasks()
|
||||
|
||||
def update_holdofftasks(self):
|
||||
self.holdoff_tasks = set()
|
||||
|
||||
@@ -2249,13 +2191,74 @@ class RunQueueExecute:
|
||||
self.holdoff_tasks.add(dep)
|
||||
logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks))
|
||||
|
||||
|
||||
def process_possible_migrations(self):
|
||||
changes = False
|
||||
for tid in self.changed_setscene.copy():
|
||||
if tid in self.runq_running:
|
||||
self.changed_setscene.remove(tid)
|
||||
|
||||
changed = set()
|
||||
for tid, unihash in self.updated_taskhash_queue.copy():
|
||||
if tid in self.runq_running and tid not in self.runq_complete:
|
||||
continue
|
||||
|
||||
self.updated_taskhash_queue.remove((tid, unihash))
|
||||
|
||||
if unihash != self.rqdata.runtaskentries[tid].unihash:
|
||||
logger.info("Task %s unihash changed to %s" % (tid, unihash))
|
||||
self.rqdata.runtaskentries[tid].unihash = unihash
|
||||
bb.parse.siggen.set_unihash(tid, unihash)
|
||||
|
||||
# Work out all tasks which depend on this one
|
||||
total = set()
|
||||
next = set(self.rqdata.runtaskentries[tid].revdeps)
|
||||
while next:
|
||||
current = next.copy()
|
||||
total = total |next
|
||||
next = set()
|
||||
for ntid in current:
|
||||
next |= self.rqdata.runtaskentries[ntid].revdeps
|
||||
next.difference_update(total)
|
||||
|
||||
# Now iterate those tasks in dependency order to regenerate their taskhash/unihash
|
||||
done = set()
|
||||
next = set(self.rqdata.runtaskentries[tid].revdeps)
|
||||
while next:
|
||||
current = next.copy()
|
||||
next = set()
|
||||
for tid in current:
|
||||
if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
|
||||
continue
|
||||
procdep = []
|
||||
for dep in self.rqdata.runtaskentries[tid].depends:
|
||||
procdep.append(dep)
|
||||
orighash = self.rqdata.runtaskentries[tid].hash
|
||||
self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, procdep, self.rqdata.dataCaches[mc_from_tid(tid)])
|
||||
origuni = self.rqdata.runtaskentries[tid].unihash
|
||||
self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
|
||||
logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash))
|
||||
next |= self.rqdata.runtaskentries[tid].revdeps
|
||||
changed.add(tid)
|
||||
total.remove(tid)
|
||||
next.intersection_update(total)
|
||||
|
||||
if changed:
|
||||
for mc in self.rq.worker:
|
||||
self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
|
||||
for mc in self.rq.fakeworker:
|
||||
self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
|
||||
|
||||
logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed)))
|
||||
|
||||
for tid in changed:
|
||||
if tid not in self.rqdata.runq_setscene_tids:
|
||||
continue
|
||||
valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False)
|
||||
if not valid:
|
||||
continue
|
||||
if tid in self.runq_running:
|
||||
continue
|
||||
if tid not in self.pending_migrations:
|
||||
self.pending_migrations.add(tid)
|
||||
|
||||
for tid in self.pending_migrations.copy():
|
||||
valid = True
|
||||
# Check no tasks this covers are running
|
||||
for dep in self.sqdata.sq_covered_tasks[tid]:
|
||||
@@ -2266,6 +2269,8 @@ class RunQueueExecute:
|
||||
if not valid:
|
||||
continue
|
||||
|
||||
self.pending_migrations.remove(tid)
|
||||
|
||||
if tid in self.tasks_scenequeue_done:
|
||||
self.tasks_scenequeue_done.remove(tid)
|
||||
for dep in self.sqdata.sq_covered_tasks[tid]:
|
||||
@@ -2296,10 +2301,8 @@ class RunQueueExecute:
|
||||
|
||||
logger.info("Setscene task %s now valid and being rerun" % tid)
|
||||
self.sqdone = False
|
||||
self.changed_setscene.remove(tid)
|
||||
changes = True
|
||||
|
||||
if changes:
|
||||
if changed:
|
||||
self.update_holdofftasks()
|
||||
|
||||
def scenequeue_updatecounters(self, task, fail=False):
|
||||
@@ -2854,7 +2857,7 @@ class runQueuePipe():
|
||||
bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
|
||||
bb.event.fire_from_worker(event, self.d)
|
||||
if isinstance(event, taskUniHashUpdate):
|
||||
self.rqexec.updated_taskhash(event.taskid, event.unihash)
|
||||
self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash))
|
||||
found = True
|
||||
self.queue = self.queue[index+8:]
|
||||
index = self.queue.find(b"</event>")
|
||||
|
||||
@@ -384,8 +384,8 @@ class RunQueueTests(unittest.TestCase):
|
||||
with open(tempdir + "/stamps/b1.do_install.taint", "w") as f:
|
||||
f.write("ed36d46a-2977-458a-b3de-eef885bc1817")
|
||||
cmd = ["bitbake", "e1", "-DD"]
|
||||
sstatevalid = "e1:do_package:b710f6312ffed900b4b2761cc05538645f4ff3e7e0b70d688c70c0f3bcc2e1a2"
|
||||
tasks = self.run_bitbakecmd(cmd, tempdir, sstatevalid, extraenv=extraenv, cleanup=True, slowtasks="e1:fetch")
|
||||
sstatevalid = "e1:do_package:f9aa46d63cb63d70a09712b6bc7fab57e4966cf8e8b52ff5ad1ba23823aec7d4 e1:do_package:b710f6312ffed900b4b2761cc05538645f4ff3e7e0b70d688c70c0f3bcc2e1a2"
|
||||
tasks = self.run_bitbakecmd(cmd, tempdir, sstatevalid, extraenv=extraenv, cleanup=True, slowtasks="e1:fetch b1:install")
|
||||
expected = ['a1:package', 'a1:install', 'b1:package', 'b1:install', 'a1:populate_sysroot', 'b1:populate_sysroot',
|
||||
'a1:package_write_ipk_setscene', 'b1:packagedata_setscene', 'b1:package_write_rpm_setscene',
|
||||
'a1:package_write_rpm_setscene', 'b1:package_write_ipk_setscene', 'a1:packagedata_setscene',
|
||||
|
||||
Reference in New Issue
Block a user