Files
poky/meta/classes/cve-check.bbclass
Ernst Sjöstrand cc3cefdb43 cve-check: Only include installed packages for rootfs manifest
Before this the rootfs manifest and the summary were identical.
We should separate the summary and rootfs manifest more clearly,
now the summary is for all CVEs and the rootfs manifest is only for
things in that image. This is even more useful if you build multiple
images.

(From OE-Core rev: 2bacd7cc67b2f624885ce9c9c9e48950b359387d)

Signed-off-by: Ernst Sjöstrand <ernstp@gmail.com>
Signed-off-by: Luca Ceresoli <luca.ceresoli@bootlin.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
(cherry picked from commit 3b8cc6fc45f0ea5677729ee2b1819bdc7a441ab1)
Signed-off-by: Steve Sakoman <steve@sakoman.com>
(cherry picked from commit 65498411d73e8008d5550c2d0a1148f990717587)
Signed-off-by: Steve Sakoman <steve@sakoman.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2022-06-04 12:16:59 +01:00

600 lines
22 KiB
Plaintext

# This class is used to check recipes against public CVEs.
#
# In order to use this class just inherit the class in the
# local.conf file and it will add the cve_check task for
# every recipe. The task can be used per recipe, per image,
# or using the special cases "world" and "universe". The
# cve_check task will print a warning for every unpatched
# CVE found and generate a file in the recipe WORKDIR/cve
# directory. If an image is built it will generate a report
# in DEPLOY_DIR_IMAGE for all the packages used.
#
# Example:
# bitbake -c cve_check openssl
# bitbake core-image-sato
# bitbake -k -c cve_check universe
#
# DISCLAIMER
#
# This class/tool is meant to be used as support and not
# the only method to check against CVEs. Running this tool
# doesn't guarantee your packages are free of CVEs.
# The product name that the CVE database uses defaults to BPN, but may need to
# be overridden per recipe (for example tiff.bb sets CVE_PRODUCT=libtiff).
CVE_PRODUCT ??= "${BPN}"
# Version looked up in the database; check_cves() drops everything from "+git" onwards.
CVE_VERSION ??= "${PV}"
# SQLite database that check_cves() and get_cve_info() open read-only.
CVE_CHECK_DB_DIR ?= "${DL_DIR}/CVE_CHECK"
CVE_CHECK_DB_FILE ?= "${CVE_CHECK_DB_DIR}/nvdcve_1.1.db"
# NOTE(review): the lock file is not referenced in this class; presumably it is
# used by the recipe that updates the database - confirm.
CVE_CHECK_DB_FILE_LOCK ?= "${CVE_CHECK_DB_FILE}.lock"
# Per-recipe text report written by cve_write_data_text().
CVE_CHECK_LOG ?= "${T}/cve.log"
# Every recipe's text report is appended here when CVE_CHECK_CREATE_MANIFEST is
# "1"; the file is copied to the summary directory on BuildCompleted and removed
# on CookerExit.
CVE_CHECK_TMP_FILE ?= "${TMPDIR}/cve_check"
# Directory holding the timestamped build summaries; CVE_CHECK_SUMMARY_FILE_NAME
# is the name of the symlink that points at the latest text summary.
CVE_CHECK_SUMMARY_DIR ?= "${LOG_DIR}/cve"
CVE_CHECK_SUMMARY_FILE_NAME ?= "cve-summary"
# NOTE(review): CVE_CHECK_SUMMARY_FILE is not read anywhere in this class.
CVE_CHECK_SUMMARY_FILE ?= "${CVE_CHECK_SUMMARY_DIR}/${CVE_CHECK_SUMMARY_FILE_NAME}"
# Name of the symlink that points at the latest JSON summary.
CVE_CHECK_SUMMARY_FILE_NAME_JSON = "cve-summary.json"
# Index with one JSON fragment path per line; read by generate_json_report().
CVE_CHECK_SUMMARY_INDEX_PATH = "${CVE_CHECK_SUMMARY_DIR}/cve-summary-index.txt"
# Per-recipe JSON report written by cve_check_write_json_output().
CVE_CHECK_LOG_JSON ?= "${T}/cve.json"
# Per-recipe copies of the reports, written when CVE_CHECK_COPY_FILES is "1" and
# collected per image by cve_check_write_rootfs_manifest().
CVE_CHECK_DIR ??= "${DEPLOY_DIR}/cve"
CVE_CHECK_RECIPE_FILE ?= "${CVE_CHECK_DIR}/${PN}"
CVE_CHECK_RECIPE_FILE_JSON ?= "${CVE_CHECK_DIR}/${PN}_cve.json"
# Per-image manifests written by cve_check_write_rootfs_manifest().
CVE_CHECK_MANIFEST ?= "${DEPLOY_DIR_IMAGE}/${IMAGE_NAME}${IMAGE_NAME_SUFFIX}.cve"
CVE_CHECK_MANIFEST_JSON ?= "${DEPLOY_DIR_IMAGE}/${IMAGE_NAME}${IMAGE_NAME_SUFFIX}.json"
# "1" copies each recipe report to CVE_CHECK_DIR.
CVE_CHECK_COPY_FILES ??= "1"
# "1" enables the image manifest hook and the build-wide summary data.
CVE_CHECK_CREATE_MANIFEST ??= "1"
# Any value other than "1" leaves patched CVEs out of the reports.
CVE_CHECK_REPORT_PATCHED ??= "1"
# Provide text output
CVE_CHECK_FORMAT_TEXT ??= "1"
# Provide JSON output - disabled by default for backward compatibility
CVE_CHECK_FORMAT_JSON ??= "0"
# Whitelist for packages (PN)
CVE_CHECK_PN_WHITELIST ?= ""
# Whitelist for CVE. If a CVE is found, then it is considered patched.
# The value is a string containing space separated CVE values:
#
# CVE_CHECK_WHITELIST = 'CVE-2014-2524 CVE-2018-1234'
#
CVE_CHECK_WHITELIST ?= ""
# Layers to be excluded
CVE_CHECK_LAYER_EXCLUDELIST ??= ""
# Layers to be included
CVE_CHECK_LAYER_INCLUDELIST ??= ""
# set to "alphabetical" for version using single alphabetical character as increment release
CVE_VERSION_SUFFIX ??= ""
def update_symlinks(target_path, link_path):
    """
    Make link_path a symlink to target_path.

    The link stores only the base name of target_path, so both are expected
    to live in the same directory. Nothing happens when the two paths are
    equal or when target_path does not exist.
    """
    if link_path == target_path or not os.path.exists(target_path):
        return

    # lexists() is also true for a dangling symlink (e.g. the previously
    # linked report was deleted). Testing the resolved path instead would
    # leave such a link in place and make os.symlink() fail with
    # FileExistsError.
    if os.path.lexists(link_path):
        os.remove(link_path)
    os.symlink(os.path.basename(target_path), link_path)
