Files
poky/meta/recipes-core/systemd/systemd-systemctl/systemctl
Trent Piepho 5cc40d3e64 systemd-systemctl: Fix instance name parsing with escapes or periods
Fixes [YOCTO #16130]

When extracting the instance name from a template instances such as
'example@host.domain.com.service', the systemctl replacement script will
split the instance on the first period, producing an instance argument of
'host' and a template of 'example@.domain.com.service'.  This is incorrect,
as systemd will split on the last period, producing an instance argument of
'host.domain.com' and a template of 'example@.service'.

When constructing the template name, the script will also pass the string
as is to re.sub(), which will try to process any backslash escapes in the
string.  These are legal in systemd unit names and should be preserved.
They also are not valid Python escape sequences.  Use re.escape() to
preserve anything in the unit name that might be considered a regex
exscape.

(From OE-Core rev: 0514c317523330f75937123c45bb0528e4830f61)

Signed-off-by: Trent Piepho <trent.piepho@igorinstitute.com>
Signed-off-by: Yoann Congal <yoann.congal@smile.fr>
Signed-off-by: Paul Barker <paul@pbarker.dev>
2026-03-25 17:34:13 +00:00

12 KiB
Executable File

#!/usr/bin/env python3 """systemctl: subset of systemctl used for image construction

Mask/preset systemd units """

import argparse import fnmatch import os import re import sys

from collections import namedtuple from itertools import chain from pathlib import Path

version = 1.0

ROOT = Path("/") SYSCONFDIR = Path("etc") BASE_LIBDIR = Path("lib") LIBDIR = Path("usr", "lib")

locations = list()

class SystemdFile(): """Class representing a single systemd configuration file"""

_clearable_keys = ['WantedBy']

def __init__(self, root, path, instance_unit_name):
    self.sections = dict()
    self._parse(root, path)
    dirname = os.path.basename(path.name) + ".d"
    for location in locations:
        files = (root / location / "system" / dirname).glob("*.conf")
        if instance_unit_name:
            inst_dirname = instance_unit_name + ".d"
            files = chain(files, (root / location / "system" / inst_dirname).glob("*.conf"))
        for path2 in sorted(files):
            self._parse(root, path2)

def _parse(self, root, path):
    """Parse a systemd syntax configuration file

    Args:
        path: A pathlib.Path object pointing to the file

    """
    skip_re = re.compile(r"^\s*([#;]|$)")
    section_re = re.compile(r"^\s*\[(?P<section>.*)\]")
    kv_re = re.compile(r"^\s*(?P<key>[^\s]+)\s*=\s*(?P<value>.*)")
    section = None

    if path.is_symlink():
        try:
            path.resolve()
        except FileNotFoundError:
            # broken symlink, try relative to root
            path = root / Path(os.readlink(str(path))).relative_to(ROOT)

    with path.open() as f:
        for line in f:
            if skip_re.match(line):
                continue

            line = line.strip()
            m = section_re.match(line)
            if m:
                if m.group('section') not in self.sections:
                    section = dict()
                    self.sections[m.group('section')] = section
                else:
                    section = self.sections[m.group('section')]
                continue

            while line.endswith("\\"):
                line += f.readline().rstrip("\n")

            m = kv_re.match(line)
            k = m.group('key')
            v = m.group('value')
            if k not in section:
                section[k] = list()

            # If we come across a "key=" line for a "clearable key", then
            # forget all preceding assignments. This works because we are
            # processing files in correct parse order.
            if k in self._clearable_keys and not v:
                del section[k]
                continue

            section[k].extend(v.split())

def get(self, section, prop):
    """Get a property from section

    Args:
        section: Section to retrieve property from
        prop: Property to retrieve

    Returns:
        List representing all properties of type prop in section.

    Raises:
        KeyError: if ``section`` or ``prop`` not found
    """
    return self.sections[section][prop]

class Presets(): """Class representing all systemd presets""" def init(self, scope, root): self.directives = list() self._collect_presets(scope, root)

def _parse_presets(self, presets):
    """Parse presets out of a set of preset files"""
    skip_re = re.compile(r"^\s*([#;]|$)")
    directive_re = re.compile(r"^\s*(?P<action>enable|disable)\s+(?P<unit_name>(.+))")

    Directive = namedtuple("Directive", "action unit_name")
    for preset in presets:
        with preset.open() as f:
            for line in f:
                m = directive_re.match(line)
                if m:
                    directive = Directive(action=m.group('action'),
                                          unit_name=m.group('unit_name'))
                    self.directives.append(directive)
                elif skip_re.match(line):
                    pass
                else:
                    sys.exit("Unparsed preset line in {}".format(preset))

def _collect_presets(self, scope, root):
    """Collect list of preset files"""
    presets = dict()
    for location in locations:
        paths = (root / location / scope).glob("*.preset")
        for path in paths:
            # earlier names override later ones
            if path.name not in presets:
                presets[path.name] = path

    self._parse_presets([v for k, v in sorted(presets.items())])

def state(self, unit_name):
    """Return state of preset for unit_name

    Args:
        presets: set of presets
        unit_name: name of the unit

    Returns:
        None: no matching preset
        `enable`: unit_name is enabled
        `disable`: unit_name is disabled
    """
    for directive in self.directives:
        if fnmatch.fnmatch(unit_name, directive.unit_name):
            return directive.action

    return None

def add_link(path, target): try: path.parent.mkdir(parents=True) except FileExistsError: pass if not path.is_symlink(): print("ln -s {} {}".format(target, path)) path.symlink_to(target)

class SystemdUnitNotFoundError(Exception): def init(self, path, unit): self.path = path self.unit = unit

class SystemdUnit(): def init(self, root, unit): self.root = root self.unit = unit self.config = None

def _path_for_unit(self, unit):
    for location in locations:
        path = self.root / location / "system" / unit
        if path.exists() or path.is_symlink():
            return path

    raise SystemdUnitNotFoundError(self.root, unit)

def _process_deps(self, config, service, location, prop, dirstem, instance):
    systemdir = self.root / SYSCONFDIR / "systemd" / "system"

    target = ROOT / location.relative_to(self.root)
    try:
        for dependent in config.get('Install', prop):
            # expand any %i to instance (ignoring escape sequence %%)
            if instance is not None:
                dependent = re.sub("([^%](%%)*)%i", "\\g<1>{}".format(re.escape(instance)), dependent)
            wants = systemdir / "{}.{}".format(dependent, dirstem) / service
            add_link(wants, target)

    except KeyError:
        pass

def enable(self, units_enabled=[]):
    # if we're enabling an instance, first extract the actual instance
    # then figure out what the template unit is
    template = re.match(r"[^@]+@(?P<instance>.*)\.", self.unit)
    instance_unit_name = None
    if template:
        instance = template.group('instance')
        if instance != "":
            instance_unit_name = self.unit
        unit = re.sub(r"@{}\.".format(re.escape(instance)), "@.", self.unit, 1)
    else:
        instance = None
        unit = self.unit

    path = self._path_for_unit(unit)

    if path.is_symlink():
        # ignore aliases
        return

    config = SystemdFile(self.root, path, instance_unit_name)
    if instance == "":
        try:
            default_instance = config.get('Install', 'DefaultInstance')[0]
        except KeyError:
            # no default instance, so nothing to enable
            return

        service = self.unit.replace("@.",
                                    "@{}.".format(default_instance))
    else:
        service = self.unit

    self._process_deps(config, service, path, 'WantedBy', 'wants', instance)
    self._process_deps(config, service, path, 'RequiredBy', 'requires', instance)

    try:
        for also in config.get('Install', 'Also'):
            try:
                 units_enabled.append(unit)
                 if also not in units_enabled:
                    SystemdUnit(self.root, also).enable(units_enabled)
            except SystemdUnitNotFoundError as e:
                sys.exit("Error: Systemctl also enable issue with  %s (%s)" % (service, e.unit))

    except KeyError:
        pass

    systemdir = self.root / SYSCONFDIR / "systemd" / "system"
    target = ROOT / path.relative_to(self.root)
    try:
        for dest in config.get('Install', 'Alias'):
            alias = systemdir / dest
            add_link(alias, target)

    except KeyError:
        pass

def mask(self):
    systemdir = self.root / SYSCONFDIR / "systemd" / "system"
    add_link(systemdir / self.unit, "/dev/null")

def collect_services(root): """Collect list of service files""" services = set() for location in locations: paths = (root / location / "system").glob("*") for path in paths: if path.is_dir(): continue services.add(path.name)

return services

def preset_all(root): presets = Presets('system-preset', root) services = collect_services(root)

for service in services:
    state = presets.state(service)

    if state == "enable" or state is None:
        try:
            SystemdUnit(root, service).enable()
        except SystemdUnitNotFoundError:
            sys.exit("Error: Systemctl preset_all issue in %s" % service)

# If we populate the systemd links we also create /etc/machine-id, which
# allows systemd to boot with the filesystem read-only before generating
# a real value and then committing it back.
#
# For the stateless configuration, where /etc is generated at runtime
# (for example on a tmpfs), this script shouldn't run at all and we
# allow systemd to completely populate /etc.
(root / SYSCONFDIR / "machine-id").touch()

def main(): if sys.version_info < (3, 4, 0): sys.exit("Python 3.4 or greater is required")

parser = argparse.ArgumentParser()
parser.add_argument('command', nargs='?', choices=['enable', 'mask',
                                                 'preset-all'])
parser.add_argument('service', nargs=argparse.REMAINDER)
parser.add_argument('--root')
parser.add_argument('--preset-mode',
                    choices=['full', 'enable-only', 'disable-only'],
                    default='full')

args = parser.parse_args()

root = Path(args.root) if args.root else ROOT

locations.append(SYSCONFDIR / "systemd")
# Handle the usrmerge case by ignoring /lib when it's a symlink
if not (root / BASE_LIBDIR).is_symlink():
    locations.append(BASE_LIBDIR / "systemd")
locations.append(LIBDIR / "systemd")

command = args.command
if not command:
    parser.print_help()
    return 0

if command == "mask":
    for service in args.service:
        try:
            SystemdUnit(root, service).mask()
        except SystemdUnitNotFoundError as e:
            sys.exit("Error: Systemctl main mask issue in %s (%s)" % (service, e.unit))
elif command == "enable":
    for service in args.service:
        try:
            SystemdUnit(root, service).enable()
        except SystemdUnitNotFoundError as e:
            sys.exit("Error: Systemctl main enable issue in %s (%s)" % (service, e.unit))
elif command == "preset-all":
    if len(args.service) != 0:
        sys.exit("Too many arguments.")
    if args.preset_mode != "enable-only":
        sys.exit("Only enable-only is supported as preset-mode.")
    preset_all(root)
else:
    raise RuntimeError()

if name == 'main': main()