Files
poky/bitbake/lib/hashserv/__init__.py
Joshua Watt 6ebf01bfd4 bitbake: hashserv: Chunkify large messages
The hash equivalence client and server can occasionally send messages
that are too large for the server to fit in the receive buffer (64 KB).
To prevent this, support is added to the protocol to "chunkify" the
stream and break it up into manageable pieces that the server can each
side can back together.

Ideally, this would be negotiated by the client and server, but it's
currently hard coded to 32 KB to prevent the round-trip delay.

(Bitbake rev: 1a7bddb5471a02a744e7a441a3b4a6da693348b0)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
(cherry picked from commit e27a28c1e40e886ee68ba4b99b537ffc9c3577d4)
Signed-off-by: Steve Sakoman <steve@sakoman.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2020-07-02 16:11:40 +01:00

116 lines
3.1 KiB
Python

# Copyright (C) 2018-2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
from contextlib import closing
import re
import sqlite3
import itertools
import json
UNIX_PREFIX = "unix://"
ADDR_TYPE_UNIX = 0
ADDR_TYPE_TCP = 1
# The Python async server defaults to a 64K receive buffer, so we hardcode our
# maximum chunk size. It would be better if the client and server reported to
# each other what the maximum chunk sizes were, but that will slow down the
# connection setup with a round trip delay so I'd rather not do that unless it
# is necessary
DEFAULT_MAX_CHUNK = 32 * 1024
def setup_database(database, sync=True):
db = sqlite3.connect(database)
db.row_factory = sqlite3.Row
with closing(db.cursor()) as cursor:
cursor.execute('''
CREATE TABLE IF NOT EXISTS tasks_v2 (
id INTEGER PRIMARY KEY AUTOINCREMENT,
method TEXT NOT NULL,
outhash TEXT NOT NULL,
taskhash TEXT NOT NULL,
unihash TEXT NOT NULL,
created DATETIME,
-- Optional fields
owner TEXT,
PN TEXT,
PV TEXT,
PR TEXT,
task TEXT,
outhash_siginfo TEXT,
UNIQUE(method, outhash, taskhash)
)
''')
cursor.execute('PRAGMA journal_mode = WAL')
cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))
# Drop old indexes
cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
cursor.execute('DROP INDEX IF EXISTS outhash_lookup')
# Create new indexes
cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)')
cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)')
return db
def parse_address(addr):
if addr.startswith(UNIX_PREFIX):
return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
else:
m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
if m is not None:
host = m.group('host')
port = m.group('port')
else:
host, port = addr.split(':')
return (ADDR_TYPE_TCP, (host, int(port)))
def chunkify(msg, max_chunk):
if len(msg) < max_chunk - 1:
yield ''.join((msg, "\n"))
else:
yield ''.join((json.dumps({
'chunk-stream': None
}), "\n"))
args = [iter(msg)] * (max_chunk - 1)
for m in map(''.join, itertools.zip_longest(*args, fillvalue='')):
yield ''.join(itertools.chain(m, "\n"))
yield "\n"
def create_server(addr, dbname, *, sync=True):
from . import server
db = setup_database(dbname, sync=sync)
s = server.Server(db)
(typ, a) = parse_address(addr)
if typ == ADDR_TYPE_UNIX:
s.start_unix_server(*a)
else:
s.start_tcp_server(*a)
return s
def create_client(addr):
from . import client
c = client.Client()
(typ, a) = parse_address(addr)
if typ == ADDR_TYPE_UNIX:
c.connect_unix(*a)
else:
c.connect_tcp(*a)
return c