# Source code for diskimgtool

# Copyright (C) 2023 Leandro Lisboa Penz <lpenz@lpenz.org>
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.


import logging
import os
import re
import subprocess
import tempfile
from contextlib import contextmanager
from time import sleep
from typing import Generator, Optional


def _log() -> logging.Logger:
    """Return the module logger, creating and caching it on first use.

    The logger is stored as the ``logger`` attribute of this function, so
    every call after the first returns the same instance.
    """
    try:
        cached: logging.Logger = getattr(_log, "logger")
    except AttributeError:
        # First call: build the logger and remember it on the function.
        cached = logging.getLogger(os.path.basename("diskimgtool"))
        setattr(_log, "logger", cached)
    return cached


def run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess[bytes]:
    """Log a command line and execute it.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list.
        check: Forwarded to ``subprocess.run``; when true a non-zero exit
            status raises ``subprocess.CalledProcessError``.

    Returns:
        The ``CompletedProcess`` returned by ``subprocess.run``.
    """
    cmdline = " ".join(cmd)
    _log().info(f"+ {cmdline}")
    return subprocess.run(cmd, check=check)
def run_capture(cmd: list[str], check: bool = True) -> str:
    """Log a command line, execute it and return its standard output.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list.
        check: When true (the default) a non-zero exit status raises
            ``subprocess.CalledProcessError``. When false the captured
            output is returned regardless of the exit status.

    Returns:
        The command's standard output decoded as ASCII.

    Raises:
        subprocess.CalledProcessError: If ``check`` is true and the command
            exits with a non-zero status.
    """
    _log().info(f'+ {" ".join(cmd)}')
    # subprocess.check_output always checks the exit status, which made the
    # ``check`` argument a no-op; subprocess.run lets the caller decide.
    proc = subprocess.run(
        cmd, check=check, stdout=subprocess.PIPE, encoding="ascii"
    )
    output: str = proc.stdout
    return output
@contextmanager
def chdir(path: str) -> Generator[None, None, None]:
    """Context manager that changes the working directory temporarily.

    On entry the process changes into ``path``; on exit, whether or not the
    body raised, it changes back to the directory that was current before
    entry. Both transitions are logged.

    Args:
        path: Directory to change into for the duration of the ``with`` body.
    """
    _log().info(f"+ cd {path}")
    previous_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        _log().info("+ cd -")
        os.chdir(previous_dir)
@contextmanager
def loopback_setup(image: str) -> Generator[str, None, None]:
    """Context manager that maps the partitions of a disk image with kpartx.

    Runs ``kpartx -a -v`` on ``image``, logs its output and yields the
    ``/dev/mapper/loopN`` prefix parsed from that output. On exit the
    mappings are removed with ``kpartx -d`` (failures ignored) and ``sync``
    is run.

    Args:
        image: Path of the disk image to map.

    Yields:
        The device-mapper path prefix, e.g. ``/dev/mapper/loop0``; callers
        append ``pN`` to address a partition.

    Raises:
        RuntimeError: If the kpartx output contains no ``add map loopN``
            line.
    """
    data = run_capture(["kpartx", "-a", "-v", image])
    # Everything after the successful ``kpartx -a`` lives inside the try so
    # that the mappings are always removed, even if logging or parsing is
    # interrupted.
    try:
        for line in data.split("\n"):
            if line:
                _log().info(f" {line}")
        m = re.search(r"add map (loop[0-9]+)p.*", data)
        if not m:
            raise RuntimeError(
                f"kpartx output did not contain a loop device mapping: {data!r}"
            )
        yield f"/dev/mapper/{m.group(1)}"
    finally:
        run(["kpartx", "-d", image], check=False)
        run(["sync"])