def generate_json_report(d, out_path, link_path):
    """
    Merge all JSON fragments listed in CVE_CHECK_SUMMARY_INDEX_PATH into a
    single report at out_path and point link_path at it.

    Does nothing when the index file does not exist.
    """
    index_file = d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")
    if not os.path.exists(index_file):
        return

    import json
    from oe.cve_check import cve_check_merge_jsons

    bb.note("Generating JSON CVE summary")
    summary = {"version":"1", "package": []}

    # The index holds one fragment path per line.
    with open(index_file) as index:
        for entry in index:
            with open(entry.rstrip()) as fragment:
                cve_check_merge_jsons(summary, json.load(fragment))

    with open(out_path, "w") as report:
        json.dump(summary, report, indent=2)

    update_symlinks(out_path, link_path)
python cve_save_summary_handler () {
    """
    Publish the build-wide CVE summaries once the build has completed.

    The accumulated text report (CVE_CHECK_TMP_FILE), if present, is copied
    to a timestamped file in CVE_CHECK_SUMMARY_DIR; when JSON output is
    enabled the JSON fragments are merged into a timestamped report as well.
    Each report gets a stable symlink next to it.
    """
    import shutil
    import datetime

    summary_dir = d.getVar("CVE_CHECK_SUMMARY_DIR")
    summary_name = d.getVar("CVE_CHECK_SUMMARY_FILE_NAME")
    tmp_report = d.getVar("CVE_CHECK_TMP_FILE")
    bb.utils.mkdirhier(summary_dir)

    # One timestamp shared by the text and JSON summaries of this build.
    stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')

    if os.path.exists(tmp_report):
        text_summary = os.path.join(summary_dir, "%s-%s.txt" % (summary_name, stamp))
        text_link = os.path.join(summary_dir, summary_name)
        shutil.copyfile(tmp_report, text_summary)
        update_symlinks(text_summary, text_link)
        bb.plain("Complete CVE report summary created at: %s" % text_link)

    if d.getVar("CVE_CHECK_FORMAT_JSON") != "1":
        return

    json_link = os.path.join(summary_dir, d.getVar("CVE_CHECK_SUMMARY_FILE_NAME_JSON"))
    json_summary = os.path.join(summary_dir, "%s-%s.json" % (summary_name, stamp))
    generate_json_report(d, json_summary, json_link)
    bb.plain("Complete CVE JSON report summary created at: %s" % json_link)
}
addhandler cve_save_summary_handler
cve_save_summary_handler[eventmask] = "bb.event.BuildCompleted"
python do_cve_check () {
    """
    Check recipe for patched and unpatched CVEs

    The check is skipped with a note when the CVE database file is missing.
    Reports are only written when at least one patched or unpatched CVE was
    found.
    """
    if not os.path.exists(d.getVar("CVE_CHECK_DB_FILE")):
        bb.note("No CVE database found, skipping CVE check")
        return

    try:
        patched_cves = get_patches_cves(d)
    except FileNotFoundError:
        bb.fatal("Failure in searching patches")

    whitelisted, patched, unpatched = check_cves(d, patched_cves)
    if not (patched or unpatched):
        return

    cve_data = get_cve_info(d, patched + unpatched)
    cve_write_data(d, patched, unpatched, whitelisted, cve_data)
}
addtask cve_check before do_build
do_cve_check[depends] = "cve-update-db-native:do_fetch"
do_cve_check[nostamp] = "1"
python cve_check_cleanup () {
    """
    Remove the files that collect CVE data across recipes: the accumulated
    text report and the JSON fragment index.
    """
    for var in ("CVE_CHECK_TMP_FILE", "CVE_CHECK_SUMMARY_INDEX_PATH"):
        bb.utils.remove(e.data.getVar(var))
}
addhandler cve_check_cleanup
cve_check_cleanup[eventmask] = "bb.cooker.CookerExit"
python cve_check_write_rootfs_manifest () {
    """
    Create CVE manifest when building an image

    The manifest only covers the recipes that provide the packages installed
    in the image. For each of them the per-recipe report found at
    CVE_CHECK_RECIPE_FILE / CVE_CHECK_RECIPE_FILE_JSON is collected; the
    result is written to CVE_CHECK_MANIFEST / CVE_CHECK_MANIFEST_JSON and a
    symlink named after IMAGE_LINK_NAME is placed in DEPLOY_DIR_IMAGE.
    """
    import json
    from oe.rootfs import image_list_installed_packages
    from oe.cve_check import cve_check_merge_jsons

    if d.getVar("CVE_CHECK_COPY_FILES") == "1":
        # With PN still being the image recipe, these are the report files
        # that would belong to the image itself; drop them if present.
        for var in ("CVE_CHECK_RECIPE_FILE", "CVE_CHECK_RECIPE_FILE_JSON"):
            own_report = d.getVar(var)
            if os.path.exists(own_report):
                bb.utils.remove(own_report)

    # Map every installed package back to the recipe (PN) that built it
    pkgdata_dir = d.getVar('PKGDATA_DIR')
    recipes = set()
    for pkg in image_list_installed_packages(d):
        pkg_info = os.path.join(pkgdata_dir, 'runtime-reverse', pkg)
        pkg_data = oe.packagedata.read_pkgdatafile(pkg_info)
        recipes.add(pkg_data["PN"])

    bb.note("Writing rootfs CVE manifest")
    deploy_dir = d.getVar("DEPLOY_DIR_IMAGE")
    link_name = d.getVar("IMAGE_LINK_NAME")

    json_data = {"version":"1", "package": []}
    text_parts = []
    enable_json = d.getVar("CVE_CHECK_FORMAT_JSON") == "1"
    enable_text = d.getVar("CVE_CHECK_FORMAT_TEXT") == "1"

    save_pn = d.getVar("PN")
    try:
        # Sorted so that the manifest content does not depend on set
        # iteration order and is reproducible between builds.
        for recipe in sorted(recipes):
            # To be able to use the CVE_CHECK_RECIPE_FILE variable we have to evaluate
            # it with the different PN names set each time.
            d.setVar("PN", recipe)
            if enable_text:
                pkgfilepath = d.getVar("CVE_CHECK_RECIPE_FILE")
                if os.path.exists(pkgfilepath):
                    with open(pkgfilepath) as pfile:
                        text_parts.append(pfile.read())

            if enable_json:
                pkgfilepath = d.getVar("CVE_CHECK_RECIPE_FILE_JSON")
                if os.path.exists(pkgfilepath):
                    with open(pkgfilepath) as j:
                        cve_check_merge_jsons(json_data, json.load(j))
    finally:
        # Restore PN even if a report could not be read or parsed, so the
        # datastore is never left pointing at another recipe.
        d.setVar("PN", save_pn)

    if enable_text:
        link_path = os.path.join(deploy_dir, "%s.cve" % link_name)
        manifest_name = d.getVar("CVE_CHECK_MANIFEST")

        with open(manifest_name, "w") as f:
            f.write("".join(text_parts))

        update_symlinks(manifest_name, link_path)
        bb.plain("Image CVE report stored in: %s" % manifest_name)

    if enable_json:
        link_path = os.path.join(deploy_dir, "%s.json" % link_name)
        manifest_name = d.getVar("CVE_CHECK_MANIFEST_JSON")

        with open(manifest_name, "w") as f:
            json.dump(json_data, f, indent=2)

        update_symlinks(manifest_name, link_path)
        bb.plain("Image CVE JSON report stored in: %s" % manifest_name)
}
ROOTFS_POSTPROCESS_COMMAND_prepend = "${@'cve_check_write_rootfs_manifest; ' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"
do_rootfs[recrdeptask] += "${@'do_cve_check' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"
do_populate_sdk[recrdeptask] += "${@'do_cve_check' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"
def get_patches_cves(d):
    """
    Get patches that solve CVEs using the "CVE: " tag.

    Every patch returned by src_patches(d) is inspected twice: the file name
    is searched for a CVE ID, and the file content is searched for
    "CVE: CVE-YYYY-NNNN ..." tag lines.

    Returns the set of CVE IDs found. Raises FileNotFoundError when a listed
    patch file does not exist.
    """
    import re

    pn = d.getVar("PN")
    # Raw strings: "\d" inside a normal string literal is an invalid escape
    # sequence and triggers a warning on current Python versions.
    cve_match = re.compile(r"CVE:( CVE-\d{4}-\d+)+")

    # Matches the last "CVE-YYYY-ID" in the file name, also if written
    # in lowercase. Possible to have multiple CVE IDs in a single
    # file name, but only the last one will be detected from the file name.
    # However, patch files contents addressing multiple CVE IDs are supported
    # (cve_match regular expression)
    cve_file_name_match = re.compile(r".*([Cc][Vv][Ee]-\d{4}-\d+)")

    patched_cves = set()
    bb.debug(2, "Looking for patches that solves CVEs for %s" % pn)
    for url in src_patches(d):
        patch_file = bb.fetch.decodeurl(url)[2]

        if not os.path.isfile(patch_file):
            bb.error("File Not found: %s" % patch_file)
            raise FileNotFoundError

        # Check patch file name for CVE ID
        fname_match = cve_file_name_match.search(patch_file)
        if fname_match:
            cve = fname_match.group(1).upper()
            patched_cves.add(cve)
            bb.debug(2, "Found CVE %s from patch file name %s" % (cve, patch_file))

        # Read as UTF-8 first and fall back to iso8859-1; each attempt uses
        # its own file handle so none is closed twice or shadowed.
        try:
            with open(patch_file, "r", encoding="utf-8") as f:
                patch_text = f.read()
        except UnicodeDecodeError:
            bb.debug(1, "Failed to read patch %s using UTF-8 encoding"
                    " trying with iso8859-1" % patch_file)
            with open(patch_file, "r", encoding="iso8859-1") as f:
                patch_text = f.read()

        # Search for one or more "CVE: " lines
        text_match = False
        for match in cve_match.finditer(patch_text):
            # Get only the CVEs without the "CVE: " tag
            cves = patch_text[match.start()+5:match.end()]
            for cve in cves.split():
                bb.debug(2, "Patch %s solves %s" % (patch_file, cve))
                patched_cves.add(cve)
                text_match = True

        if not fname_match and not text_match:
            bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file)

    return patched_cves
