oeqa/core/threaded: Add OETestResultThreaded{,Internal} classes

The OETestResultThreadedInternal extends OETestResult to stores
results by Thread.

The OETestResultThreaded is a simple class that provides the
implementation of interfaces needed by outside like wasSuccesful,
stop, logSummary, logDetails.

[YOCTO #11450]

(From OE-Core rev: 8e71844fc4dd3fcc8a19f9d4c25aafb09c5525fe)

Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Aníbal Limón
2017-05-26 15:37:36 -05:00
committed by Richard Purdie
parent aeb8c9341b
commit 44285351f5

View File

@@ -7,7 +7,7 @@ import multiprocessing
from unittest.suite import TestSuite
from oeqa.core.loader import OETestLoader
from oeqa.core.runner import OEStreamLogger
from oeqa.core.runner import OEStreamLogger, OETestResult
class OETestLoaderThreaded(OETestLoader):
def __init__(self, tc, module_paths, modules, tests, modules_required,
@@ -114,3 +114,74 @@ class OEStreamLoggerThreaded(OEStreamLogger):
for line in self.buffers[tid].split('\n'):
self.logger.info(line)
self._lock.release()
class OETestResultThreadedInternal(OETestResult):
def _tc_map_results(self):
tid = threading.get_ident()
# PyUnit generates a result for every test module run, test
# if the thread already has an entry to avoid lose the previous
# test module results.
if not tid in self.tc._results:
self.tc._results[tid] = {}
self.tc._results[tid]['failures'] = self.failures
self.tc._results[tid]['errors'] = self.errors
self.tc._results[tid]['skipped'] = self.skipped
self.tc._results[tid]['expectedFailures'] = self.expectedFailures
class OETestResultThreaded(object):
_results = {}
_lock = threading.Lock()
def __init__(self, tc):
self.tc = tc
def _fill_tc_results(self):
tids = list(self.tc._results.keys())
fields = ['failures', 'errors', 'skipped', 'expectedFailures']
for tid in tids:
result = self.tc._results[tid]
for field in fields:
if not field in self.tc._results:
self.tc._results[field] = []
self.tc._results[field].extend(result[field])
def addResult(self, result, run_start_time, run_end_time):
tid = threading.get_ident()
self._lock.acquire()
self._results[tid] = {}
self._results[tid]['result'] = result
self._results[tid]['run_start_time'] = run_start_time
self._results[tid]['run_end_time'] = run_end_time
self._results[tid]['result'] = result
self._lock.release()
def wasSuccessful(self):
wasSuccessful = True
for tid in self._results.keys():
wasSuccessful = wasSuccessful and \
self._results[tid]['result'].wasSuccessful()
return wasSuccessful
def stop(self):
for tid in self._results.keys():
self._results[tid]['result'].stop()
def logSummary(self, component, context_msg=''):
elapsed_time = (self.tc._run_end_time - self.tc._run_start_time)
self.tc.logger.info("SUMMARY:")
self.tc.logger.info("%s (%s) - Ran %d tests in %.3fs" % (component,
context_msg, len(self.tc._registry['cases']), elapsed_time))
if self.wasSuccessful():
msg = "%s - OK - All required tests passed" % component
else:
msg = "%s - FAIL - Required tests failed" % component
self.tc.logger.info(msg)
def logDetails(self):
tid = list(self._results)[0]
result = self._results[tid]['result']
result.logDetails()