push sheeet
Some checks failed
Periodic Merges (6h) / master → staging-nixos (push) Failing after 12m50s
Periodic Merges (6h) / master → staging-next (push) Failing after 12m54s
Periodic Merges (24h) / merge-base(master,staging) → haskell-updates (push) Failing after 11m54s
Periodic Merges (6h) / staging-next → staging (push) Failing after 12m13s
Periodic Merges (24h) / staging-next-25.05 → staging-25.05 (push) Failing after 13m24s
Periodic Merges (24h) / release-25.05 → staging-next-25.05 (push) Failing after 14m28s

This commit is contained in:
Dark Steveneq
2025-10-09 14:15:47 +02:00
commit 646b892680
49168 changed files with 5897842 additions and 0 deletions

View File

@@ -0,0 +1,187 @@
import errno
import os
from enum import IntEnum
from pathlib import Path
class Accessibility(IntEnum):
"""
The level of accessibility we have on a file or directory.
This is needed to assess the attack surface on the file system namespace we
have within a confined service. Higher levels mean more permissions for the
user and thus a bigger attack surface.
"""
NONE = 0
# Directories can be listed or files can be read.
READABLE = 1
# This is for special file systems such as procfs and for stuff such as
# FIFOs or character special files. The reason why this has a lower value
# than WRITABLE is because those files are more restricted on what and how
# they can be written to.
SPECIAL = 2
# Another special case are sticky directories, which do allow write access
# but restrict deletion. This does *not* apply to sticky directories that
# are read-only.
STICKY = 3
# Essentially full permissions, the kind of accessibility we want to avoid
# in most cases.
WRITABLE = 4
def assert_on(self, path: Path) -> None:
"""
Raise an AssertionError if the given 'path' allows for more
accessibility than 'self'.
"""
actual = self.NONE
if path.is_symlink():
actual = self.READABLE
elif path.is_dir():
writable = True
dummy_file = path / 'can_i_write'
try:
dummy_file.touch()
except OSError as e:
if e.errno in [errno.EROFS, errno.EACCES]:
writable = False
else:
raise
else:
dummy_file.unlink()
if writable:
# The reason why we test this *after* we made sure it's
# writable is because we could have a sticky directory where
# the current user doesn't have write access.
if path.stat().st_mode & 0o1000 == 0o1000:
actual = self.STICKY
else:
actual = self.WRITABLE
else:
actual = self.READABLE
elif path.is_file():
try:
with path.open('rb') as fp:
fp.read(1)
actual = self.READABLE
except PermissionError:
pass
writable = True
try:
with path.open('ab') as fp:
fp.write('x')
size = fp.tell()
fp.truncate(size)
except PermissionError:
writable = False
except OSError as e:
if e.errno == errno.ETXTBSY:
writable = os.access(path, os.W_OK)
elif e.errno == errno.EROFS:
writable = False
else:
raise
# Let's always try to fail towards being writable, so if *either*
# access(2) or a real write is successful it's writable. This is to
# make sure we don't accidentally introduce no-ops if we have bugs
# in the more complicated real write code above.
if writable or os.access(path, os.W_OK):
actual = self.WRITABLE
else:
# We need to be very careful when writing to or reading from
# special files (eg. FIFOs), since they can possibly block. So if
# it's not a file, just trust that access(2) won't lie.
if os.access(path, os.R_OK):
actual = self.READABLE
if os.access(path, os.W_OK):
actual = self.SPECIAL
if actual > self:
stat = path.stat()
details = ', '.join([
f'permissions: {stat.st_mode & 0o7777:o}',
f'uid: {stat.st_uid}',
f'group: {stat.st_gid}',
])
raise AssertionError(
f'Expected at most {self!r} but got {actual!r} for path'
f' {path} ({details}).'
)
def is_special_fs(path: Path) -> bool:
"""
Check whether the given path truly is a special file system such as procfs
or sysfs.
"""
try:
if path == Path('/proc'):
return (path / 'version').read_text().startswith('Linux')
elif path == Path('/sys'):
return b'Linux' in (path / 'kernel' / 'notes').read_bytes()
except FileNotFoundError:
pass
return False
def is_empty_dir(path: Path) -> bool:
try:
next(path.iterdir())
return False
except (StopIteration, PermissionError):
return True
def _assert_permissions_in_directory(
directory: Path,
accessibility: Accessibility,
subdirs: dict[Path, Accessibility],
) -> None:
accessibility.assert_on(directory)
for file in directory.iterdir():
if is_special_fs(file):
msg = f'Got unexpected special filesystem at {file}.'
assert subdirs.pop(file) == Accessibility.SPECIAL, msg
elif not file.is_symlink() and file.is_dir():
subdir_access = subdirs.pop(file, accessibility)
if is_empty_dir(file):
# Whenever we got an empty directory, we check the permission
# constraints on the current directory (except if specified
# explicitly in subdirs) because for example if we're non-root
# (the constraints of the current directory are thus
# Accessibility.READABLE), we really have to make sure that
# empty directories are *never* writable.
subdir_access.assert_on(file)
else:
_assert_permissions_in_directory(file, subdir_access, subdirs)
else:
subdirs.pop(file, accessibility).assert_on(file)
def assert_permissions(subdirs: dict[str, Accessibility]) -> None:
"""
Recursively check whether the file system conforms to the accessibility
specification we specified via 'subdirs'.
"""
root = Path('/')
absolute_subdirs = {root / p: a for p, a in subdirs.items()}
_assert_permissions_in_directory(
root,
Accessibility.WRITABLE if os.getuid() == 0 else Accessibility.READABLE,
absolute_subdirs,
)
for file in absolute_subdirs.keys():
msg = f'Expected {file} to exist, but it was nowwhere to be found.'
raise AssertionError(msg)

