patchtest: add supporting modules

Add modules that support core patchtest functionality to
meta/lib/patchtest. These include classes and functions for handling
repository and patch objects, parsing the patchtest CLI arguments, and
other utilities.

(From OE-Core rev: 499cdad7a16f6cc256837069c7add294132127a4)

Signed-off-by: Trevor Gamblin <tgamblin@baylibre.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Trevor Gamblin
2023-10-16 15:44:56 -04:00
committed by Richard Purdie
parent 790aa2096f
commit 9d137188ad
4 changed files with 532 additions and 0 deletions

View File

@@ -0,0 +1,95 @@
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# patchtestdata: module used to share command line arguments between
# patchtest & test suite and a data store between test cases
#
# Copyright (C) 2016 Intel Corporation
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Author: Leo Sandoval <leonardo.sandoval.gonzalez@linux.intel.com>
#
# NOTE: Strictly speaking, unit test should be isolated from outside,
# but patchtest test suites uses command line input data and
# pretest and test test cases may use the datastore defined
# on this module
import os
import argparse
import collections
import tempfile
import logging
logger=logging.getLogger('patchtest')
info=logger.info
# Data store commonly used to share values between pre and post-merge tests
PatchTestDataStore = collections.defaultdict(str)
class PatchTestInput(object):
"""Abstract the patchtest argument parser"""
@classmethod
def set_namespace(cls):
parser = cls.get_parser()
parser.parse_args(namespace=cls)
@classmethod
def get_parser(cls):
parser = argparse.ArgumentParser()
target_patch_group = parser.add_mutually_exclusive_group(required=True)
target_patch_group.add_argument('--patch', metavar='PATCH', dest='patch_path',
help='The patch to be tested')
target_patch_group.add_argument('--directory', metavar='DIRECTORY', dest='patch_path',
help='The directory containing patches to be tested')
parser.add_argument('repodir', metavar='REPO',
help="Name of the repository where patch is merged")
parser.add_argument('startdir', metavar='TESTDIR',
help="Directory where test cases are located")
parser.add_argument('--top-level-directory', '-t',
dest='topdir',
default=None,
help="Top level directory of project (defaults to start directory)")
parser.add_argument('--pattern', '-p',
dest='pattern',
default='test*.py',
help="Pattern to match test files")
parser.add_argument('--base-branch', '-b',
dest='basebranch',
help="Branch name used by patchtest to branch from. By default, it uses the current one.")
parser.add_argument('--base-commit', '-c',
dest='basecommit',
help="Commit ID used by patchtest to branch from. By default, it uses HEAD.")
parser.add_argument('--debug', '-d',
action='store_true',
help='Enable debug output')
parser.add_argument('--log-results',
action='store_true',
help='Enable logging to a file matching the target patch name with ".testresult" appended')
return parser

View File

@@ -0,0 +1,73 @@
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# patchtestpatch: PatchTestPatch class which abstracts a patch file
#
# Copyright (C) 2016 Intel Corporation
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import logging
import utils
logger = logging.getLogger('patchtest')
class PatchTestPatch(object):
MERGE_STATUS_INVALID = 'INVALID'
MERGE_STATUS_NOT_MERGED = 'NOTMERGED'
MERGE_STATUS_MERGED_SUCCESSFULL = 'PASS'
MERGE_STATUS_MERGED_FAIL = 'FAIL'
MERGE_STATUS = (MERGE_STATUS_INVALID,
MERGE_STATUS_NOT_MERGED,
MERGE_STATUS_MERGED_SUCCESSFULL,
MERGE_STATUS_MERGED_FAIL)
def __init__(self, path, forcereload=False):
self._path = path
self._forcereload = forcereload
self._contents = None
self._branch = None
self._merge_status = PatchTestPatch.MERGE_STATUS_NOT_MERGED
@property
def contents(self):
if self._forcereload or (not self._contents):
logger.debug('Reading %s contents' % self._path)
try:
with open(self._path, newline='') as _f:
self._contents = _f.read()
except IOError:
logger.warn("Reading the mbox %s failed" % self.resource)
return self._contents
@property
def path(self):
return self._path
@property
def branch(self):
if not self._branch:
self._branch = utils.get_branch(self._path)
return self._branch
def setmergestatus(self, status):
self._merge_status = status
def getmergestatus(self):
return self._merge_status
merge_status = property(getmergestatus, setmergestatus)

185
meta/lib/patchtest/repo.py Normal file
View File

