bitbake: prserv: Replace XML RPC with modern asyncrpc implementation

Update the prserv client and server classes to use the modern json and
asyncio based RPC system implemented by the asyncrpc module.

(Bitbake rev: 6a2b23e27bb61185b8afb382e20ce79f996d9183)

Signed-off-by: Paul Barker <pbarker@konsulko.com>
[updated for asyncrpc changes, client split to separate file]
Signed-off-by: Scott Murray <scott.murray@konsulko.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Paul Barker
2021-08-19 12:46:43 -04:00
committed by Richard Purdie
parent 4df610473f
commit fb3b05fe8d
2 changed files with 156 additions and 137 deletions

View File

@@ -0,0 +1,41 @@
#
# SPDX-License-Identifier: GPL-2.0-only
#
import logging
import bb.asyncrpc
logger = logging.getLogger("BitBake.PRserv")
class PRAsyncClient(bb.asyncrpc.AsyncClient):
def __init__(self):
super().__init__('PRSERVICE', '1.0', logger)
async def getPR(self, version, pkgarch, checksum):
response = await self.send_message(
{'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}}
)
if response:
return response['value']
async def importone(self, version, pkgarch, checksum, value):
response = await self.send_message(
{'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}}
)
if response:
return response['value']
async def export(self, version, pkgarch, checksum, colinfo):
response = await self.send_message(
{'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}}
)
if response:
return (response['metainfo'], response['datainfo'])
class PRClient(bb.asyncrpc.Client):
def __init__(self):
super().__init__()
self._add_methods('getPR', 'importone', 'export')
def _get_async_client(self):
return PRAsyncClient()

View File

@@ -4,157 +4,125 @@
import os,sys,logging
import signal, time
from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
import socket
import io
import sqlite3
import bb.server.xmlrpcclient
import prserv
import prserv.db
import errno
import multiprocessing
import bb.asyncrpc
logger = logging.getLogger("BitBake.PRserv")
class Handler(SimpleXMLRPCRequestHandler):
def _dispatch(self,method,params):
try:
value=self.server.funcs[method](*params)
except:
import traceback
traceback.print_exc()
raise
return value
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
singleton = None
class PRServerClient(bb.asyncrpc.AsyncServerConnection):
def __init__(self, reader, writer, table):
super().__init__(reader, writer, 'PRSERVICE', logger)
self.handlers.update({
'get-pr': self.handle_get_pr,
'import-one': self.handle_import_one,
'export': self.handle_export,
})
self.table = table
class PRServer(SimpleXMLRPCServer):
def __init__(self, dbfile, logfile, interface):
''' constructor '''
def validate_proto_version(self):
return (self.proto_version == (1, 0))
async def dispatch_message(self, msg):
try:
SimpleXMLRPCServer.__init__(self, interface,
logRequests=False, allow_none=True)
except socket.error:
ip=socket.gethostbyname(interface[0])
port=interface[1]
msg="PR Server unable to bind to %s:%s\n" % (ip, port)
sys.stderr.write(msg)
raise PRServiceConfigError
self.dbfile=dbfile
self.logfile=logfile
self.host, self.port = self.socket.getsockname()
self.register_function(self.getPR, "getPR")
self.register_function(self.ping, "ping")
self.register_function(self.export, "export")
self.register_function(self.importone, "importone")
self.register_introspection_functions()
self.iter_count = 0
# 60 iterations between syncs or sync if dirty every ~30 seconds
self.iterations_between_sync = 60
def sigint_handler(self, signum, stack):
if self.table:
self.table.sync()
def sigterm_handler(self, signum, stack):
if self.table:
self.table.sync()
raise(SystemExit)
def process_request(self, request, client_address):
if request is None:
return
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
if self.iter_count == 0:
self.table.sync_if_dirty()
await super().dispatch_message(msg)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)
self.table.sync()
raise
self.table.sync_if_dirty()
def serve_forever(self, poll_interval=0.5):
signal.signal(signal.SIGINT, self.sigint_handler)
signal.signal(signal.SIGTERM, self.sigterm_handler)
async def handle_get_pr(self, request):
version = request['version']
pkgarch = request['pkgarch']
checksum = request['checksum']
self.db = prserv.db.PRData(self.dbfile)
self.table = self.db["PRMAIN"]
return super().serve_forever(poll_interval)
def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
response = None
try:
return self.table.export(version, pkgarch, checksum, colinfo)
except sqlite3.Error as exc:
logger.error(str(exc))
return None
def importone(self, version, pkgarch, checksum, value):
return self.table.importone(version, pkgarch, checksum, value)
def ping(self):
return True
def getinfo(self):
return (self.host, self.port)
def getPR(self, version, pkgarch, checksum):
try:
return self.table.getValue(version, pkgarch, checksum)
value = self.table.getValue(version, pkgarch, checksum)
response = {'value': value}
except prserv.NotFoundError:
logger.error("can not find value for (%s, %s)",version, checksum)
return None
except sqlite3.Error as exc:
logger.error(str(exc))
return None
self.write_message(response)
async def handle_import_one(self, request):
version = request['version']
pkgarch = request['pkgarch']
checksum = request['checksum']
value = request['value']
value = self.table.importone(version, pkgarch, checksum, value)
if value is not None:
response = {'value': value}
else:
response = None
self.write_message(response)
async def handle_export(self, request):
version = request['version']
pkgarch = request['pkgarch']
checksum = request['checksum']
colinfo = request['colinfo']
try:
(metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo)
except sqlite3.Error as exc:
logger.error(str(exc))
metainfo = datainfo = None
response = {'metainfo': metainfo, 'datainfo': datainfo}
self.write_message(response)
class PRServer(bb.asyncrpc.AsyncServer):
def __init__(self, dbfile):
super().__init__(logger)
self.dbfile = dbfile
self.table = None
def accept_client(self, reader, writer):
return PRServerClient(reader, writer, self.table)
def _serve_forever(self):
self.db = prserv.db.PRData(self.dbfile)
self.table = self.db["PRMAIN"]
logger.debug("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
(self.dbfile, self.address, str(os.getpid())))
super()._serve_forever()
self.table.sync_if_dirty()
self.db.disconnect()
def signal_handler(self):
super().signal_handler()
if self.table:
self.table.sync()
class PRServSingleton(object):
def __init__(self, dbfile, logfile, interface):
def __init__(self, dbfile, logfile, host, port):
self.dbfile = dbfile
self.logfile = logfile
self.interface = interface
self.host = None
self.port = None
def start(self):
self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
self.process = multiprocessing.Process(target=self.prserv.serve_forever)
self.process.start()
self.host, self.port = self.prserv.getinfo()
def getinfo(self):
return (self.host, self.port)
class PRServerConnection(object):
def __init__(self, host, port):
if is_local_special(host, port):
host, port = singleton.getinfo()
self.host = host
self.port = port
self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
def getPR(self, version, pkgarch, checksum):
return self.connection.getPR(version, pkgarch, checksum)
def start(self):
self.prserv = PRServer(self.dbfile)
self.prserv.start_tcp_server(self.host, self.port)
self.process = self.prserv.serve_as_process()
def ping(self):
return self.connection.ping()
def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
return self.connection.export(version, pkgarch, checksum, colinfo)
def importone(self, version, pkgarch, checksum, value):
return self.connection.importone(version, pkgarch, checksum, value)
def getinfo(self):
return self.host, self.port
if not self.port:
self.port = int(self.prserv.address.rsplit(':', 1)[1])
def run_as_daemon(func, pidfile, logfile):
"""
@@ -240,15 +208,13 @@ def start_daemon(dbfile, host, port, logfile):
% pidfile)
return 1
server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile))
dbfile = os.path.abspath(dbfile)
def daemon_main():
server = PRServer(dbfile)
server.start_tcp_server(host, port)
server.serve_forever()
# Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
# the one the server actually is listening, so at least warn the user about it
_,rport = server.getinfo()
if port != rport:
sys.stdout.write("Server is listening at port %s instead of %s\n"
% (rport,port))
run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
return 0
def stop_daemon(host, port):
@@ -302,7 +268,7 @@ def is_running(pid):
return True
def is_local_special(host, port):
if host.strip().upper() == 'localhost'.upper() and (not port):
if host.strip().lower() == 'localhost' and not port:
return True
else:
return False
@@ -340,20 +306,19 @@ def auto_start(d):
auto_shutdown()
if not singleton:
bb.utils.mkdirhier(cachedir)
singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0))
singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), "localhost", 0)
singleton.start()
if singleton:
host, port = singleton.getinfo()
host = singleton.host
port = singleton.port
else:
host = host_params[0]
port = int(host_params[1])
try:
connection = PRServerConnection(host,port)
connection.ping()
realhost, realport = connection.getinfo()
return str(realhost) + ":" + str(realport)
ping(host, port)
return str(host) + ":" + str(port)
except Exception:
logger.critical("PRservice %s:%d not available" % (host, port))
raise PRServiceConfigError
@@ -366,8 +331,21 @@ def auto_shutdown():
singleton = None
def ping(host, port):
conn=PRServerConnection(host, port)
from . import client
conn = client.PRClient()
conn.connect_tcp(host, port)
return conn.ping()
def connect(host, port):
return PRServerConnection(host, port)
from . import client
global singleton
if host.strip().lower() == 'localhost' and not port:
host = 'localhost'
port = singleton.port
conn = client.PRClient()
conn.connect_tcp(host, port)
return conn