def check_cves(d, patched_cves):
    """
    Connect to the NVD database and find unpatched cves.

    patched_cves is the set of CVE IDs already known to be fixed by patches.
    It is modified in place: whitelisted CVEs and CVEs whose version ranges
    do not include this recipe's version are added to it.

    Returns a tuple (whitelisted, patched, unpatched) of lists of CVE IDs.
    All three are empty when CVE_PRODUCT is empty or the recipe is listed in
    CVE_CHECK_PN_WHITELIST.
    """
    from oe.cve_check import Version

    pn = d.getVar("PN")
    real_pv = d.getVar("PV")
    suffix = d.getVar("CVE_VERSION_SUFFIX")

    cves_unpatched = []
    # CVE_PRODUCT can contain more than one product (eg. curl/libcurl)
    # getVar() yields None for a variable that is really unset.
    products = (d.getVar("CVE_PRODUCT") or "").split()
    # If this has been unset then we're not scanning for CVEs here (for example, image recipes)
    if not products:
        return ([], [], [])
    pv = d.getVar("CVE_VERSION").split("+git")[0]

    # If the recipe has been whitelisted we return empty lists
    if pn in d.getVar("CVE_CHECK_PN_WHITELIST").split():
        bb.note("Recipe has been whitelisted, skipping check")
        return ([], [], [])

    cve_whitelist = d.getVar("CVE_CHECK_WHITELIST").split()

    import sqlite3
    db_file = d.expand("file:${CVE_CHECK_DB_FILE}?mode=ro")
    conn = sqlite3.connect(db_file, uri=True)

    # try/finally so the database handle is released even when a query fails.
    try:
        # For each of the known product names (e.g. curl has CPEs using curl and libcurl)...
        # NOTE(review): a CVE already recorded as unpatched for one product is
        # evaluated again for later products and may then be appended a second
        # time or also be added to patched_cves.
        for product in products:
            if ":" in product:
                vendor, product = product.split(":", 1)
            else:
                # No vendor given: "%" matches any vendor in the LIKE clause
                vendor = "%"

            # Find all relevant CVE IDs.
            for cverow in conn.execute("SELECT DISTINCT ID FROM PRODUCTS WHERE PRODUCT IS ? AND VENDOR LIKE ?", (product, vendor)):
                cve = cverow[0]

                if cve in cve_whitelist:
                    bb.note("%s-%s has been whitelisted for %s" % (product, pv, cve))
                    # TODO: this should be in the report as 'whitelisted'
                    patched_cves.add(cve)
                    continue
                elif cve in patched_cves:
                    bb.note("%s has been patched" % (cve))
                    continue

                vulnerable = False
                for row in conn.execute("SELECT * FROM PRODUCTS WHERE ID IS ? AND PRODUCT IS ? AND VENDOR LIKE ?", (cve, product, vendor)):
                    (_, _, _, version_start, operator_start, version_end, operator_end) = row
                    #bb.debug(2, "Evaluating row " + str(row))

                    if (operator_start == '=' and pv == version_start) or version_start == '-':
                        vulnerable = True
                    else:
                        if operator_start:
                            try:
                                vulnerable_start = (operator_start == '>=' and Version(pv,suffix) >= Version(version_start,suffix))
                                vulnerable_start |= (operator_start == '>' and Version(pv,suffix) > Version(version_start,suffix))
                            except Exception:
                                # A version that cannot be compared counts as
                                # "not in range"; a warning is emitted instead.
                                bb.warn("%s: Failed to compare %s %s %s for %s" %
                                        (product, pv, operator_start, version_start, cve))
                                vulnerable_start = False
                        else:
                            vulnerable_start = False

                        if operator_end:
                            try:
                                vulnerable_end = (operator_end == '<=' and Version(pv,suffix) <= Version(version_end,suffix) )
                                vulnerable_end |= (operator_end == '<' and Version(pv,suffix) < Version(version_end,suffix) )
                            except Exception:
                                bb.warn("%s: Failed to compare %s %s %s for %s" %
                                        (product, pv, operator_end, version_end, cve))
                                vulnerable_end = False
                        else:
                            vulnerable_end = False

                        # With both bounds present the version must be inside
                        # the range; with a single bound that bound decides.
                        if operator_start and operator_end:
                            vulnerable = vulnerable_start and vulnerable_end
                        else:
                            vulnerable = vulnerable_start or vulnerable_end

                    if vulnerable:
                        bb.note("%s-%s is vulnerable to %s" % (pn, real_pv, cve))
                        cves_unpatched.append(cve)
                        break

                if not vulnerable:
                    bb.note("%s-%s is not vulnerable to %s" % (pn, real_pv, cve))
                    # TODO: not patched but not vulnerable
                    patched_cves.add(cve)
    finally:
        conn.close()

    return (list(cve_whitelist), list(patched_cves), cves_unpatched)