@@ -0,0 +1,185 @@
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# patchtestrepo: PatchTestRepo class used mainly to control a git repo from patchtest
#
# Copyright (C) 2016 Intel Corporation
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import os
import utils
import logging
import json
from patch import PatchTestPatch
logger = logging.getLogger('patchtest')
info=logger.info
class PatchTestRepo(object):
# prefixes used for temporal branches/stashes
prefix = 'patchtest'
def __init__(self, patch, repodir, commit=None, branch=None):
self._repodir = repodir
self._patch = PatchTestPatch(patch)
self._current_branch = self._get_current_branch()
# targeted branch defined on the patch may be invalid, so make sure there
# is a corresponding remote branch
valid_patch_branch = None
if self._patch.branch in self.upstream_branches():
valid_patch_branch = self._patch.branch
# Target Branch
# Priority (top has highest priority):
# 1. branch given at cmd line
# 2. branch given at the patch
# 3. current branch
self._branch = branch or valid_patch_branch or self._current_branch
# Target Commit
# Priority (top has highest priority):
# 1. commit given at cmd line
# 2. branch given at cmd line
# 3. branch given at the patch
# 3. current HEAD
self._commit = self._get_commitid(commit) or \
self._get_commitid(branch) or \
self._get_commitid(valid_patch_branch) or \
self._get_commitid('HEAD')
self._workingbranch = "%s_%s" % (PatchTestRepo.prefix, os.getpid())
# create working branch
self._exec({'cmd': ['git', 'checkout', '-b', self._workingbranch, self._commit]})
self._patchmerged = False
# Check if patch can be merged using git-am
self._patchcanbemerged = True
try:
self._exec({'cmd': ['git', 'am', '--keep-cr'], 'input': self._patch.contents})
except utils.CmdException as ce:
self._exec({'cmd': ['git', 'am', '--abort']})
self._patchcanbemerged = False
finally:
# if patch was applied, remove it
if self._patchcanbemerged:
self._exec({'cmd':['git', 'reset', '--hard', self._commit]})
# for debugging purposes, print all repo parameters
logger.debug("Parameters")
logger.debug("\tRepository : %s" % self._repodir)
logger.debug("\tTarget Commit : %s" % self._commit)
logger.debug("\tTarget Branch : %s" % self._branch)
logger.debug("\tWorking branch : %s" % self._workingbranch)
logger.debug("\tPatch : %s" % self._patch)
@property
def patch(self):
return self._patch.path
@property
def branch(self):
return self._branch
@property
def commit(self):
return self._commit
@property
def ismerged(self):
return self._patchmerged
@property
def canbemerged(self):
return self._patchcanbemerged
def _exec(self, cmds):
_cmds = []
if isinstance(cmds, dict):
_cmds.append(cmds)
elif isinstance(cmds, list):
_cmds = cmds
else:
raise utils.CmdException({'cmd':str(cmds)})
results = []
cmdfailure = False
try:
results = utils.exec_cmds(_cmds, self._repodir)
except utils.CmdException as ce:
cmdfailure = True
raise ce
finally:
if cmdfailure:
for cmd in _cmds:
logger.debug("CMD: %s" % ' '.join(cmd['cmd']))
else:
for result in results:
cmd, rc, stdout, stderr = ' '.join(result['cmd']), result['returncode'], result['stdout'], result['stderr']
logger.debug("CMD: %s RCODE: %s STDOUT: %s STDERR: %s" % (cmd, rc, stdout, stderr))
return results
def _get_current_branch(self, commit='HEAD'):
cmd = {'cmd':['git', 'rev-parse', '--abbrev-ref', commit]}
cb = self._exec(cmd)[0]['stdout']
if cb == commit:
logger.warning('You may be detached so patchtest will checkout to master after execution')
cb = 'master'
return cb
def _get_commitid(self, commit):
if not commit:
return None
try:
cmd = {'cmd':['git', 'rev-parse', '--short', commit]}
return self._exec(cmd)[0]['stdout']
except utils.CmdException as ce:
# try getting the commit under any remotes
cmd = {'cmd':['git', 'remote']}
remotes = self._exec(cmd)[0]['stdout']
for remote in remotes.splitlines():
cmd = {'cmd':['git', 'rev-parse', '--short', '%s/%s' % (remote, commit)]}
try:
return self._exec(cmd)[0]['stdout']
except utils.CmdException:
pass
return None
def upstream_branches(self):
cmd = {'cmd':['git', 'branch', '--remotes']}
remote_branches = self._exec(cmd)[0]['stdout']
# just get the names, without the remote name
branches = set(branch.split('/')[-1] for branch in remote_branches.splitlines())
return branches
def merge(self):
if self._patchcanbemerged:
self._exec({'cmd': ['git', 'am', '--keep-cr'],
'input': self._patch.contents,
'updateenv': {'PTRESOURCE':self._patch.path}})
self._patchmerged = True
def clean(self):
self._exec({'cmd':['git', 'checkout', '%s' % self._current_branch]})
self._exec({'cmd':['git', 'branch', '-D', self._workingbranch]})
self._patchmerged = False

