diff --git a/meta/lib/oeqa/utils/qemurunner.py b/meta/lib/oeqa/utils/qemurunner.py index cdd0db5877..4a2246733f 100644 --- a/meta/lib/oeqa/utils/qemurunner.py +++ b/meta/lib/oeqa/utils/qemurunner.py @@ -21,7 +21,9 @@ import threading import codecs import tempfile from collections import defaultdict +from contextlib import contextmanager import importlib +import traceback # Get Unicode non printable control chars control_range = list(range(0,32))+list(range(127,160)) @@ -517,8 +519,12 @@ class QemuRunner: except Exception as e: self.logger.warning('Extra log data exception %s' % repr(e)) data = None + self.thread.serial_lock.release() return False + with self.thread.serial_lock: + self.thread.set_serialsock(self.server_socket) + # If we are not able to login the tests can continue try: (status, output) = self.run_serial(self.boot_patterns['send_login_user'], raw=True, timeout=120) @@ -653,31 +659,32 @@ class QemuRunner: data = '' status = 0 - self.server_socket.sendall(command.encode('utf-8')) - start = time.time() - end = start + timeout - while True: - now = time.time() - if now >= end: - data += "<<< run_serial(): command timed out after %d seconds without output >>>\r\n\r\n" % timeout - break - try: - sread, _, _ = select.select([self.server_socket],[],[], end - now) - except InterruptedError: - continue - if sread: - # try to avoid reading single character at a time - time.sleep(0.1) - answer = self.server_socket.recv(1024) - if answer: - data += answer.decode('utf-8') - # Search the prompt to stop - if re.search(self.boot_patterns['search_cmd_finished'], data): - break - else: - if self.canexit: - return (1, "") - raise Exception("No data on serial console socket, connection closed?") + with self.thread.serial_lock: + self.server_socket.sendall(command.encode('utf-8')) + start = time.time() + end = start + timeout + while True: + now = time.time() + if now >= end: + data += "<<< run_serial(): command timed out after %d seconds without output >>>\r\n\r\n" % timeout + break + try: + sread, _, _ = select.select([self.server_socket],[],[], end - now) + except InterruptedError: + continue + if sread: + # try to avoid reading single character at a time + time.sleep(0.1) + answer = self.server_socket.recv(1024) + if answer: + data += answer.decode('utf-8') + # Search the prompt to stop + if re.search(self.boot_patterns['search_cmd_finished'], data): + break + else: + if self.canexit: + return (1, "") + raise Exception("No data on serial console socket, connection closed?") if data: if raw: @@ -696,6 +703,15 @@ class QemuRunner: status = 1 return (status, str(data)) +@contextmanager +def nonblocking_lock(lock): + locked = lock.acquire(False) + try: + yield locked + finally: + if locked: + lock.release() + # This class is for reading data from a socket and passing it to logfunc # to be processed. It's completely event driven and has a straightforward # event loop. The mechanism for stopping the thread is a simple pipe which @@ -703,8 +719,10 @@ class QemuRunner: class LoggingThread(threading.Thread): def __init__(self, logfunc, sock, logger, qemuoutput): self.connection_established = threading.Event() + self.serial_lock = threading.Lock() self.serversock = sock + self.serialsock = None self.qemuoutput = qemuoutput self.logfunc = logfunc self.logger = logger @@ -717,9 +735,14 @@ class LoggingThread(threading.Thread): threading.Thread.__init__(self, target=self.threadtarget) + def set_serialsock(self, serialsock): + self.serialsock = serialsock + def threadtarget(self): try: self.eventloop() + except Exception as e: + self.logger.warning("Exception %s in logging thread" % traceback.format_exception(e)) finally: self.teardown() @@ -753,6 +776,7 @@ class LoggingThread(threading.Thread): event_read_mask = self.errorevents | self.readevents if self.serversock: poll.register(self.serversock.fileno()) + serial_registered = False poll.register(self.qemuoutput.fileno()) poll.register(self.readpipe, event_read_mask) @@ -760,7 +784,7 @@ class LoggingThread(threading.Thread): self.running = True self.logger.debug("Starting thread event loop") while not breakout: - events = poll.poll() + events = poll.poll(2) for event in events: # An error occurred, bail out if event[1] & self.errorevents: @@ -785,18 +809,34 @@ class LoggingThread(threading.Thread): # Actual data to be logged elif self.readsock.fileno() == event[0]: - data = self.recv(1024) + data = self.recv(1024, self.readsock) self.logfunc(data) elif self.qemuoutput.fileno() == event[0]: data = self.qemuoutput.read() self.logger.debug("Data received on qemu stdout %s" % data) self.logfunc(data, ".stdout") + elif self.serialsock and self.serialsock.fileno() == event[0]: + if self.serial_lock.acquire(blocking=False): + data = self.recv(1024, self.serialsock) + self.logger.debug("Data received serial thread %s" % data.decode('utf-8', 'replace')) + self.logfunc(data, ".2") + self.serial_lock.release() + else: + serial_registered = False + poll.unregister(self.serialsock.fileno()) + + if not serial_registered and self.serialsock: + with nonblocking_lock(self.serial_lock) as l: + if l: + serial_registered = True + poll.register(self.serialsock.fileno(), event_read_mask) + # Since the socket is non-blocking make sure to honor EAGAIN # and EWOULDBLOCK. - def recv(self, count): + def recv(self, count, sock): try: - data = self.readsock.recv(count) + data = sock.recv(count) except socket.error as e: if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK: return b''