sstate: Use the python3 ThreadPoolExecutor instead of the OE ThreadedPool

For the FetchConnectionCache use a queue where each thread can
get an unsed connection_cache that is properly initialized before
we fireup the ThreadPoolExecutor.

(From OE-Core rev: eb6a6820928472ef194b963b606454e731f9486f)

Signed-off-by: Jose Quaresma <quaresma.jose@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Jose Quaresma
2022-04-16 23:28:45 +01:00
committed by Richard Purdie
parent a2d8217488
commit 6421d3330c

View File

@@ -977,15 +977,19 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
localdata.delVar('BB_NO_NETWORK')
from bb.fetch2 import FetchConnectionCache
def checkstatus_init(thread_worker):
thread_worker.connection_cache = FetchConnectionCache()
def checkstatus_init():
while not connection_cache_pool.full():
connection_cache_pool.put(FetchConnectionCache())
def checkstatus_end(thread_worker):
thread_worker.connection_cache.close_connections()
def checkstatus_end():
while not connection_cache_pool.empty():
connection_cache = connection_cache_pool.get()
connection_cache.close_connections()
def checkstatus(thread_worker, arg):
def checkstatus(arg):
(tid, sstatefile) = arg
connection_cache = connection_cache_pool.get()
localdata2 = bb.data.createCopy(localdata)
srcuri = "file://" + sstatefile
localdata2.setVar('SRC_URI', srcuri)
@@ -995,7 +999,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
try:
fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
connection_cache=thread_worker.connection_cache)
connection_cache=connection_cache)
fetcher.checkstatus()
bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
found.add(tid)
@@ -1005,6 +1009,8 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
except Exception as e:
bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc()))
connection_cache_pool.put(connection_cache)
if progress:
bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d)
@@ -1025,13 +1031,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
fetcherenv = bb.fetch2.get_fetcher_environment(d)
with bb.utils.environment(**fetcherenv):
bb.event.enable_threadlock()
pool = oe.utils.ThreadedPool(nproc, len(tasklist),
worker_init=checkstatus_init, worker_end=checkstatus_end,
name="sstate_checkhashes-")
for t in tasklist:
pool.add_task(checkstatus, t)
pool.start()
pool.wait_completion()
import concurrent.futures
from queue import Queue
connection_cache_pool = Queue(nproc)
checkstatus_init()
with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
executor.map(checkstatus, tasklist.copy())
checkstatus_end()
bb.event.disable_threadlock()
if progress: