Files
poky/meta/recipes-core/systemd/systemd-systemctl/systemctl
Yuta Hayama d0a5ec84b4 systemd-systemctl: fix errors in instance name expansion
If the instance name indicated by %i begins with a number, the meaning of the
replacement string "\\1{}".format(instance) is ambiguous.

To indicate group number 1 regardless of the instance name, use "\g<1>".

(From OE-Core rev: d18b939fb08b37380ce95934da38e6522392621c)

Signed-off-by: Yuta Hayama <hayama@lineo.co.jp>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2023-07-10 11:36:34 +01:00

12 KiB
Executable File

#!/usr/bin/env python3
"""systemctl: subset of systemctl used for image construction

Mask/preset systemd units
"""

import argparse
import fnmatch
import os
import re
import sys

from collections import namedtuple
from itertools import chain
from pathlib import Path

version = 1.0

# ROOT is the absolute filesystem root; the remaining paths are relative so
# they can be joined onto whichever root directory is being operated on.
ROOT = Path("/")
SYSCONFDIR = Path("etc")
BASE_LIBDIR = Path("lib")
LIBDIR = Path("usr", "lib")

# Ordered search locations for units and presets. Filled in by main();
# lookups walk the list in order and the first hit wins.
locations = list()

class SystemdFile():
    """Class representing a single systemd configuration file

    Attributes:
        sections: dict mapping a section name to a dict of
            ``key -> list of whitespace-separated values``.
    """

    # Keys for which an empty assignment ("key=") resets all earlier values.
    _clearable_keys = ['WantedBy']

    def __init__(self, root, path, instance_unit_name):
        """Parse a unit file followed by its ``*.conf`` drop-ins.

        Args:
            root: pathlib.Path of the root directory being operated on
            path: pathlib.Path of the unit file to parse
            instance_unit_name: full name of the instantiated unit, or a
                falsy value; when set, drop-ins found in
                ``<instance_unit_name>.d`` are parsed as well
        """
        self.sections = dict()
        self._parse(root, path)
        dirname = os.path.basename(path.name) + ".d"
        for location in locations:
            files = (root / location / "system" / dirname).glob("*.conf")
            if instance_unit_name:
                inst_dirname = instance_unit_name + ".d"
                files = chain(files, (root / location / "system" / inst_dirname).glob("*.conf"))
            for path2 in sorted(files):
                self._parse(root, path2)

    def _parse(self, root, path):
        """Parse a systemd syntax configuration file

        Results are merged into ``self.sections``. Lines that are neither a
        section header nor a ``key=value`` assignment, and assignments that
        appear before the first section header, are ignored.

        Args:
            root: pathlib.Path of the root directory being operated on;
                used to re-anchor the target of a symlink that does not
                resolve as-is
            path: A pathlib.Path object pointing to the file

        """
        skip_re = re.compile(r"^\s*([#;]|$)")
        section_re = re.compile(r"^\s*\[(?P<section>.*)\]")
        # The key stops at the first "=" so that values which themselves
        # contain "=" (e.g. Environment=FOO=bar) are split correctly.
        kv_re = re.compile(r"^\s*(?P<key>[^\s=]+)\s*=\s*(?P<value>.*)")
        section = None

        # Path.resolve() only raises for a missing target when it is strict
        # (the pre-3.6 behaviour), so test for the dangling link directly.
        if path.is_symlink() and not path.exists():
            # broken symlink, try relative to root
            path = root / Path(os.readlink(str(path))).relative_to(ROOT)

        with path.open() as f:
            for line in f:
                if skip_re.match(line):
                    continue

                line = line.strip()
                m = section_re.match(line)
                if m:
                    section = self.sections.setdefault(m.group('section'), dict())
                    continue

                # A trailing backslash continues the assignment on the next
                # line: drop the backslash and join with a space. At end of
                # file readline() returns "", which also terminates the loop.
                while line.endswith("\\"):
                    line = line[:-1] + " " + f.readline().strip()

                m = kv_re.match(line)
                if m is None or section is None:
                    # Not an assignment, or an assignment outside any section.
                    continue
                k = m.group('key')
                v = m.group('value').strip()

                # If we come across a "key=" line for a "clearable key", then
                # forget all preceding assignments. This works because we are
                # processing files in correct parse order.
                if k in self._clearable_keys and not v:
                    section.pop(k, None)
                    continue

                section.setdefault(k, list()).extend(v.split())

    def get(self, section, prop):
        """Get a property from section

        Args:
            section: Section to retrieve property from
            prop: Property to retrieve

        Returns:
            List representing all properties of type prop in section.

        Raises:
            KeyError: if ``section`` or ``prop`` not found
        """
        return self.sections[section][prop]

