bitbake: asyncrpc: Add Client Pool object

Adds an abstract base class that can be used to implement a pool of
client connections. The class implements a thread that runs an async
event loop, and allows derived classes to schedule work on the loop and
wait for the work to be finished.

(Bitbake rev: f113456417f9ac0a4b44b291a6e22ea8219c3a5f)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Joshua Watt
2024-02-18 15:59:49 -07:00
committed by Richard Purdie
parent 3bd2c69e70
commit 2406bd1055
2 changed files with 78 additions and 1 deletions

View File

@@ -5,7 +5,7 @@
#
from .client import AsyncClient, Client
from .client import AsyncClient, Client, ClientPool
from .serv import AsyncServer, AsyncServerConnection
from .connection import DEFAULT_MAX_CHUNK
from .exceptions import (

View File

@@ -10,6 +10,8 @@ import json
import os
import socket
import sys
import contextlib
from threading import Thread
from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK
from .exceptions import ConnectionClosedError, InvokeError
@@ -180,3 +182,78 @@ class Client(object):
def __exit__(self, exc_type, exc_value, traceback):
self.close()
return False
class ClientPool(object):
def __init__(self, max_clients):
self.avail_clients = []
self.num_clients = 0
self.max_clients = max_clients
self.loop = None
self.client_condition = None
@abc.abstractmethod
async def _new_client(self):
raise NotImplementedError("Must be implemented in derived class")
def close(self):
if self.client_condition:
self.client_condition = None
if self.loop:
self.loop.run_until_complete(self.__close_clients())
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
self.loop.close()
self.loop = None
def run_tasks(self, tasks):
if not self.loop:
self.loop = asyncio.new_event_loop()
thread = Thread(target=self.__thread_main, args=(tasks,))
thread.start()
thread.join()
@contextlib.asynccontextmanager
async def get_client(self):
async with self.client_condition:
if self.avail_clients:
client = self.avail_clients.pop()
elif self.num_clients < self.max_clients:
self.num_clients += 1
client = await self._new_client()
else:
while not self.avail_clients:
await self.client_condition.wait()
client = self.avail_clients.pop()
try:
yield client
finally:
async with self.client_condition:
self.avail_clients.append(client)
self.client_condition.notify()
def __thread_main(self, tasks):
async def process_task(task):
async with self.get_client() as client:
await task(client)
asyncio.set_event_loop(self.loop)
if not self.client_condition:
self.client_condition = asyncio.Condition()
tasks = [process_task(t) for t in tasks]
self.loop.run_until_complete(asyncio.gather(*tasks))
async def __close_clients(self):
for c in self.avail_clients:
await c.close()
self.avail_clients = []
self.num_clients = 0
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
return False