diff --git a/bitbake/lib/bb/fetch2/__init__.py b/bitbake/lib/bb/fetch2/__init__.py index 224408de0f..2f54cb86e6 100644 --- a/bitbake/lib/bb/fetch2/__init__.py +++ b/bitbake/lib/bb/fetch2/__init__.py @@ -23,6 +23,7 @@ import collections import subprocess import pickle import errno +import shlex import bb.persist_data, bb.utils import bb.checksum import bb.process @@ -1567,16 +1568,19 @@ class FetchMethod(object): elif file.endswith('.deb') or file.endswith('.ipk'): output = subprocess.check_output(['ar', '-t', file], preexec_fn=subprocess_setup) datafile = None + valid_datafiles = ('data.tar', 'data.tar.gz', 'data.tar.xz', + 'data.tar.zst', 'data.tar.bz2', 'data.tar.lzma') if output: for line in output.decode().splitlines(): - if line.startswith('data.tar.'): + if line in valid_datafiles: datafile = line break else: - raise UnpackError("Unable to unpack deb/ipk package - does not contain data.tar.* file", urldata.url) + raise UnpackError("Unable to unpack deb/ipk package - does not contain supported data.tar* file", urldata.url) else: raise UnpackError("Unable to unpack deb/ipk package - could not list contents", urldata.url) - cmd = 'ar x %s %s && %s -p -f %s && rm %s' % (file, datafile, tar_cmd, datafile, datafile) + quoted_datafile = shlex.quote(datafile) + cmd = 'ar x %s %s && %s -p -f %s && rm %s' % (shlex.quote(file), quoted_datafile, tar_cmd, quoted_datafile, quoted_datafile) # If 'subdir' param exists, create a dir and use it as destination for unpack cmd if 'subdir' in urldata.parm: diff --git a/bitbake/lib/bb/tests/fetch.py b/bitbake/lib/bb/tests/fetch.py index 3775ab0f33..5735cf8f45 100644 --- a/bitbake/lib/bb/tests/fetch.py +++ b/bitbake/lib/bb/tests/fetch.py @@ -13,6 +13,7 @@ import tempfile import collections import os import signal +import subprocess import tarfile from bb.fetch2 import URI from bb.fetch2 import FetchMethod @@ -731,6 +732,34 @@ class FetcherLocalTest(FetcherTest): bb.process.run('tar cjf archive.tar.bz2 -C dir .', cwd=self.localsrcdir) self.d.setVar("FILESPATH", self.localsrcdir) + def make_ar_package(self, package_name, data_member="data.tar"): + if not shutil.which("ar"): + self.skipTest("ar not installed") + + workdir = tempfile.mkdtemp(dir=self.tempdir) + payload = os.path.join(workdir, "payload") + with open(payload, "w") as f: + f.write("payload\n") + + data_path = os.path.join(workdir, data_member) + mode = "w:gz" if data_member.endswith(".gz") else "w" + with tarfile.open(data_path, mode) as archive: + archive.add(payload, arcname="payload") + + with open(os.path.join(workdir, "debian-binary"), "w") as f: + f.write("2.0\n") + + control = os.path.join(workdir, "control") + with open(control, "w") as f: + f.write("Package: fetch-test\nVersion: 1\nArchitecture: all\n") + with tarfile.open(os.path.join(workdir, "control.tar"), "w") as archive: + archive.add(control, arcname="control") + + package_path = os.path.join(self.localsrcdir, package_name) + subprocess.check_call(["ar", "r", package_path, "debian-binary", "control.tar", data_member], + cwd=workdir, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return package_name + def fetchUnpack(self, uris): fetcher = bb.fetch.Fetch(uris, self.d) fetcher.download() @@ -800,6 +829,30 @@ class FetcherLocalTest(FetcherTest): tree = self.fetchUnpack(['file://archive.tar.bz2;subdir=bar;striplevel=1']) self.assertEqual(tree, ['bar/c', 'bar/d', 'bar/subdir/e']) + def test_local_deb_quoted_filename(self): + package = self.make_ar_package("archive$(id).deb") + tree = self.fetchUnpack(['file://%s' % package]) + self.assertEqual(tree, ['payload']) + + def test_local_ipk_gz_data_member(self): + package = self.make_ar_package("archive.ipk", data_member="data.tar.gz") + tree = self.fetchUnpack(['file://%s' % package]) + self.assertEqual(tree, ['payload']) + + def test_local_deb_rejects_unknown_data_member_suffix(self): + package = self.make_ar_package("archive.deb", data_member="data.tar.foo") + with self.assertRaises(bb.fetch2.UnpackError) as context: + self.fetchUnpack(['file://%s' % package]) + + self.assertIn("does not contain supported data.tar* file", str(context.exception)) + + def test_local_deb_rejects_unsafe_data_member(self): + package = self.make_ar_package("archive.deb", data_member="data.tar.xz;id") + with self.assertRaises(bb.fetch2.UnpackError) as context: + self.fetchUnpack(['file://%s' % package]) + + self.assertIn("does not contain supported data.tar* file", str(context.exception)) + def dummyGitTest(self, suffix): # Create dummy local Git repo src_dir = tempfile.mkdtemp(dir=self.tempdir,