class Presets():
    """Class representing all systemd presets

    Attributes:
        directives: list of ``Directive(action, unit_name)`` tuples in the
            order they were parsed; ``unit_name`` may be a glob pattern.
    """

    def __init__(self, scope, root):
        """Collect and parse every preset file for ``scope`` under ``root``.

        Args:
            scope: preset directory name looked up under each location
            root: pathlib.Path of the root directory being operated on
        """
        self.directives = list()
        self._collect_presets(scope, root)

    def _parse_presets(self, presets):
        """Parse presets out of a set of preset files

        Args:
            presets: iterable of pathlib.Path objects, in parse order

        Exits the program if a line is neither a directive, a comment nor
        blank.
        """
        skip_re = re.compile(r"^\s*([#;]|$)")
        directive_re = re.compile(r"^\s*(?P<action>enable|disable)\s+(?P<unit_name>(.+))")

        Directive = namedtuple("Directive", "action unit_name")
        for preset in presets:
            with preset.open() as f:
                for line in f:
                    m = directive_re.match(line)
                    if m:
                        # Trailing whitespace would otherwise become part of
                        # the pattern and prevent it from ever matching.
                        directive = Directive(action=m.group('action'),
                                              unit_name=m.group('unit_name').strip())
                        self.directives.append(directive)
                    elif skip_re.match(line):
                        pass
                    else:
                        sys.exit("Unparsed preset line in {}".format(preset))

    def _collect_presets(self, scope, root):
        """Collect list of preset files

        Args:
            scope: preset directory name looked up under each location
            root: pathlib.Path of the root directory being operated on
        """
        presets = dict()
        for location in locations:
            paths = (root / location / scope).glob("*.preset")
            for path in paths:
                # earlier names override later ones
                if path.name not in presets:
                    presets[path.name] = path

        # Parse in file-name order, independent of the directory of origin.
        self._parse_presets([v for k, v in sorted(presets.items())])

    def state(self, unit_name):
        """Return state of preset for unit_name

        The first directive whose pattern matches decides the result.

        Args:
            unit_name: name of the unit

        Returns:
            None: no matching preset
            `enable`: unit_name is enabled
            `disable`: unit_name is disabled
        """
        for directive in self.directives:
            if fnmatch.fnmatch(unit_name, directive.unit_name):
                return directive.action

        return None

def add_link(path, target):
    """Create the symlink ``path`` pointing at ``target``.

    Missing parent directories are created first. If ``path`` is already a
    symlink it is left untouched, whatever it points to.

    Args:
        path: pathlib.Path of the link to create
        target: link target (string or path)
    """
    # Path.mkdir() only gained exist_ok in Python 3.5; this script accepts
    # 3.4, so tolerate an existing directory by catching the error instead.
    try:
        path.parent.mkdir(parents=True)
    except FileExistsError:
        pass
    if not path.is_symlink():
        print("ln -s {} {}".format(target, path))
        path.symlink_to(target)

class SystemdUnitNotFoundError(Exception):
    """Raised when a unit file is not found in any search location.

    Attributes:
        path: root directory that was searched
        unit: name of the unit that could not be found
    """

    def __init__(self, path, unit):
        super().__init__(path, unit)
        self.path = path
        self.unit = unit

