bitbake: hashserv: test: Add bitbake-hashclient tests

The bitbake-hashclient command-line tool now has a lot more features
which should be tested, so add some tests for them.

(Bitbake rev: 178cf99673d7ddf8e0bb63a5a43331a18f3286d5)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Joshua Watt
2023-11-03 08:26:35 -06:00
committed by Richard Purdie
parent c1574ae46f
commit 92a9d6d55d

View File

@@ -19,6 +19,14 @@ import unittest
import socket
import time
import signal
import subprocess
import json
import re
from pathlib import Path
THIS_DIR = Path(__file__).parent
BIN_DIR = THIS_DIR.parent.parent / "bin"
def server_prefunc(server, idx):
logging.basicConfig(level=logging.DEBUG, filename='bbhashserv-%d.log' % idx, filemode='w',
@@ -103,8 +111,22 @@ class HashEquivalenceTestSetup(object):
result = client.get_unihash(self.METHOD, taskhash)
self.assertEqual(result, unihash)
def assertUserPerms(self, user, permissions):
with self.auth_client(user) as client:
info = client.get_user()
self.assertEqual(info, {
"username": user["username"],
"permissions": permissions,
})
def assertUserCanAuth(self, user):
with self.start_client(self.auth_server.address) as client:
client.auth(user["username"], user["token"])
def assertUserCannotAuth(self, user):
with self.start_client(self.auth_server.address) as client, self.assertRaises(InvokeError):
client.auth(user["username"], user["token"])
class HashEquivalenceCommonTests(object):
def create_test_hash(self, client):
# Simple test that hashes can be created
taskhash = '35788efcb8dfb0a02659d81cf2bfd695fb30faf9'
@@ -117,6 +139,24 @@ class HashEquivalenceCommonTests(object):
self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
return taskhash, outhash, unihash
def run_hashclient(self, args, **kwargs):
try:
p = subprocess.run(
[BIN_DIR / "bitbake-hashclient"] + args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="utf-8",
**kwargs
)
except subprocess.CalledProcessError as e:
print(e.output)
raise e
print(p.stdout)
return p
class HashEquivalenceCommonTests(object):
def test_create_hash(self):
return self.create_test_hash(self.client)
@@ -161,7 +201,7 @@ class HashEquivalenceCommonTests(object):
self.assertClientGetHash(self.client, taskhash, unihash)
def test_remove_taskhash(self):
taskhash, outhash, unihash = self.test_create_hash()
taskhash, outhash, unihash = self.create_test_hash(self.client)
result = self.client.remove({"taskhash": taskhash})
self.assertGreater(result["count"], 0)
self.assertClientGetHash(self.client, taskhash, None)
@@ -170,13 +210,13 @@ class HashEquivalenceCommonTests(object):
self.assertIsNone(result_outhash)
def test_remove_unihash(self):
taskhash, outhash, unihash = self.test_create_hash()
taskhash, outhash, unihash = self.create_test_hash(self.client)
result = self.client.remove({"unihash": unihash})
self.assertGreater(result["count"], 0)
self.assertClientGetHash(self.client, taskhash, None)
def test_remove_outhash(self):
taskhash, outhash, unihash = self.test_create_hash()
taskhash, outhash, unihash = self.create_test_hash(self.client)
result = self.client.remove({"outhash": outhash})
self.assertGreater(result["count"], 0)
@@ -184,7 +224,7 @@ class HashEquivalenceCommonTests(object):
self.assertIsNone(result_outhash)
def test_remove_method(self):
taskhash, outhash, unihash = self.test_create_hash()
taskhash, outhash, unihash = self.create_test_hash(self.client)
result = self.client.remove({"method": self.METHOD})
self.assertGreater(result["count"], 0)
self.assertClientGetHash(self.client, taskhash, None)
@@ -193,7 +233,7 @@ class HashEquivalenceCommonTests(object):
self.assertIsNone(result_outhash)
def test_clean_unused(self):
taskhash, outhash, unihash = self.test_create_hash()
taskhash, outhash, unihash = self.create_test_hash(self.client)
# Clean the database, which should not remove anything because all hashes an in-use
result = self.client.clean_unused(0)
@@ -497,7 +537,7 @@ class HashEquivalenceCommonTests(object):
admin_client = self.start_auth_server()
# Create hashes with non-authenticated server
taskhash, outhash, unihash = self.test_create_hash()
taskhash, outhash, unihash = self.create_test_hash(self.client)
# Validate hash can be retrieved using authenticated client
with self.auth_perms("@read") as client:
@@ -534,14 +574,6 @@ class HashEquivalenceCommonTests(object):
with self.start_client(self.auth_server.address) as client, self.assertRaises(InvokeError):
client.refresh_token()
def assertUserCanAuth(self, user):
with self.start_client(self.auth_server.address) as client:
client.auth(user["username"], user["token"])
def assertUserCannotAuth(self, user):
with self.start_client(self.auth_server.address) as client, self.assertRaises(InvokeError):
client.auth(user["username"], user["token"])
def test_auth_self_token_refresh(self):
admin_client = self.start_auth_server()
@@ -650,14 +682,6 @@ class HashEquivalenceCommonTests(object):
with self.auth_perms("@user-admin") as client, self.assertRaises(InvokeError):
client.delete_user(user["username"])
def assertUserPerms(self, user, permissions):
with self.auth_client(user) as client:
info = client.get_user()
self.assertEqual(info, {
"username": user["username"],
"permissions": permissions,
})
def test_auth_set_user_perms(self):
admin_client = self.start_auth_server()
@@ -785,6 +809,236 @@ class HashEquivalenceCommonTests(object):
for col in columns:
self.client.remove({col: ""})
class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
def get_server_addr(self, server_idx):
return "unix://" + os.path.join(self.temp_dir.name, 'sock%d' % server_idx)
def test_stats(self):
self.run_hashclient(["--address", self.server_address, "stats"], check=True)
def test_stress(self):
self.run_hashclient(["--address", self.server_address, "stress"], check=True)
def test_remove_taskhash(self):
taskhash, outhash, unihash = self.create_test_hash(self.client)
self.run_hashclient([
"--address", self.server_address,
"remove",
"--where", "taskhash", taskhash,
], check=True)
self.assertClientGetHash(self.client, taskhash, None)
result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash)
self.assertIsNone(result_outhash)
def test_remove_unihash(self):
taskhash, outhash, unihash = self.create_test_hash(self.client)
self.run_hashclient([
"--address", self.server_address,
"remove",
"--where", "unihash", unihash,
], check=True)
self.assertClientGetHash(self.client, taskhash, None)
def test_remove_outhash(self):
taskhash, outhash, unihash = self.create_test_hash(self.client)
self.run_hashclient([
"--address", self.server_address,
"remove",
"--where", "outhash", outhash,
], check=True)
result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash)
self.assertIsNone(result_outhash)
def test_remove_method(self):
taskhash, outhash, unihash = self.create_test_hash(self.client)
self.run_hashclient([
"--address", self.server_address,
"remove",
"--where", "method", self.METHOD,
], check=True)
self.assertClientGetHash(self.client, taskhash, None)
result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash)
self.assertIsNone(result_outhash)
def test_clean_unused(self):
taskhash, outhash, unihash = self.create_test_hash(self.client)
# Clean the database, which should not remove anything because all hashes an in-use
self.run_hashclient([
"--address", self.server_address,
"clean-unused", "0",
], check=True)
self.assertClientGetHash(self.client, taskhash, unihash)
# Remove the unihash. The row in the outhash table should still be present
self.run_hashclient([
"--address", self.server_address,
"remove",
"--where", "unihash", unihash,
], check=True)
result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash, False)
self.assertIsNotNone(result_outhash)
# Now clean with no minimum age which will remove the outhash
self.run_hashclient([
"--address", self.server_address,
"clean-unused", "0",
], check=True)
result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash, False)
self.assertIsNone(result_outhash)
def test_refresh_token(self):
admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", ["@read", "@report"])
p = self.run_hashclient([
"--address", self.auth_server.address,
"--login", user["username"],
"--password", user["token"],
"refresh-token"
], check=True)
new_token = None
for l in p.stdout.splitlines():
l = l.rstrip()
m = re.match(r'Token: +(.*)$', l)
if m is not None:
new_token = m.group(1)
self.assertTrue(new_token)
print("New token is %r" % new_token)
self.run_hashclient([
"--address", self.auth_server.address,
"--login", user["username"],
"--password", new_token,
"get-user"
], check=True)
def test_set_user_perms(self):
admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", ["@read"])
self.run_hashclient([
"--address", self.auth_server.address,
"--login", admin_client.username,
"--password", admin_client.password,
"set-user-perms",
"-u", user["username"],
"@read", "@report",
], check=True)
new_user = admin_client.get_user(user["username"])
self.assertEqual(set(new_user["permissions"]), {"@read", "@report"})
def test_get_user(self):
admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", ["@read"])
p = self.run_hashclient([
"--address", self.auth_server.address,
"--login", admin_client.username,
"--password", admin_client.password,
"get-user",
"-u", user["username"],
], check=True)
self.assertIn("Username:", p.stdout)
self.assertIn("Permissions:", p.stdout)
p = self.run_hashclient([
"--address", self.auth_server.address,
"--login", user["username"],
"--password", user["token"],
"get-user",
], check=True)
self.assertIn("Username:", p.stdout)
self.assertIn("Permissions:", p.stdout)
def test_get_all_users(self):
admin_client = self.start_auth_server()
admin_client.new_user("test-user1", ["@read"])
admin_client.new_user("test-user2", ["@read"])
p = self.run_hashclient([
"--address", self.auth_server.address,
"--login", admin_client.username,
"--password", admin_client.password,
"get-all-users",
], check=True)
self.assertIn("admin", p.stdout)
self.assertIn("test-user1", p.stdout)
self.assertIn("test-user2", p.stdout)
def test_new_user(self):
admin_client = self.start_auth_server()
p = self.run_hashclient([
"--address", self.auth_server.address,
"--login", admin_client.username,
"--password", admin_client.password,
"new-user",
"-u", "test-user",
"@read", "@report",
], check=True)
new_token = None
for l in p.stdout.splitlines():
l = l.rstrip()
m = re.match(r'Token: +(.*)$', l)
if m is not None:
new_token = m.group(1)
self.assertTrue(new_token)
user = {
"username": "test-user",
"token": new_token,
}
self.assertUserPerms(user, ["@read", "@report"])
def test_delete_user(self):
admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", ["@read"])
p = self.run_hashclient([
"--address", self.auth_server.address,
"--login", admin_client.username,
"--password", admin_client.password,
"delete-user",
"-u", user["username"],
], check=True)
self.assertIsNone(admin_client.get_user(user["username"]))
def test_get_db_usage(self):
p = self.run_hashclient([
"--address", self.server_address,
"get-db-usage",
], check=True)
def test_get_db_query_columns(self):
p = self.run_hashclient([
"--address", self.server_address,
"get-db-query-columns",
], check=True)
class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
def get_server_addr(self, server_idx):
return "unix://" + os.path.join(self.temp_dir.name, 'sock%d' % server_idx)