oeqa/core/threaded: Remove in favour of using concurrenttests

We have several options for parallel processing in oeqa, parallel
execution of modules, threading and mulitple processes for the runners.

After much experimentation is appears the most scalable and least
invasive approach is multiple processes using concurrenttestsuite
from testtools. This means we can drop the current threading code
which is only used by the sdk test execution.

oeqa/decorator/depends: Remove threading code

Revert "oeqa/sdk: Enable usage of OEQA thread mode"
This reverts commit adc434c063.

Revert "oeqa/core/tests: Add tests of OEQA Threaded mode"
This reverts commit a4eef558c9.

Revert "oeqa/core/decorator/oetimeout: Add support for OEQA threaded mode"
This reverts commit d3d4ba902d.

(From OE-Core rev: a98ab5e560e73b6988512fbae5cefe9e42ceed53)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Richard Purdie
2018-07-12 11:10:38 +00:00
parent ebd97e728a
commit 4e4958cba2
12 changed files with 14 additions and 411 deletions

View File

@@ -24,8 +24,6 @@ def testsdk_main(d):
from oeqa.sdk.context import OESDKTestContext, OESDKTestContextExecutor
from oeqa.utils import make_logger_bitbake_compatible
bb.event.enable_threadlock()
pn = d.getVar("PN")
logger = make_logger_bitbake_compatible(logging.getLogger("BitBake"))
@@ -99,8 +97,6 @@ def testsdkext_main(d):
from oeqa.utils import avoid_paths_in_environ, make_logger_bitbake_compatible, subprocesstweak
from oeqa.sdkext.context import OESDKExtTestContext, OESDKExtTestContextExecutor
bb.event.enable_threadlock()
pn = d.getVar("PN")
logger = make_logger_bitbake_compatible(logging.getLogger("BitBake"))

View File

@@ -3,7 +3,6 @@
from unittest import SkipTest
from oeqa.core.threaded import OETestRunnerThreaded
from oeqa.core.exception import OEQADependency
from . import OETestDiscover, registerDecorator
@@ -64,11 +63,7 @@ def _order_test_case_by_depends(cases, depends):
return [cases[case_id] for case_id in cases_ordered]
def _skipTestDependency(case, depends):
if isinstance(case.tc.runner, OETestRunnerThreaded):
import threading
results = case.tc._results[threading.get_ident()]
else:
results = case.tc._results
results = case.tc._results
skipReasons = ['errors', 'failures', 'skipped']

View File