class SystemdUnit():
    """A unit, identified by name, that can be enabled or masked under a root.

    Attributes:
        root: pathlib.Path of the root directory being operated on
        unit: unit name as given by the caller (may be a template instance)
        config: unused placeholder, always None
    """

    def __init__(self, root, unit):
        self.root = root
        self.unit = unit
        self.config = None

    def _path_for_unit(self, unit):
        """Return the path of ``unit`` in the first location that has it.

        Raises:
            SystemdUnitNotFoundError: if no location contains the unit
        """
        for location in locations:
            path = self.root / location / "system" / unit
            # is_symlink() also catches links whose target does not exist.
            if path.exists() or path.is_symlink():
                return path

        raise SystemdUnitNotFoundError(self.root, unit)

    @staticmethod
    def _expand_instance(dependent, instance):
        """Expand every unescaped ``%i`` in ``dependent`` to ``instance``.

        A ``%i`` preceded by an odd number of ``%`` characters is an escaped
        percent sign followed by a literal ``i`` and is left alone. A falsy
        ``instance`` expands to the empty string.
        """
        replacement = instance or ""
        # A callable replacement is inserted verbatim, so instance names that
        # start with a digit or contain backslashes need no special care.
        return re.sub(r"(?<!%)((?:%%)*)%i",
                      lambda m: m.group(1) + replacement, dependent)

    def _process_deps(self, config, service, location, prop, dirstem, instance):
        """Create ``<dependent>.<dirstem>/<service>`` links for an [Install] key.

        Args:
            config: SystemdFile holding the parsed unit
            service: name to give the created link
            location: path of the unit file under ``self.root``
            prop: [Install] key to read (e.g. ``WantedBy``)
            dirstem: directory suffix (e.g. ``wants``)
            instance: instance name substituted for ``%i``, or None
        """
        try:
            dependents = config.get('Install', prop)
        except KeyError:
            # Key (or the whole [Install] section) is absent: nothing to do.
            return

        systemdir = self.root / SYSCONFDIR / "systemd" / "system"
        target = ROOT / location.relative_to(self.root)
        for dependent in dependents:
            # expand any %i to instance (ignoring escape sequence %%)
            dependent = self._expand_instance(dependent, instance)
            wants = systemdir / "{}.{}".format(dependent, dirstem) / service
            add_link(wants, target)

    def enable(self, caller_unit=None):
        """Create the symlinks that enable this unit.

        Handles ``WantedBy``, ``RequiredBy``, ``Also`` and ``Alias`` from the
        unit's [Install] section. Units that are themselves symlinks are
        treated as aliases and skipped.

        Args:
            caller_unit: unit whose ``Also=`` triggered this call; it is not
                enabled again from here, which stops mutual ``Also=``
                references from recursing forever

        Raises:
            SystemdUnitNotFoundError: if the unit file cannot be found
        """
        # if we're enabling an instance, first extract the actual instance
        # then figure out what the template unit is
        template = re.match(r"[^@]+@(?P<instance>[^\.]*)\.", self.unit)
        instance_unit_name = None
        if template:
            instance = template.group('instance')
            if instance != "":
                instance_unit_name = self.unit
            unit = re.sub(r"@[^\.]*\.", "@.", self.unit, count=1)
        else:
            instance = None
            unit = self.unit

        path = self._path_for_unit(unit)

        if path.is_symlink():
            # ignore aliases
            return

        config = SystemdFile(self.root, path, instance_unit_name)
        if instance == "":
            try:
                default_instance = config.get('Install', 'DefaultInstance')[0]
            except (KeyError, IndexError):
                # no (or empty) default instance, so nothing to enable
                return

            service = self.unit.replace("@.",
                                        "@{}.".format(default_instance))
            # %i in the dependency names refers to the instance actually
            # being enabled, which for a bare template is the default one.
            instance = default_instance
        else:
            service = self.unit

        self._process_deps(config, service, path, 'WantedBy', 'wants', instance)
        self._process_deps(config, service, path, 'RequiredBy', 'requires', instance)

        try:
            also_units = config.get('Install', 'Also')
        except KeyError:
            also_units = []
        for also in also_units:
            try:
                if caller_unit != also:
                    SystemdUnit(self.root, also).enable(unit)
            except SystemdUnitNotFoundError as e:
                sys.exit("Error: Systemctl also enable issue with  %s (%s)" % (service, e.unit))

        try:
            aliases = config.get('Install', 'Alias')
        except KeyError:
            aliases = []
        systemdir = self.root / SYSCONFDIR / "systemd" / "system"
        target = ROOT / path.relative_to(self.root)
        for dest in aliases:
            alias = systemdir / dest
            add_link(alias, target)

    def mask(self):
        """Mask the unit by linking its name to /dev/null in the system dir."""
        systemdir = self.root / SYSCONFDIR / "systemd" / "system"
        add_link(systemdir / self.unit, "/dev/null")