def get_cve_info(d, cves):
    """
    Get CVE information from the database.

    cves is an iterable of CVE IDs. Returns a dict keyed by CVE ID; each
    value is a dict with the keys "summary", "scorev2", "scorev3",
    "modified" and "vector", taken from columns 1-5 of the matching NVD
    row. IDs without a row in the NVD table are left out.
    """
    import sqlite3

    cve_data = {}
    db_file = d.expand("file:${CVE_CHECK_DB_FILE}?mode=ro")
    conn = sqlite3.connect(db_file, uri=True)

    # try/finally so the database handle is released even when a query fails.
    try:
        for cve in cves:
            for row in conn.execute("SELECT * FROM NVD WHERE ID IS ?", (cve,)):
                cve_data[row[0]] = {
                    "summary": row[1],
                    "scorev2": row[2],
                    "scorev3": row[3],
                    "modified": row[4],
                    "vector": row[5],
                }
    finally:
        conn.close()

    return cve_data
def cve_write_data_text(d, patched, unpatched, whitelisted, cve_data):
    """
    Write the text CVE report of the current recipe.

    The report goes to CVE_CHECK_LOG; when it is not empty it is also copied
    to CVE_CHECK_RECIPE_FILE (if CVE_CHECK_COPY_FILES is "1") and appended
    to CVE_CHECK_TMP_FILE (if CVE_CHECK_CREATE_MANIFEST is "1"). Recipes
    whose layer is filtered out by the layer include/exclude lists produce
    no output at all.
    """
    cve_file = d.getVar("CVE_CHECK_LOG")
    # Layer name is taken from the third-last component of FILE_DIRNAME.
    layer = d.getVar("FILE_DIRNAME").split("/")[-3]

    include_layers = d.getVar("CVE_CHECK_LAYER_INCLUDELIST").split()
    exclude_layers = d.getVar("CVE_CHECK_LAYER_EXCLUDELIST").split()
    if layer in exclude_layers or (include_layers and layer not in include_layers):
        return

    nvd_link = "https://nvd.nist.gov/vuln/detail/"
    bb.utils.mkdirhier(os.path.dirname(cve_file))

    chunks = []
    unpatched_cves = []
    for cve in sorted(cve_data):
        is_patched = cve in patched
        if is_patched and (d.getVar("CVE_CHECK_REPORT_PATCHED") != "1"):
            continue

        if cve in whitelisted:
            status_line = "CVE STATUS: Whitelisted\n"
        elif is_patched:
            status_line = "CVE STATUS: Patched\n"
        else:
            unpatched_cves.append(cve)
            status_line = "CVE STATUS: Unpatched\n"

        details = cve_data[cve]
        chunks.extend((
            "LAYER: %s\n" % layer,
            "PACKAGE NAME: %s\n" % d.getVar("PN"),
            "PACKAGE VERSION: %s%s\n" % (d.getVar("EXTENDPE"), d.getVar("PV")),
            "CVE: %s\n" % cve,
            status_line,
            "CVE SUMMARY: %s\n" % details["summary"],
            "CVSS v2 BASE SCORE: %s\n" % details["scorev2"],
            "CVSS v3 BASE SCORE: %s\n" % details["scorev3"],
            "VECTOR: %s\n" % details["vector"],
            "MORE INFORMATION: %s%s\n\n" % (nvd_link, cve),
        ))
    write_string = "".join(chunks)

    if unpatched_cves:
        bb.warn("Found unpatched CVE (%s), for more information check %s" % (" ".join(unpatched_cves),cve_file))

    if not write_string:
        return

    with open(cve_file, "w") as log:
        bb.note("Writing file %s with CVE information" % cve_file)
        log.write(write_string)

    if d.getVar("CVE_CHECK_COPY_FILES") == "1":
        deploy_file = d.getVar("CVE_CHECK_RECIPE_FILE")
        bb.utils.mkdirhier(os.path.dirname(deploy_file))
        with open(deploy_file, "w") as copy:
            copy.write(write_string)

    if d.getVar("CVE_CHECK_CREATE_MANIFEST") == "1":
        bb.utils.mkdirhier(d.getVar("CVE_CHECK_SUMMARY_DIR"))
        with open(d.getVar("CVE_CHECK_TMP_FILE"), "a") as combined:
            combined.write("%s" % write_string)