@contextmanager
def mount(
    src: str,
    dst: str,
    mtype: str = "auto",
    bind: bool = False,
    args: Optional[list[str]] = None,
) -> Generator[None, None, None]:
    """Context manager that mounts ``src`` on ``dst`` and unmounts on exit.

    Args:
        src: Mount source (device, pseudo-filesystem name or directory).
        dst: Mount point.
        mtype: Filesystem type passed to ``mount -t``; forced to ``"none"``
            when ``bind`` is true.
        bind: When true, perform a bind mount (``-o bind``).
        args: Extra arguments inserted before ``src`` and ``dst``.

    On exit ``umount`` is attempted up to five times, running ``lsof`` on
    the mount point and sleeping one second after each failure. If all
    attempts fail, a lazy unmount (``umount -l``) is performed.

    Raises:
        subprocess.CalledProcessError: If the ``mount`` command or the final
            lazy unmount fails.
    """
    if bind:
        mtype = "none"
    c = ["mount", "-t", mtype]
    if bind:
        c.extend(["-o", "bind"])
    if args:
        c.extend(args)
    c.extend([src, dst])
    run(c)
    try:
        yield
    finally:
        # Do not ``return`` from inside ``finally``: that would end the
        # generator normally and make @contextmanager swallow any exception
        # raised in the ``with`` body. Use break/else instead.
        for _ in range(5):
            r = run(["umount", dst], check=False)
            if r.returncode == 0:
                break
            # Show what is keeping the mount point busy, then retry.
            run(["lsof", dst], check=False)
            sleep(1)
        else:
            run(["umount", "-l", dst])
@contextmanager
def root_mounts(rootdir: str) -> Generator[None, None, None]:
    """Context manager that mounts the pseudo-filesystems under ``rootdir``.

    Mounts, in this order, ``dev`` (devtmpfs), ``dev/pts`` (devpts),
    ``dev/shm`` (tmpfs), ``proc``, ``sys`` (sysfs) and ``run`` (tmpfs) below
    ``rootdir``, then creates ``run/lock`` and ``run/shm`` before yielding.
    The mounts are released in reverse order on exit.

    Args:
        rootdir: Root directory under which the mounts are made.
    """
    # A single multi-manager ``with`` enters left to right and exits right
    # to left, exactly like the equivalent nested ``with`` statements.
    with mount("dev", f"{rootdir}/dev", mtype="devtmpfs"), \
            mount("devpts", f"{rootdir}/dev/pts", mtype="devpts"), \
            mount("tmpfs", f"{rootdir}/dev/shm", mtype="tmpfs"), \
            mount("proc", f"{rootdir}/proc", mtype="proc"), \
            mount("sysfs", f"{rootdir}/sys", mtype="sysfs"), \
            mount("tmpfs", f"{rootdir}/run", mtype="tmpfs"):
        run(["mkdir", "-p", f"{rootdir}/run/lock", f"{rootdir}/run/shm"])
        yield
def chroot(rootdir: str, cmd: list[str]) -> subprocess.CompletedProcess[bytes]:
    """Run a command inside ``rootdir`` using the ``chroot`` program.

    Args:
        rootdir: Directory to use as the new root.
        cmd: Program and arguments to execute inside the chroot.

    Returns:
        The ``CompletedProcess`` returned by ``run`` for the ``chroot``
        invocation.
    """
    chroot_cmd = ["chroot", rootdir] + cmd
    return run(chroot_cmd)
@contextmanager
def image_fully_mounted(image: str) -> Generator[str, None, None]:
    """Context manager that mounts a disk image in a temporary directory.

    Creates a temporary directory inside the current working directory,
    maps the image's partitions with ``loopback_setup``, mounts partition 2
    on the temporary directory and partition 1 on its ``boot``
    subdirectory, then yields the temporary directory path. Everything is
    released in reverse order on exit.

    Args:
        image: Path of the disk image to mount.

    Yields:
        Path of the temporary directory where the image is mounted.
    """
    # One multi-manager ``with``: entered left to right, exited right to
    # left, same as the nested form.
    with tempfile.TemporaryDirectory(dir=os.getcwd()) as rootdir, \
            loopback_setup(image) as loop, \
            mount(f"{loop}p2", rootdir), \
            mount(f"{loop}p1", f"{rootdir}/boot"):
        yield rootdir