mirror of
https://git.yoctoproject.org/poky
synced 2026-02-09 18:23:02 +01:00
asyncio in older Python 3.x (seen with 3.7) can seemingly hang if new_event_loop is called repeatedly in the same process. The reuse of processes in the Bitbake thread pool during parsing seems to be able to trigger this with the PR server export selftest. It appears that calling set_event_loop with the new loop avoids the issue, so that is now done in the asyncrpc Client initializer (with an explanatory comment). This should be revisited when the day arrives that Python 3.9 becomes the minimum required for BitBake. Additionally, it was discovered that using get_event_loop in the asyncrpc server initialization can trigger hangs in the hashserv unittests when the second test is run. To avoid this, switch to calling new_event_loop + set_event_loop in the initialization code there as well. (Bitbake rev: bb9a36563505652294b20b4c88199b24fa208342) Signed-off-by: Scott Murray <scott.murray@konsulko.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
287 lines
9.2 KiB
Python
287 lines
9.2 KiB
Python
#
|
|
# SPDX-License-Identifier: GPL-2.0-only
|
|
#
|
|
|
|
import abc
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import signal
|
|
import socket
|
|
import sys
|
|
import multiprocessing
|
|
from . import chunkify, DEFAULT_MAX_CHUNK
|
|
|
|
|
|
class ClientError(Exception):
|
|
pass
|
|
|
|
|
|
class ServerError(Exception):
|
|
pass
|
|
|
|
|
|
class AsyncServerConnection(object):
|
|
def __init__(self, reader, writer, proto_name, logger):
|
|
self.reader = reader
|
|
self.writer = writer
|
|
self.proto_name = proto_name
|
|
self.max_chunk = DEFAULT_MAX_CHUNK
|
|
self.handlers = {
|
|
'chunk-stream': self.handle_chunk,
|
|
'ping': self.handle_ping,
|
|
}
|
|
self.logger = logger
|
|
|
|
async def process_requests(self):
|
|
try:
|
|
self.addr = self.writer.get_extra_info('peername')
|
|
self.logger.debug('Client %r connected' % (self.addr,))
|
|
|
|
# Read protocol and version
|
|
client_protocol = await self.reader.readline()
|
|
if client_protocol is None:
|
|
return
|
|
|
|
(client_proto_name, client_proto_version) = client_protocol.decode('utf-8').rstrip().split()
|
|
if client_proto_name != self.proto_name:
|
|
self.logger.debug('Rejecting invalid protocol %s' % (self.proto_name))
|
|
return
|
|
|
|
self.proto_version = tuple(int(v) for v in client_proto_version.split('.'))
|
|
if not self.validate_proto_version():
|
|
self.logger.debug('Rejecting invalid protocol version %s' % (client_proto_version))
|
|
return
|
|
|
|
# Read headers. Currently, no headers are implemented, so look for
|
|
# an empty line to signal the end of the headers
|
|
while True:
|
|
line = await self.reader.readline()
|
|
if line is None:
|
|
return
|
|
|
|
line = line.decode('utf-8').rstrip()
|
|
if not line:
|
|
break
|
|
|
|
# Handle messages
|
|
while True:
|
|
d = await self.read_message()
|
|
if d is None:
|
|
break
|
|
await self.dispatch_message(d)
|
|
await self.writer.drain()
|
|
except ClientError as e:
|
|
self.logger.error(str(e))
|
|
finally:
|
|
self.writer.close()
|
|
|
|
async def dispatch_message(self, msg):
|
|
for k in self.handlers.keys():
|
|
if k in msg:
|
|
self.logger.debug('Handling %s' % k)
|
|
await self.handlers[k](msg[k])
|
|
return
|
|
|
|
raise ClientError("Unrecognized command %r" % msg)
|
|
|
|
def write_message(self, msg):
|
|
for c in chunkify(json.dumps(msg), self.max_chunk):
|
|
self.writer.write(c.encode('utf-8'))
|
|
|
|
async def read_message(self):
|
|
l = await self.reader.readline()
|
|
if not l:
|
|
return None
|
|
|
|
try:
|
|
message = l.decode('utf-8')
|
|
|
|
if not message.endswith('\n'):
|
|
return None
|
|
|
|
return json.loads(message)
|
|
except (json.JSONDecodeError, UnicodeDecodeError) as e:
|
|
self.logger.error('Bad message from client: %r' % message)
|
|
raise e
|
|
|
|
async def handle_chunk(self, request):
|
|
lines = []
|
|
try:
|
|
while True:
|
|
l = await self.reader.readline()
|
|
l = l.rstrip(b"\n").decode("utf-8")
|
|
if not l:
|
|
break
|
|
lines.append(l)
|
|
|
|
msg = json.loads(''.join(lines))
|
|
except (json.JSONDecodeError, UnicodeDecodeError) as e:
|
|
self.logger.error('Bad message from client: %r' % lines)
|
|
raise e
|
|
|
|
if 'chunk-stream' in msg:
|
|
raise ClientError("Nested chunks are not allowed")
|
|
|
|
await self.dispatch_message(msg)
|
|
|
|
async def handle_ping(self, request):
|
|
response = {'alive': True}
|
|
self.write_message(response)
|
|
|
|
|
|
class AsyncServer(object):
|
|
def __init__(self, logger):
|
|
self._cleanup_socket = None
|
|
self.logger = logger
|
|
self.start = None
|
|
self.address = None
|
|
self.loop = None
|
|
|
|
def start_tcp_server(self, host, port):
|
|
def start_tcp():
|
|
self.server = self.loop.run_until_complete(
|
|
asyncio.start_server(self.handle_client, host, port)
|
|
)
|
|
|
|
for s in self.server.sockets:
|
|
self.logger.debug('Listening on %r' % (s.getsockname(),))
|
|
# Newer python does this automatically. Do it manually here for
|
|
# maximum compatibility
|
|
s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
|
|
s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
|
|
|
|
name = self.server.sockets[0].getsockname()
|
|
if self.server.sockets[0].family == socket.AF_INET6:
|
|
self.address = "[%s]:%d" % (name[0], name[1])
|
|
else:
|
|
self.address = "%s:%d" % (name[0], name[1])
|
|
|
|
self.start = start_tcp
|
|
|
|
def start_unix_server(self, path):
|
|
def cleanup():
|
|
os.unlink(path)
|
|
|
|
def start_unix():
|
|
cwd = os.getcwd()
|
|
try:
|
|
# Work around path length limits in AF_UNIX
|
|
os.chdir(os.path.dirname(path))
|
|
self.server = self.loop.run_until_complete(
|
|
asyncio.start_unix_server(self.handle_client, os.path.basename(path))
|
|
)
|
|
finally:
|
|
os.chdir(cwd)
|
|
|
|
self.logger.debug('Listening on %r' % path)
|
|
|
|
self._cleanup_socket = cleanup
|
|
self.address = "unix://%s" % os.path.abspath(path)
|
|
|
|
self.start = start_unix
|
|
|
|
@abc.abstractmethod
|
|
def accept_client(self, reader, writer):
|
|
pass
|
|
|
|
async def handle_client(self, reader, writer):
|
|
# writer.transport.set_write_buffer_limits(0)
|
|
try:
|
|
client = self.accept_client(reader, writer)
|
|
await client.process_requests()
|
|
except Exception as e:
|
|
import traceback
|
|
self.logger.error('Error from client: %s' % str(e), exc_info=True)
|
|
traceback.print_exc()
|
|
writer.close()
|
|
self.logger.debug('Client disconnected')
|
|
|
|
def run_loop_forever(self):
|
|
try:
|
|
self.loop.run_forever()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
def signal_handler(self):
|
|
self.logger.debug("Got exit signal")
|
|
self.loop.stop()
|
|
|
|
def _serve_forever(self):
|
|
try:
|
|
self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
|
|
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM])
|
|
|
|
self.run_loop_forever()
|
|
self.server.close()
|
|
|
|
self.loop.run_until_complete(self.server.wait_closed())
|
|
self.logger.debug('Server shutting down')
|
|
finally:
|
|
if self._cleanup_socket is not None:
|
|
self._cleanup_socket()
|
|
|
|
def serve_forever(self):
|
|
"""
|
|
Serve requests in the current process
|
|
"""
|
|
# Create loop and override any loop that may have existed in
|
|
# a parent process. It is possible that the usecases of
|
|
# serve_forever might be constrained enough to allow using
|
|
# get_event_loop here, but better safe than sorry for now.
|
|
self.loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(self.loop)
|
|
self.start()
|
|
self._serve_forever()
|
|
|
|
def serve_as_process(self, *, prefunc=None, args=()):
|
|
"""
|
|
Serve requests in a child process
|
|
"""
|
|
def run(queue):
|
|
# Create loop and override any loop that may have existed
|
|
# in a parent process. Without doing this and instead
|
|
# using get_event_loop, at the very minimum the hashserv
|
|
# unit tests will hang when running the second test.
|
|
# This happens since get_event_loop in the spawned server
|
|
# process for the second testcase ends up with the loop
|
|
# from the hashserv client created in the unit test process
|
|
# when running the first testcase. The problem is somewhat
|
|
# more general, though, as any potential use of asyncio in
|
|
# Cooker could create a loop that needs to replaced in this
|
|
# new process.
|
|
self.loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(self.loop)
|
|
try:
|
|
self.start()
|
|
finally:
|
|
queue.put(self.address)
|
|
queue.close()
|
|
|
|
if prefunc is not None:
|
|
prefunc(self, *args)
|
|
|
|
self._serve_forever()
|
|
|
|
if sys.version_info >= (3, 6):
|
|
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
|
|
self.loop.close()
|
|
|
|
queue = multiprocessing.Queue()
|
|
|
|
# Temporarily block SIGTERM. The server process will inherit this
|
|
# block which will ensure it doesn't receive the SIGTERM until the
|
|
# handler is ready for it
|
|
mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM])
|
|
try:
|
|
self.process = multiprocessing.Process(target=run, args=(queue,))
|
|
self.process.start()
|
|
|
|
self.address = queue.get()
|
|
queue.close()
|
|
queue.join_thread()
|
|
|
|
return self.process
|
|
finally:
|
|
signal.pthread_sigmask(signal.SIG_SETMASK, mask)
|