def cve_check_write_json_output(d, output, direct_file, deploy_file, manifest_file):
    """
    Serialise output as JSON and write it to direct_file.

    If CVE_CHECK_COPY_FILES is "1" the same text is written to deploy_file.
    If CVE_CHECK_CREATE_MANIFEST is "1" a fragment named after deploy_file
    is written to CVE_CHECK_SUMMARY_DIR and its path is appended to the
    index file CVE_CHECK_SUMMARY_INDEX_PATH. manifest_file is accepted but
    not used here.
    """
    import json

    payload = json.dumps(output, indent=2)

    with open(direct_file, "w") as log:
        bb.note("Writing file %s with CVE information" % direct_file)
        log.write(payload)

    if d.getVar("CVE_CHECK_COPY_FILES") == "1":
        bb.utils.mkdirhier(os.path.dirname(deploy_file))
        with open(deploy_file, "w") as copy:
            copy.write(payload)

    if d.getVar("CVE_CHECK_CREATE_MANIFEST") != "1":
        return

    summary_dir = d.getVar("CVE_CHECK_SUMMARY_DIR")
    index_path = d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")
    bb.utils.mkdirhier(summary_dir)

    fragment_path = os.path.join(summary_dir, os.path.basename(deploy_file))
    with open(fragment_path, "w") as fragment:
        fragment.write(payload)
    # One fragment path per line in the index
    with open(index_path, "a+") as index:
        index.write("%s\n" % fragment_path)