@@ -1,12 +1,8 @@
# Copyright (C) 2016 Intel Corporation
# Released under the MIT license (see COPYING.MIT)
from . import OETestDecorator, registerDecorator
import signal
from threading import Timer
from oeqa.core.threaded import OETestRunnerThreaded
from . import OETestDecorator, registerDecorator
from oeqa.core.exception import OEQATimeoutError
@registerDecorator
@@ -14,32 +10,16 @@ class OETimeout(OETestDecorator):
attrs = ('oetimeout',)
def setUpDecorator(self):
self.logger.debug("Setting up a %d second(s) timeout" % self.oetimeout)
if isinstance(self.case.tc.runner, OETestRunnerThreaded):
self.timeouted = False
def _timeoutHandler():
self.timeouted = True
self.timer = Timer(self.oetimeout, _timeoutHandler)
self.timer.start()
else:
timeout = self.oetimeout
def _timeoutHandler(signum, frame):
raise OEQATimeoutError("Timed out after %s "
timeout = self.oetimeout
def _timeoutHandler(signum, frame):
raise OEQATimeoutError("Timed out after %s "
"seconds of execution" % timeout)
self.alarmSignal = signal.signal(signal.SIGALRM, _timeoutHandler)
signal.alarm(self.oetimeout)
self.logger.debug("Setting up a %d second(s) timeout" % self.oetimeout)
self.alarmSignal = signal.signal(signal.SIGALRM, _timeoutHandler)
signal.alarm(self.oetimeout)
def tearDownDecorator(self):
if isinstance(self.case.tc.runner, OETestRunnerThreaded):
self.timer.cancel()
self.logger.debug("Removed Timer handler")
if self.timeouted:
raise OEQATimeoutError("Timed out after %s "
"seconds of execution" % self.oetimeout)
else:
signal.alarm(0)
signal.signal(signal.SIGALRM, self.alarmSignal)
self.logger.debug("Removed SIGALRM handler")
signal.alarm(0)
signal.signal(signal.SIGALRM, self.alarmSignal)
self.logger.debug("Removed SIGALRM handler")

View File

@@ -1,12 +0,0 @@
# Copyright (C) 2017 Intel Corporation
# Released under the MIT license (see COPYING.MIT)
from oeqa.core.case import OETestCase
class ThreadedTest(OETestCase):
def test_threaded_no_depends(self):
self.assertTrue(True, msg='How is this possible?')
class ThreadedTest2(OETestCase):
def test_threaded_same_module(self):
self.assertTrue(True, msg='How is this possible?')

View File

@@ -1,8 +0,0 @@
# Copyright (C) 2017 Intel Corporation
# Released under the MIT license (see COPYING.MIT)
from oeqa.core.case import OETestCase
class ThreadedTestAlone(OETestCase):
def test_threaded_alone(self):
self.assertTrue(True, msg='How is this possible?')

View File

@@ -1,10 +0,0 @@
# Copyright (C) 2017 Intel Corporation
# Released under the MIT license (see COPYING.MIT)
from oeqa.core.case import OETestCase
from oeqa.core.decorator.depends import OETestDepends
class ThreadedTest3(OETestCase):
@OETestDepends(['threaded.ThreadedTest.test_threaded_no_depends'])
def test_threaded_depends(self):
self.assertTrue(True, msg='How is this possible?')

View File

@@ -1,12 +0,0 @@
# Copyright (C) 2017 Intel Corporation
# Released under the MIT license (see COPYING.MIT)
from oeqa.core.case import OETestCase
class ThreadedTestModule(OETestCase):
def test_threaded_module(self):
self.assertTrue(True, msg='How is this possible?')
class ThreadedTestModule2(OETestCase):
def test_threaded_module2(self):
self.assertTrue(True, msg='How is this possible?')

View File

@@ -33,13 +33,3 @@ class TestBase(unittest.TestCase):
tc.loadTests(self.cases_path, modules=modules, tests=tests,
filters=filters)
return tc
def _testLoaderThreaded(self, d={}, modules=[],
tests=[], filters={}):
from oeqa.core.threaded import OETestContextThreaded
tc = OETestContextThreaded(d, self.logger)
tc.loadTests(self.cases_path, modules=modules, tests=tests,
filters=filters)
return tc

View File

@@ -131,17 +131,5 @@ class TestTimeoutDecorator(TestBase):
msg = "OETestTimeout didn't restore SIGALRM"
self.assertIs(alarm_signal, signal.getsignal(signal.SIGALRM), msg=msg)
def test_timeout_thread(self):
tests = ['timeout.TimeoutTest.testTimeoutPass']
msg = 'Failed to run test using OETestTimeout'
tc = self._testLoaderThreaded(modules=self.modules, tests=tests)
self.assertTrue(tc.runTests().wasSuccessful(), msg=msg)
def test_timeout_threaded_fail(self):
tests = ['timeout.TimeoutTest.testTimeoutFail']
msg = "OETestTimeout test didn't timeout as expected"
tc = self._testLoaderThreaded(modules=self.modules, tests=tests)
self.assertFalse(tc.runTests().wasSuccessful(), msg=msg)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# Copyright (C) 2016-2017 Intel Corporation
# Copyright (C) 2016 Intel Corporation
# Released under the MIT license (see COPYING.MIT)
import os
@@ -82,33 +82,5 @@ class TestLoader(TestBase):
msg = 'Expected modules from two different paths'
self.assertEqual(modules, expected_modules, msg=msg)
def test_loader_threaded(self):
cases_path = self.cases_path
self.cases_path = [os.path.join(self.cases_path, 'loader', 'threaded')]
tc = self._testLoaderThreaded()
self.assertEqual(len(tc.suites), 3, "Expected to be 3 suites")
case_ids = ['threaded.ThreadedTest.test_threaded_no_depends',
'threaded.ThreadedTest2.test_threaded_same_module',
'threaded_depends.ThreadedTest3.test_threaded_depends']
for case in tc.suites[0]._tests:
self.assertEqual(case.id(),
case_ids[tc.suites[0]._tests.index(case)])
case_ids = ['threaded_alone.ThreadedTestAlone.test_threaded_alone']
for case in tc.suites[1]._tests:
self.assertEqual(case.id(),
case_ids[tc.suites[1]._tests.index(case)])
case_ids = ['threaded_module.ThreadedTestModule.test_threaded_module',
'threaded_module.ThreadedTestModule2.test_threaded_module2']
for case in tc.suites[2]._tests:
self.assertEqual(case.id(),
case_ids[tc.suites[2]._tests.index(case)])
self.cases_path = cases_path
if __name__ == '__main__':
unittest.main()

View File

@@ -1,275 +0,0 @@
# Copyright (C) 2017 Intel Corporation
# Released under the MIT license (see COPYING.MIT)
import threading
import multiprocessing
import queue
import time
from unittest.suite import TestSuite
from oeqa.core.loader import OETestLoader
from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner
from oeqa.core.context import OETestContext
class OETestLoaderThreaded(OETestLoader):
def __init__(self, tc, module_paths, modules, tests, modules_required,
filters, process_num=0, *args, **kwargs):
super(OETestLoaderThreaded, self).__init__(tc, module_paths, modules,
tests, modules_required, filters, *args, **kwargs)
self.process_num = process_num
def discover(self):
suite = super(OETestLoaderThreaded, self).discover()
if self.process_num <= 0:
self.process_num = min(multiprocessing.cpu_count(),
len(suite._tests))
suites = []
for _ in range(self.process_num):
suites.append(self.suiteClass())
def _search_for_module_idx(suites, case):
"""
Cases in the same module needs to be run
in the same thread because PyUnit keeps track
of setUp{Module, Class,} and tearDown{Module, Class,}.
"""
for idx in range(self.process_num):
suite = suites[idx]
for c in suite._tests:
if case.__module__ == c.__module__:
return idx
return -1
def _search_for_depend_idx(suites, depends):
"""
Dependency cases needs to be run in the same
thread, because OEQA framework look at the state
of dependant test to figure out if skip or not.
"""
for idx in range(self.process_num):
suite = suites[idx]
for case in suite._tests:
if case.id() in depends:
return idx
return -1
def _get_best_idx(suites):
sizes = [len(suite._tests) for suite in suites]
return sizes.index(min(sizes))
def _fill_suites(suite):
idx = -1
for case in suite:
if isinstance(case, TestSuite):
_fill_suites(case)
else:
idx = _search_for_module_idx(suites, case)
depends = {}
if 'depends' in self.tc._registry:
depends = self.tc._registry['depends']
if idx == -1 and case.id() in depends:
case_depends = depends[case.id()]
idx = _search_for_depend_idx(suites, case_depends)
if idx == -1:
idx = _get_best_idx(suites)
suites[idx].addTest(case)
_fill_suites(suite)
suites_tmp = suites
suites = []
for suite in suites_tmp:
if len(suite._tests) > 0:
suites.append(suite)
return suites
class OEStreamLoggerThreaded(OEStreamLogger):
_lock = threading.Lock()
buffers = {}
def write(self, msg):
tid = threading.get_ident()
if not tid in self.buffers:
self.buffers[tid] = ""
if msg:
self.buffers[tid] += msg
def finish(self):
tid = threading.get_ident()
self._lock.acquire()
self.logger.info('THREAD: %d' % tid)
self.logger.info('-' * 70)
for line in self.buffers[tid].split('\n'):
self.logger.info(line)
self._lock.release()
class OETestResultThreadedInternal(OETestResult):
def _tc_map_results(self):
tid = threading.get_ident()
# PyUnit generates a result for every test module run, test
# if the thread already has an entry to avoid lose the previous
# test module results.
if not tid in self.tc._results:
self.tc._results[tid] = {}
self.tc._results[tid]['failures'] = self.failures
self.tc._results[tid]['errors'] = self.errors
self.tc._results[tid]['skipped'] = self.skipped
self.tc._results[tid]['expectedFailures'] = self.expectedFailures
class OETestResultThreaded(object):
_results = {}
_lock = threading.Lock()
def __init__(self, tc):
self.tc = tc
def _fill_tc_results(self):
tids = list(self.tc._results.keys())
fields = ['failures', 'errors', 'skipped', 'expectedFailures']
for tid in tids:
result = self.tc._results[tid]
for field in fields:
if not field in self.tc._results:
self.tc._results[field] = []
self.tc._results[field].extend(result[field])
def addResult(self, result, run_start_time, run_end_time):
tid = threading.get_ident()
self._lock.acquire()
self._results[tid] = {}
self._results[tid]['result'] = result
self._results[tid]['run_start_time'] = run_start_time
self._results[tid]['run_end_time'] = run_end_time
self._results[tid]['result'] = result
self._lock.release()
def wasSuccessful(self):
wasSuccessful = True
for tid in self._results.keys():
wasSuccessful = wasSuccessful and \
self._results[tid]['result'].wasSuccessful()
return wasSuccessful
def stop(self):
for tid in self._results.keys():
self._results[tid]['result'].stop()
def logSummary(self, component, context_msg=''):
elapsed_time = (self.tc._run_end_time - self.tc._run_start_time)
self.tc.logger.info("SUMMARY:")
self.tc.logger.info("%s (%s) - Ran %d tests in %.3fs" % (component,
context_msg, len(self.tc._registry['cases']), elapsed_time))
if self.wasSuccessful():
msg = "%s - OK - All required tests passed" % component
else:
msg = "%s - FAIL - Required tests failed" % component
self.tc.logger.info(msg)
def logDetails(self):
if list(self._results):
tid = list(self._results)[0]
result = self._results[tid]['result']
result.logDetails()
class _Worker(threading.Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks, result, stream):
threading.Thread.__init__(self)
self.tasks = tasks
self.result = result
self.stream = stream
def run(self):
while True:
try:
func, args, kargs = self.tasks.get(block=False)
except queue.Empty:
break
try:
run_start_time = time.time()
rc = func(*args, **kargs)
run_end_time = time.time()
self.result.addResult(rc, run_start_time, run_end_time)
self.stream.finish()
except Exception as e:
print(e)
finally:
self.tasks.task_done()
class _ThreadedPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_workers, num_tasks, stream=None, result=None):
self.tasks = queue.Queue(num_tasks)
self.workers = []
for _ in range(num_workers):
worker = _Worker(self.tasks, result, stream)
self.workers.append(worker)
def start(self):
for worker in self.workers:
worker.start()
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
for worker in self.workers:
worker.join()
class OETestRunnerThreaded(OETestRunner):
streamLoggerClass = OEStreamLoggerThreaded
def __init__(self, tc, *args, **kwargs):
super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
def run(self, suites):
result = OETestResultThreaded(self.tc)
pool = _ThreadedPool(len(suites), len(suites), stream=self.stream,
result=result)
for s in suites:
pool.add_task(super(OETestRunnerThreaded, self).run, s)
pool.start()
pool.wait_completion()
result._fill_tc_results()
return result
class OETestContextThreaded(OETestContext):
loaderClass = OETestLoaderThreaded
runnerClass = OETestRunnerThreaded
def loadTests(self, module_paths, modules=[], tests=[],
modules_manifest="", modules_required=[], filters={}, process_num=0):
if modules_manifest:
modules = self._read_modules_from_manifest(modules_manifest)
self.loader = self.loaderClass(self, module_paths, modules, tests,
modules_required, filters, process_num)
self.suites = self.loader.discover()

View File

@@ -6,10 +6,9 @@ import sys
import glob
import re
from oeqa.core.context import OETestContextExecutor
from oeqa.core.threaded import OETestContextThreaded
from oeqa.core.context import OETestContext, OETestContextExecutor
class OESDKTestContext(OETestContextThreaded):
class OESDKTestContext(OETestContext):
sdk_files_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "files")
def __init__(self, td=None, logger=None, sdk_dir=None, sdk_env=None,