bitbake: prserv: Handle requests in main thread

The prserver process is cleanly separated from the main bitbake process
so requests can be handled in the main thread. This removes the need for
a request queue and a separate request handling thread.

(Bitbake rev: 6b09415bed6b5e7c12aaf39b677d9ef72844e233)

Signed-off-by: Paul Barker <pbarker@konsulko.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Paul Barker
2021-04-29 15:11:12 +01:00
committed by Richard Purdie
parent d66a1d83f5
commit 81b55a050d

View File

@@ -5,8 +5,6 @@
import os,sys,logging
import signal, time
from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
import threading
import queue
import socket
import io
import sqlite3
@@ -14,7 +12,6 @@ import bb.server.xmlrpcclient
import prserv
import prserv.db
import errno
import select
import multiprocessing
logger = logging.getLogger("BitBake.PRserv")
@@ -48,54 +45,17 @@ class PRServer(SimpleXMLRPCServer):
self.dbfile=dbfile
self.logfile=logfile
self.working_thread=None
self.host, self.port = self.socket.getsockname()
self.pidfile=PIDPREFIX % (self.host, self.port)
self.register_function(self.getPR, "getPR")
self.register_function(self.quit, "quit")
self.register_function(self.ping, "ping")
self.register_function(self.export, "export")
self.register_function(self.importone, "importone")
self.register_introspection_functions()
self.quitpipein, self.quitpipeout = os.pipe()
self.requestqueue = queue.Queue()
self.handlerthread = threading.Thread(target = self.process_request_thread)
self.handlerthread.daemon = False
def process_request_thread(self):
"""Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
iter_count = 1
self.iter_count = 0
# 60 iterations between syncs or sync if dirty every ~30 seconds
iterations_between_sync = 60
bb.utils.set_process_name("PRServ Handler")
while not self.quitflag:
try:
(request, client_address) = self.requestqueue.get(True, 30)
except queue.Empty:
self.table.sync_if_dirty()
continue
if request is None:
continue
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
iter_count = (iter_count + 1) % iterations_between_sync
if iter_count == 0:
self.table.sync_if_dirty()
except:
self.handle_error(request, client_address)
self.shutdown_request(request)
self.table.sync()
self.table.sync_if_dirty()
self.iterations_between_sync = 60
def sigint_handler(self, signum, stack):
if self.table:
@@ -104,11 +64,30 @@ class PRServer(SimpleXMLRPCServer):
def sigterm_handler(self, signum, stack):
if self.table:
self.table.sync()
self.quit()
self.requestqueue.put((None, None))
raise(SystemExit)
def process_request(self, request, client_address):
self.requestqueue.put((request, client_address))
if request is None:
return
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
if self.iter_count == 0:
self.table.sync_if_dirty()
except:
self.handle_error(request, client_address)
self.shutdown_request(request)
self.table.sync()
self.table.sync_if_dirty()
def serve_forever(self, poll_interval=0.5):
signal.signal(signal.SIGINT, self.sigint_handler)
signal.signal(signal.SIGTERM, self.sigterm_handler)
self.db = prserv.db.PRData(self.dbfile)
self.table = self.db["PRMAIN"]
return super().serve_forever(poll_interval)
def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
try:
@@ -121,7 +100,7 @@ class PRServer(SimpleXMLRPCServer):
return self.table.importone(version, pkgarch, checksum, value)
def ping(self):
return not self.quitflag
return True
def getinfo(self):
return (self.host, self.port)
@@ -136,45 +115,6 @@ class PRServer(SimpleXMLRPCServer):
logger.error(str(exc))
return None
def quit(self):
self.quitflag=True
os.write(self.quitpipeout, b"q")
os.close(self.quitpipeout)
return
def work_forever(self,):
self.quitflag = False
# This timeout applies to the poll in TCPServer, we need the select
# below to wake on our quit pipe closing. We only ever call into handle_request
# if there is data there.
self.timeout = 0.01
signal.signal(signal.SIGINT, self.sigint_handler)
signal.signal(signal.SIGTERM, self.sigterm_handler)
bb.utils.set_process_name("PRServ")
# DB connection must be created after all forks
self.db = prserv.db.PRData(self.dbfile)
self.table = self.db["PRMAIN"]
logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
(self.dbfile, self.host, self.port, str(os.getpid())))
self.handlerthread.start()
while not self.quitflag:
ready = select.select([self.fileno(), self.quitpipein], [], [], 30)
if self.quitflag:
break
if self.fileno() in ready[0]:
self.handle_request()
self.handlerthread.join()
self.db.disconnect()
logger.info("PRServer: stopping...")
self.server_close()
os.close(self.quitpipein)
return
class PRServSingleton(object):
def __init__(self, dbfile, logfile, interface):
self.dbfile = dbfile
@@ -185,7 +125,7 @@ class PRServSingleton(object):
def start(self):
self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
self.process = multiprocessing.Process(target=self.prserv.work_forever)
self.process = multiprocessing.Process(target=self.prserv.serve_forever)
self.process.start()
self.host, self.port = self.prserv.getinfo()
@@ -201,13 +141,6 @@ class PRServerConnection(object):
self.port = port
self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
def terminate(self):
try:
logger.info("Terminating PRServer...")
self.connection.quit()
except Exception as exc:
sys.stderr.write("%s\n" % str(exc))
def getPR(self, version, pkgarch, checksum):
return self.connection.getPR(version, pkgarch, checksum)
@@ -308,7 +241,7 @@ def start_daemon(dbfile, host, port, logfile):
return 1
server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
run_as_daemon(server.work_forever, pidfile, os.path.abspath(logfile))
run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile))
# Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
# the one the server actually is listening, so at least warn the user about it
@@ -345,25 +278,13 @@ def stop_daemon(host, port):
return 1
try:
PRServerConnection(ip, port).terminate()
except:
logger.critical("Stop PRService %s:%d failed" % (host,port))
if is_running(pid):
print("Sending SIGTERM to pr-server.")
os.kill(pid, signal.SIGTERM)
time.sleep(0.1)
try:
if pid:
wait_timeout = 0
print("Waiting for pr-server to exit.")
while is_running(pid) and wait_timeout < 50:
time.sleep(0.1)
wait_timeout += 1
if is_running(pid):
print("Sending SIGTERM to pr-server.")
os.kill(pid,signal.SIGTERM)
time.sleep(0.1)
if os.path.exists(pidfile):
os.remove(pidfile)
if os.path.exists(pidfile):
os.remove(pidfile)
except OSError as e:
err = str(e)
@@ -439,17 +360,9 @@ def auto_start(d):
def auto_shutdown():
global singleton
if singleton:
host, port = singleton.getinfo()
try:
PRServerConnection(host, port).terminate()
except:
logger.critical("Stop PRService %s:%d failed" % (host,port))
try:
os.waitpid(singleton.prserv.pid, 0)
except ChildProcessError:
pass
if singleton and singleton.process:
singleton.process.terminate()
singleton.process.join()
singleton = None
def ping(host, port):