bitbake: runqueue: Merge the queues and execute setscene and normal tasks in parallel

This is the serious functionality change in this runqueue patch series of
changes.

Rather than two phases of execution, the scenequeue setscene phase, followed
by normal task exeuction, this change allows them to execute in parallel
together.

To do this we need to handle marking of tasks as covered/uncovered in a piecemeal
fashion on a task by task basis rather than in a single function.

The code will block normal task exeuction until any setcene task which could
cover that task is executed and its status is known. There is a slight
optimisation which could be possible here at the risk of races but that
doesn't seem worthwhile.

The state engine isn't entirely cleaned up in this commit (see FIXME) and
the setscenewhitelist functionality is broken by it (see following patches)
however its good enough to test with normal workflows.

(Bitbake rev: 58b3f0847cc2d47e76f74d59dcbbf78fe41b118b)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Richard Purdie
2019-07-04 00:14:02 +01:00
parent 491c6049e0
commit cf829a5f66

View File

@@ -142,7 +142,7 @@ class RunQueueScheduler(object):
Return the id of the first task we find that is buildable
"""
self.buildable = [x for x in self.buildable if x not in self.rq.runq_running]
buildable = self.buildable
buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)]
if not buildable:
return None
@@ -1454,25 +1454,18 @@ class RunQueue:
# If we don't have any setscene functions, skip execution
if len(self.rqdata.runq_setscene_tids) == 0:
self.rqdata.init_progress_reporter.finish()
self.state = runQueueRunInit
else:
logger.info('Executing SetScene Tasks')
self.state = runQueueSceneRun
if self.state is runQueueSceneRun:
retval = self.rqexe.sq_execute()
if self.state is runQueueRunInit:
if self.cooker.configuration.setsceneonly:
self.state = runQueueComplete
if self.state is runQueueRunInit:
logger.info("Executing RunQueue Tasks")
start_runqueue_tasks(self.rqexe)
logger.info('No setscene tasks')
for tid in self.rqdata.runtaskentries:
if len(self.rqdata.runtaskentries[tid].depends) == 0:
self.rqexe.setbuildable(tid)
self.rqexe.tasks_notcovered.add(tid)
self.rqexe.sqdone = True
logger.info('Executing Tasks')
self.state = runQueueRunning
if self.state is runQueueRunning:
retval = self.rqexe.sq_execute()
# FIXME revtal
retval = self.rqexe.execute()
if self.state is runQueueCleanUp:
@@ -1757,6 +1750,8 @@ class RunQueueExecute:
self.stampcache = {}
self.sqdone = False
self.stats = RunQueueStats(len(self.rqdata.runtaskentries))
self.sq_stats = RunQueueStats(len(self.rqdata.runq_setscene_tids))
@@ -1772,12 +1767,12 @@ class RunQueueExecute:
self.scenequeue_covered = set()
# List of tasks which are covered (including setscene ones)
self.tasks_covered = set()
self.tasks_scenequeue_done = set()
self.scenequeue_notcovered = set()
self.tasks_notcovered = set()
self.scenequeue_notneeded = set()
if len(self.rqdata.runq_setscene_tids) > 0:
self.sqdata = SQData()
build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
self.coveredtopocess = set()
schedulers = self.get_schedulers()
for scheduler in schedulers:
@@ -1789,6 +1784,10 @@ class RunQueueExecute:
bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
(self.scheduler, ", ".join(obj.name for obj in schedulers)))
if len(self.rqdata.runq_setscene_tids) > 0:
self.sqdata = SQData()
build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
def runqueue_process_waitpid(self, task, status):
# self.build_stamps[pid] may not exist when use shared work directory.
@@ -1951,6 +1950,9 @@ class RunQueueExecute:
if process_setscenewhitelist(self.rq, self.rqdata, self.stampcache, self.sched, self):
return True
if self.cooker.configuration.setsceneonly:
return True
self.rq.read_workers()
if self.stats.total == 0:
@@ -2014,7 +2016,7 @@ class RunQueueExecute:
if self.can_start_task():
return True
if self.stats.active > 0:
if self.stats.active > 0 or self.sq_stats.active > 0:
self.rq.read_workers()
return self.rq.active_fds()
@@ -2026,9 +2028,9 @@ class RunQueueExecute:
for task in self.rqdata.runtaskentries:
if task not in self.runq_buildable:
logger.error("Task %s never buildable!", task)
if task not in self.runq_running:
elif task not in self.runq_running:
logger.error("Task %s never ran!", task)
if task not in self.runq_complete:
elif task not in self.runq_complete:
logger.error("Task %s never completed!", task)
self.rq.state = runQueueComplete
@@ -2070,7 +2072,42 @@ class RunQueueExecute:
#bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
return taskdepdata
def scenequeue_updatecounters(self, task, fail = False):
def scenequeue_process_notcovered(self, task):
logger.debug(1, 'Not skipping setscene task %s', task)
if len(self.rqdata.runtaskentries[task].depends) == 0:
self.setbuildable(task)
notcovered = set([task])
while notcovered:
new = set()
for t in notcovered:
for deptask in self.rqdata.runtaskentries[t].depends:
if deptask in notcovered or deptask in new or deptask in self.rqdata.runq_setscene_tids or deptask in self.tasks_notcovered:
continue
logger.debug(1, 'Task %s depends on non-setscene task %s so not skipping' % (t, deptask))
new.add(deptask)
self.tasks_notcovered.add(deptask)
if len(self.rqdata.runtaskentries[deptask].depends) == 0:
self.setbuildable(deptask)
notcovered = new
def scenequeue_process_unskippable(self, task):
# Look up the dependency chain for non-setscene things which depend on this task
# and mark as 'done'/notcovered
ready = set([task])
while ready:
new = set()
for t in ready:
for deptask in self.rqdata.runtaskentries[t].revdeps:
if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids:
continue
if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done):
new.add(deptask)
self.tasks_scenequeue_done.add(deptask)
self.tasks_notcovered.add(deptask)
#logger.warning("Up: " + str(deptask))
ready = new
def scenequeue_updatecounters(self, task, fail=False):
for dep in self.sqdata.sq_deps[task]:
if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]:
logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
@@ -2083,6 +2120,43 @@ class RunQueueExecute:
if len(self.sqdata.sq_revdeps2[dep]) == 0:
self.sq_buildable.add(dep)
next = set([task])
while next:
new = set()
for t in next:
self.tasks_scenequeue_done.add(t)
# Look down the dependency chain for non-setscene things which this task depends on
# and mark as 'done'
for dep in self.rqdata.runtaskentries[t].depends:
if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done:
continue
if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done):
new.add(dep)
#logger.warning(" Down: " + dep)
next = new
if task in self.sqdata.unskippable:
self.scenequeue_process_unskippable(task)
if task in self.scenequeue_notcovered:
self.scenequeue_process_notcovered(task)
elif task in self.scenequeue_covered:
logger.debug(1, 'Queued setscene task %s', task)
self.coveredtopocess.add(task)
for task in self.coveredtopocess.copy():
if self.sqdata.sq_covered_tasks[task].issubset(self.tasks_scenequeue_done):
logger.debug(1, 'Processing setscene task %s', task)
covered = self.sqdata.sq_covered_tasks[task]
covered.add(task)
# Remove notcovered tasks
covered.difference_update(self.tasks_notcovered)
self.tasks_covered.update(covered)
self.coveredtopocess.remove(task)
for tid in covered:
if len(self.rqdata.runtaskentries[tid].depends) == 0:
self.setbuildable(tid)
def sq_task_completeoutright(self, task):
"""
Mark a task as completed
@@ -2113,6 +2187,7 @@ class RunQueueExecute:
self.sq_stats.taskFailed()
bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData)
self.scenequeue_notcovered.add(task)
self.tasks_notcovered.add(task)
self.scenequeue_updatecounters(task, True)
self.sq_check_taskfail(task)
@@ -2122,6 +2197,7 @@ class RunQueueExecute:
self.sq_stats.taskSkipped()
self.sq_stats.taskCompleted()
self.scenequeue_notcovered.add(task)
self.tasks_notcovered.add(task)
self.scenequeue_updatecounters(task, True)
def sq_task_skip(self, task):
@@ -2136,6 +2212,9 @@ class RunQueueExecute:
Run the tasks in a queue prepared by prepare_runqueue
"""
if self.sqdone:
return True
self.rq.read_workers()
task = None
@@ -2209,7 +2288,7 @@ class RunQueueExecute:
if self.can_start_task():
return True
if self.sq_stats.active > 0:
if self.stats.active > 0 or self.sq_stats.active > 0:
self.rq.read_workers()
return self.rq.active_fds()
@@ -2221,11 +2300,14 @@ class RunQueueExecute:
logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
self.rq.state = runQueueRunInit
completeevent = sceneQueueComplete(self.sq_stats, self.rq)
bb.event.fire(completeevent, self.cfgData)
if self.cooker.configuration.setsceneonly:
self.rq.state = runQueueComplete
self.sqdone = True
return True
def sq_build_taskdepdata(self, task):
@@ -2366,6 +2448,12 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
if tid in rqdata.runq_setscene_tids:
continue
sqdata.unskippable.remove(tid)
if len(rqdata.runtaskentries[tid].depends) == 0:
# These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
sqrq.tasks_notcovered.add(tid)
sqrq.tasks_scenequeue_done.add(tid)
sqrq.setbuildable(tid)
sqrq.scenequeue_process_unskippable(tid)
sqdata.unskippable |= rqdata.runtaskentries[tid].depends
new = True
@@ -2499,33 +2587,6 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
logger.debug(2, 'No package found, so skipping setscene task %s', tid)
sqdata.outrightfail.append(tid)
def start_runqueue_tasks(rqexec):
# Mark initial buildable tasks
for tid in rqexec.rqdata.runtaskentries:
if len(rqexec.rqdata.runtaskentries[tid].depends) == 0:
rqexec.setbuildable(tid)
if len(rqexec.rqdata.runtaskentries[tid].revdeps) > 0 and rqexec.rqdata.runtaskentries[tid].revdeps.issubset(rqexec.tasks_covered):
rqexec.tasks_covered.add(tid)
found = True
while found:
found = False
for tid in rqexec.rqdata.runtaskentries:
if tid in rqexec.tasks_covered:
continue
logger.debug(1, 'Considering %s: %s' % (tid, str(rqexec.rqdata.runtaskentries[tid].revdeps)))
if len(rqexec.rqdata.runtaskentries[tid].revdeps) > 0 and rqexec.rqdata.runtaskentries[tid].revdeps.issubset(rqexec.tasks_covered):
if tid in rqexec.scenequeue_notcovered:
continue
found = True
rqexec.tasks_covered.add(tid)
logger.debug(1, 'Skip list %s', sorted(rqexec.tasks_covered))
for task in self.rq.scenequeue_notcovered:
logger.debug(1, 'Not skipping task %s', task)
class TaskFailure(Exception):
"""
Exception raised when a task in a runqueue fails