def cve_write_data_json(d, patched, unpatched, ignored, cve_data):
    """
    Build the JSON report structure for the current recipe and hand it to
    cve_check_write_json_output().

    Nothing is produced for recipes whose layer is filtered out by the layer
    include/exclude lists. Patched CVEs are left out unless
    CVE_CHECK_REPORT_PATCHED is "1".
    """
    nvd_link = "https://nvd.nist.gov/vuln/detail/"
    # Layer name is taken from the third-last component of FILE_DIRNAME.
    layer = d.getVar("FILE_DIRNAME").split("/")[-3]

    include_layers = d.getVar("CVE_CHECK_LAYER_INCLUDELIST").split()
    exclude_layers = d.getVar("CVE_CHECK_LAYER_EXCLUDELIST").split()
    if layer in exclude_layers or (include_layers and layer not in include_layers):
        return

    issues = []
    for cve in sorted(cve_data):
        is_patched = cve in patched
        if is_patched and (d.getVar("CVE_CHECK_REPORT_PATCHED") != "1"):
            continue

        if cve in ignored:
            status = "Ignored"
        elif is_patched:
            status = "Patched"
        else:
            status = "Unpatched"

        details = cve_data[cve]
        issues.append({
            "id" : cve,
            "summary" : details["summary"],
            "scorev2" : details["scorev2"],
            "scorev3" : details["scorev3"],
            "vector" : details["vector"],
            "status" : status,
            "link": "%s%s" % (nvd_link, cve)
        })

    package_data = {
        "name" : d.getVar("PN"),
        "layer" : layer,
        "version" : "%s%s" % (d.getVar("EXTENDPE"), d.getVar("PV")),
        "issue" : issues
    }
    output = {"version":"1", "package": [package_data]}

    cve_check_write_json_output(
        d,
        output,
        d.getVar("CVE_CHECK_LOG_JSON"),
        d.getVar("CVE_CHECK_RECIPE_FILE_JSON"),
        d.getVar("CVE_CHECK_SUMMARY_FILE_NAME_JSON"))
def cve_write_data(d, patched, unpatched, ignored, cve_data):
    """
    Run the report writer of every output format whose
    CVE_CHECK_FORMAT_* variable is "1" (text first, then JSON).
    """
    writers = (
        ("CVE_CHECK_FORMAT_TEXT", cve_write_data_text),
        ("CVE_CHECK_FORMAT_JSON", cve_write_data_json),
    )
    for flag, writer in writers:
        if d.getVar(flag) == "1":
            writer(d, patched, unpatched, ignored, cve_data)