View File

@@ -0,0 +1,346 @@
import ../make-test-python.nix {
name = "systemd-confinement";
nodes.machine =
{ pkgs, lib, ... }:
let
testLib = pkgs.python3Packages.buildPythonPackage {
name = "confinement-testlib";
format = "setuptools";
unpackPhase = ''
cat > setup.py <<EOF
from setuptools import setup
setup(name='confinement-testlib', py_modules=["checkperms"])
EOF
cp ${./checkperms.py} checkperms.py
'';
};
mkTest =
name: testScript:
pkgs.writers.writePython3 "${name}.py"
{
libraries = [
pkgs.python3Packages.pytest
testLib
];
}
''
# This runs our test script by using pytest's assertion rewriting, so
# that whenever we use "assert <something>", the actual values are
# printed rather than getting a generic AssertionError or the need to
# pass an explicit assertion error message.
import ast
from pathlib import Path
from _pytest.assertion.rewrite import rewrite_asserts
script = Path('${pkgs.writeText "${name}-main.py" ''
import errno, os, pytest, signal
from subprocess import run
from checkperms import Accessibility, assert_permissions
${testScript}
''}') # noqa
filename = str(script)
source = script.read_bytes()
tree = ast.parse(source, filename=filename)
rewrite_asserts(tree, source, filename)
exec(compile(tree, filename, 'exec', dont_inherit=True))
'';
mkTestStep =
num:
{
description,
testScript,
config ? { },
serviceName ? "test${toString num}",
rawUnit ? null,
}:
{
systemd.packages = lib.optional (rawUnit != null) (
pkgs.writeTextFile {
name = serviceName;
destination = "/etc/systemd/system/${serviceName}.service";
text = rawUnit;
}
);
systemd.services.${serviceName} = {
inherit description;
requiredBy = [ "multi-user.target" ];
confinement = (config.confinement or { }) // {
enable = true;
};
serviceConfig = (config.serviceConfig or { }) // {
ExecStart = mkTest serviceName testScript;
Type = "oneshot";
};
}
// removeAttrs config [
"confinement"
"serviceConfig"
];
};
parametrisedTests =
lib.concatMap
(
{ user, privateTmp }:
let
withTmp = if privateTmp then "with PrivateTmp" else "without PrivateTmp";
serviceConfig =
if user == "static-user" then
{
User = "chroot-testuser";
Group = "chroot-testgroup";
}
else if user == "dynamic-user" then
{
DynamicUser = true;
}
else
{ };
in
[
{
description = "${user}, chroot-only confinement ${withTmp}";
config = {
confinement.mode = "chroot-only";
# Only set if privateTmp is true to ensure that the default is false.
serviceConfig =
serviceConfig
// lib.optionalAttrs privateTmp {
PrivateTmp = true;
};
};
testScript =
if user == "root" then
''
assert os.getuid() == 0
assert os.getgid() == 0
assert_permissions({
'bin': Accessibility.READABLE,
'nix': Accessibility.READABLE,
'run': Accessibility.READABLE,
${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"}
${lib.optionalString privateTmp "'var': Accessibility.READABLE,"}
${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"}
})
''
else
''
assert os.getuid() != 0
assert os.getgid() != 0
assert_permissions({
'bin': Accessibility.READABLE,
'nix': Accessibility.READABLE,
'run': Accessibility.READABLE,
${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"}
${lib.optionalString privateTmp "'var': Accessibility.READABLE,"}
${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"}
})
'';
}
{
description = "${user}, full APIVFS confinement ${withTmp}";
config = {
# Only set if privateTmp is false to ensure that the default is true.
serviceConfig =
serviceConfig
// lib.optionalAttrs (!privateTmp) {
PrivateTmp = false;
};
};
testScript =
if user == "root" then
''
assert os.getuid() == 0
assert os.getgid() == 0
assert_permissions({
'bin': Accessibility.READABLE,
'nix': Accessibility.READABLE,
${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"}
'run': Accessibility.WRITABLE,
'proc': Accessibility.SPECIAL,
'sys': Accessibility.SPECIAL,
'dev': Accessibility.WRITABLE,
${lib.optionalString privateTmp "'var': Accessibility.READABLE,"}
${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"}
})
''
else
''
assert os.getuid() != 0
assert os.getgid() != 0
assert_permissions({
'bin': Accessibility.READABLE,
'nix': Accessibility.READABLE,
${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"}
'run': Accessibility.STICKY,
'proc': Accessibility.SPECIAL,
'sys': Accessibility.SPECIAL,
'dev': Accessibility.SPECIAL,
'dev/shm': Accessibility.STICKY,
'dev/mqueue': Accessibility.STICKY,
${lib.optionalString privateTmp "'var': Accessibility.READABLE,"}
${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"}
})
'';
}
]
)
(
lib.cartesianProduct {
user = [
"root"
"dynamic-user"
"static-user"
];
privateTmp = [
true
false
];
}
);
in
{
imports = lib.imap1 mkTestStep (
parametrisedTests
++ [
{
description = "existence of bind-mounted /etc";
config.serviceConfig.BindReadOnlyPaths = [ "/etc" ];
testScript = ''
assert Path('/etc/passwd').read_text()
'';
}
(
let
symlink = pkgs.runCommand "symlink" {
target = pkgs.writeText "symlink-target" "got me";
} "ln -s \"$target\" \"$out\"";
in
{
description = "check if symlinks are properly bind-mounted";
config.confinement.packages = lib.singleton symlink;
testScript = ''
assert Path('${symlink}').read_text() == 'got me'
'';
}
)
{
description = "check if StateDirectory works";
config.serviceConfig.User = "chroot-testuser";
config.serviceConfig.Group = "chroot-testgroup";
config.serviceConfig.StateDirectory = "testme";
# We restart on purpose here since we want to check whether the state
# directory actually persists.
config.serviceConfig.Restart = "on-failure";
config.serviceConfig.RestartMode = "direct";
testScript = ''
assert not Path('/tmp/canary').exists()
Path('/tmp/canary').touch()
if (foo := Path('/var/lib/testme/foo')).exists():
assert Path('/var/lib/testme/foo').read_text() == 'works'
else:
Path('/var/lib/testme/foo').write_text('works')
print('<4>Exiting with failure to check persistence on restart.')
raise SystemExit(1)
'';
}
{
description = "check if /bin/sh works";
testScript = ''
assert Path('/bin/sh').exists()
result = run(
['/bin/sh', '-c', 'echo -n bar'],
capture_output=True,
check=True,
)
assert result.stdout == b'bar'
'';
}
{
description = "check if suppressing /bin/sh works";
config.confinement.binSh = null;
testScript = ''
assert not Path('/bin/sh').exists()
with pytest.raises(FileNotFoundError):
run(['/bin/sh', '-c', 'echo foo'])
'';
}
{
description = "check if we can set /bin/sh to something different";
config.confinement.binSh = "${pkgs.hello}/bin/hello";
testScript = ''
assert Path('/bin/sh').exists()
result = run(
['/bin/sh', '-g', 'foo'],
capture_output=True,
check=True,
)
assert result.stdout == b'foo\n'
'';
}
{
description = "check if only Exec* dependencies are included";
config.environment.FOOBAR = pkgs.writeText "foobar" "eek";
testScript = ''
with pytest.raises(FileNotFoundError):
Path(os.environ['FOOBAR']).read_text()
'';
}
{
description = "check if fullUnit includes all dependencies";
config.environment.FOOBAR = pkgs.writeText "foobar" "eek";
config.confinement.fullUnit = true;
testScript = ''
assert Path(os.environ['FOOBAR']).read_text() == 'eek'
'';
}
{
description = "check if shipped unit file still works";
config.confinement.mode = "chroot-only";
rawUnit = ''
[Service]
SystemCallFilter=~kill
SystemCallErrorNumber=ELOOP
'';
testScript = ''
with pytest.raises(OSError) as excinfo:
os.kill(os.getpid(), signal.SIGKILL)
assert excinfo.value.errno == errno.ELOOP
'';
}
]
);
config.users.groups.chroot-testgroup = { };
config.users.users.chroot-testuser = {
isSystemUser = true;
description = "Chroot Test User";
group = "chroot-testgroup";
};
};
testScript = ''
machine.wait_for_unit("multi-user.target")
'';
}