179
meta/lib/patchtest/utils.py Normal file
View File

@@ -0,0 +1,179 @@
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# utils: common methods used by the patchtest framework
#
# Copyright (C) 2016 Intel Corporation
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import os
import subprocess
import logging
import sys
import re
import mailbox
class CmdException(Exception):
""" Simple exception class where its attributes are the ones passed when instantiated """
def __init__(self, cmd):
self._cmd = cmd
def __getattr__(self, name):
value = None
if self._cmd.has_key(name):
value = self._cmd[name]
return value
def exec_cmd(cmd, cwd, ignore_error=False, input=None, strip=True, updateenv={}):
"""
Input:
cmd: dict containing the following keys:
cmd : the command itself as an array of strings
ignore_error: if False, no exception is raised
strip: indicates if strip is done on the output (stdout and stderr)
input: input data to the command (stdin)
updateenv: environment variables to be appended to the current
process environment variables
NOTE: keys 'ignore_error' and 'input' are optional; if not included,
the defaults are the ones specify in the arguments
cwd: directory where commands are executed
ignore_error: raise CmdException if command fails to execute and
this value is False
input: input data (stdin) for the command
Output: dict containing the following keys:
cmd: the same as input
ignore_error: the same as input
strip: the same as input
input: the same as input
stdout: Standard output after command's execution
stderr: Standard error after command's execution
returncode: Return code after command's execution
"""
cmddefaults = {
'cmd':'',
'ignore_error':ignore_error,
'strip':strip,
'input':input,
'updateenv':updateenv,
}
# update input values if necessary
cmddefaults.update(cmd)
_cmd = cmddefaults
if not _cmd['cmd']:
raise CmdException({'cmd':None, 'stderr':'no command given'})
# update the environment
env = os.environ
env.update(_cmd['updateenv'])
_command = [e for e in _cmd['cmd']]
p = subprocess.Popen(_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
cwd=cwd,
env=env)
# execute the command and strip output
(_stdout, _stderr) = p.communicate(_cmd['input'])
if _cmd['strip']:
_stdout, _stderr = map(str.strip, [_stdout, _stderr])
# generate the result
result = _cmd
result.update({'cmd':_command,'stdout':_stdout,'stderr':_stderr,'returncode':p.returncode})
# launch exception if necessary
if not _cmd['ignore_error'] and p.returncode:
raise CmdException(result)
return result
def exec_cmds(cmds, cwd):
""" Executes commands
Input:
cmds: Array of commands
cwd: directory where commands are executed
Output: Array of output commands
"""
results = []
_cmds = cmds
for cmd in _cmds:
result = exec_cmd(cmd, cwd)
results.append(result)
return results
def logger_create(name):
logger = logging.getLogger(name)
loggerhandler = logging.StreamHandler()
loggerhandler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(loggerhandler)
logger.setLevel(logging.INFO)
return logger
def get_subject_prefix(path):
prefix = ""
mbox = mailbox.mbox(path)
if len(mbox):
subject = mbox[0]['subject']
if subject:
pattern = re.compile("(\[.*\])", re.DOTALL)
match = pattern.search(subject)
if match:
prefix = match.group(1)
return prefix
def valid_branch(branch):
""" Check if branch is valid name """
lbranch = branch.lower()
invalid = lbranch.startswith('patch') or \
lbranch.startswith('rfc') or \
lbranch.startswith('resend') or \
re.search('^v\d+', lbranch) or \
re.search('^\d+/\d+', lbranch)
return not invalid
def get_branch(path):
""" Get the branch name from mbox """
fullprefix = get_subject_prefix(path)
branch, branches, valid_branches = None, [], []
if fullprefix:
prefix = fullprefix.strip('[]')
branches = [ b.strip() for b in prefix.split(',')]
valid_branches = [b for b in branches if valid_branch(b)]
if len(valid_branches):
branch = valid_branches[0]
return branch