def collect_services(root):
    """Collect list of service files

    Args:
        root: pathlib.Path of the root directory being operated on

    Returns:
        Set of the names of all non-directory entries found in the
        ``system`` directory of every search location.
    """
    services = set()
    for location in locations:
        paths = (root / location / "system").glob("*")
        for path in paths:
            # Skip directories, keep everything else.
            if path.is_dir():
                continue
            services.add(path.name)

    return services

def preset_all(root):
    """Enable every collected unit that the presets do not disable.

    A unit is enabled when its preset state is ``enable`` or when no preset
    matches it. Exits the program if a unit to enable cannot be found.

    Args:
        root: pathlib.Path of the root directory being operated on
    """
    presets = Presets('system-preset', root)
    services = collect_services(root)

    for service in services:
        state = presets.state(service)

        if state == "enable" or state is None:
            try:
                SystemdUnit(root, service).enable()
            except SystemdUnitNotFoundError:
                sys.exit("Error: Systemctl preset_all issue in %s" % service)

    # If we populate the systemd links we also create /etc/machine-id, which
    # allows systemd to boot with the filesystem read-only before generating
    # a real value and then committing it back.
    #
    # For the stateless configuration, where /etc is generated at runtime
    # (for example on a tmpfs), this script shouldn't run at all and we
    # allow systemd to completely populate /etc.
    (root / SYSCONFDIR / "machine-id").touch()

def main():
    """Parse the command line and run ``enable``, ``mask`` or ``preset-all``.

    Returns:
        0 when no command was given (help is printed), otherwise None.
        Errors terminate the program through sys.exit().
    """
    if sys.version_info < (3, 4, 0):
        sys.exit("Python 3.4 or greater is required")

    parser = argparse.ArgumentParser()
    parser.add_argument('command', nargs='?', choices=['enable', 'mask',
                                                     'preset-all'])
    parser.add_argument('service', nargs=argparse.REMAINDER)
    parser.add_argument('--root')
    parser.add_argument('--preset-mode',
                        choices=['full', 'enable-only', 'disable-only'],
                        default='full')

    args = parser.parse_args()

    root = Path(args.root) if args.root else ROOT

    # Order matters: lookups take the first location that has a match.
    locations.append(SYSCONFDIR / "systemd")
    # Handle the usrmerge case by ignoring /lib when it's a symlink
    if not (root / BASE_LIBDIR).is_symlink():
        locations.append(BASE_LIBDIR / "systemd")
    locations.append(LIBDIR / "systemd")

    command = args.command
    if not command:
        parser.print_help()
        return 0

    if command == "mask":
        for service in args.service:
            try:
                SystemdUnit(root, service).mask()
            except SystemdUnitNotFoundError as e:
                sys.exit("Error: Systemctl main mask issue in %s (%s)" % (service, e.unit))
    elif command == "enable":
        for service in args.service:
            try:
                SystemdUnit(root, service).enable()
            except SystemdUnitNotFoundError as e:
                sys.exit("Error: Systemctl main enable issue in %s (%s)" % (service, e.unit))
    elif command == "preset-all":
        if len(args.service) != 0:
            sys.exit("Too many arguments.")
        # The parser accepts three modes, but only enable-only is implemented.
        if args.preset_mode != "enable-only":
            sys.exit("Only enable-only is supported as preset-mode.")
        preset_all(root)
    else:
        # Not reachable through the parser, which restricts 'command'.
        raise RuntimeError()

# Run only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()