k4bench.runner.executor

Execute a single ddsim run and return a RunResult.

Design principle

This module owns instrumentation: timing, logging, and metrics extraction. It does not own physics configuration. The caller decides which ddsim arguments to pass; the executor wraps them with /usr/bin/time -v and harvests the results.
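To illustrate the harvesting step, here is a minimal sketch that pulls a few fields out of GNU time's verbose report. It is not the module's own parse_time_output. The names parse_time_sketch and parse_elapsed, the subset of fields, and the sample text are assumptions made for illustration.

```python
from __future__ import annotations

import re

# Field labels as printed by GNU time in verbose (-v) mode.
_PATTERNS = {
    "wall_time_raw": r"Elapsed \(wall clock\) time \(h:mm:ss or m:ss\):\s*(\S+)",
    "user_cpu_s": r"User time \(seconds\):\s*([\d.]+)",
    "sys_cpu_s": r"System time \(seconds\):\s*([\d.]+)",
    "peak_rss_kb": r"Maximum resident set size \(kbytes\):\s*(\d+)",
}


def parse_elapsed(raw: str) -> float:
    """Convert an h:mm:ss or m:ss.ss string to seconds."""
    seconds = 0.0
    for part in raw.split(":"):
        seconds = seconds * 60 + float(part)
    return seconds


def parse_time_sketch(text: str) -> dict[str, float | str | None]:
    """Hypothetical parser: missing fields come back as None."""
    found = {}
    for key, pattern in _PATTERNS.items():
        match = re.search(pattern, text)
        found[key] = match.group(1) if match else None

    raw = found["wall_time_raw"]
    user = found["user_cpu_s"]
    system = found["sys_cpu_s"]
    rss_kb = found["peak_rss_kb"]
    return {
        "wall_time_raw": raw,
        "wall_time_s": parse_elapsed(raw) if raw else None,
        "user_cpu_s": float(user) if user else None,
        "sys_cpu_s": float(system) if system else None,
        "peak_rss_mb": int(rss_kb) / 1024 if rss_kb else None,
    }


# Made-up sample report, not output from a real ddsim run.
SAMPLE = (
    "\tUser time (seconds): 12.50\n"
    "\tSystem time (seconds): 1.25\n"
    "\tElapsed (wall clock) time (h:mm:ss or m:ss): 0:14.00\n"
    "\tMaximum resident set size (kbytes): 2048000\n"
)
metrics = parse_time_sketch(SAMPLE)
```

Returning None for absent fields lets the caller detect an unparsed report and warn instead of failing.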

The only ddsim arguments that the executor needs to know about are:

  • --compactFile — to allow per-run XML patching (geometry sweep)
  • --numberOfEvents — to compute events/sec
  • --outputFile — to measure output file size

Everything else (--enableGun, --gun.particle, --runType, steering files, …) is passed through verbatim via extra_args.
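The split between injected and pass-through arguments could be sketched as follows. The module's own _build_command is not shown on this page, so build_command_sketch and its exact output are assumptions. Plugin-related arguments are left out.

```python
from __future__ import annotations

import shlex
from pathlib import Path


def build_command_sketch(
    xml_path: Path,
    n_events: int,
    output_file: Path,
    extra_args: list[str] | None = None,
    setup_script: Path | None = None,
) -> str:
    """Hypothetical builder: returns one shell string suitable for bash."""
    ddsim = [
        "ddsim",
        # The three arguments the executor owns.
        "--compactFile", str(xml_path),
        "--numberOfEvents", str(n_events),
        "--outputFile", str(output_file),
        # Everything else is forwarded untouched.
        *(extra_args or []),
    ]
    # Wrap the whole invocation so GNU time reports on the ddsim process.
    command = shlex.join(["/usr/bin/time", "-v", *ddsim])
    if setup_script is not None:
        command = f"source {shlex.quote(str(setup_script))} && {command}"
    return command


cmd = build_command_sketch(
    Path("det.xml"),
    10,
    Path("out.root"),
    extra_args=["--enableGun", "--gun.particle", "mu-"],
)
```

Quoting each token with shlex keeps paths containing spaces intact when the string is handed to a shell.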

Per-event timing

When available, the k4Bench C++ timing plugin is loaded automatically as a DDG4 event action. The plugin writes per-event timing metrics to JSON files inside the log directory. These profiling artifacts are intentionally kept separate from RunResult, which only stores run-level benchmark metrics.
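Because these artifacts live outside RunResult, a caller has to load them separately. The sketch below assumes the file names used in the source listing on this page (<label>_events.json and <label>_regions.json). The JSON schema is not documented here, so the helper, whose name load_plugin_artifacts is hypothetical, returns the decoded content unchanged.

```python
from __future__ import annotations

import json
from pathlib import Path


def load_plugin_artifacts(log_dir: Path, label: str) -> dict[str, object]:
    """Hypothetical helper: load the optional plugin JSON files for one run.

    A value of None means the file was not written, for example because
    the timing plugin was unavailable.
    """
    artifacts: dict[str, object] = {}
    for kind in ("events", "regions"):
        path = log_dir / f"{label}_{kind}.json"
        if path.exists():
            with path.open() as handle:
                artifacts[kind] = json.load(handle)
        else:
            artifacts[kind] = None
    return artifacts
```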

run_ddsim

run_ddsim(*, xml_path: Path, label: str, n_events: int, output_file: Path, log_dir: Path, setup_script: Path | None = None, extra_args: list[str] | None = None, verbose: bool = False) -> RunResult

Run ddsim for one geometry configuration and return collected metrics.

The executor injects --compactFile, --numberOfEvents, and --outputFile automatically. All other ddsim options should be supplied via extra_args.
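A minimal usage sketch follows. The paths, the label, and the gun particle are placeholders, and main() is only expected to work in an environment where k4bench and ddsim are available.

```python
from pathlib import Path


def example_run_kwargs(workdir: Path) -> dict:
    """Keyword arguments for one run; all paths and the label are placeholders."""
    return {
        "xml_path": workdir / "detector.xml",
        "label": "baseline",
        "n_events": 10,
        "output_file": workdir / "baseline.edm4hep.root",
        "log_dir": workdir / "logs",
        # Physics configuration is the caller's job and goes through extra_args.
        "extra_args": ["--enableGun", "--gun.particle", "mu-"],
    }


def main() -> None:
    # Imported lazily so the sketch can be read without k4bench installed.
    from k4bench.runner.executor import run_ddsim

    result = run_ddsim(**example_run_kwargs(Path.cwd()))
    print(result.label, result.returncode, result.events_per_sec)
```

Note that extra_args does not repeat --compactFile, --numberOfEvents, or --outputFile, since the executor adds those itself.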

Parameters:

Name          Type              Default   Description
------------  ----------------  --------  -----------
xml_path      Path              required  Compact XML file passed to --compactFile.
label         str               required  Human-readable name for this run. Used as the log filename stem and stored in RunResult.label.
n_events      int               required  Number of events; passed to --numberOfEvents and used to compute RunResult.events_per_sec.
output_file   Path              required  EDM4hep ROOT output path passed to --outputFile. Its size is recorded after the run.
log_dir       Path              required  Directory where <label>.log is written.
setup_script  Path | None       None      Optional shell script sourced before ddsim.
extra_args    list[str] | None  None      Additional ddsim arguments passed through verbatim.
verbose       bool              False     Stream ddsim output live to stdout.

Returns:

Type       Description
---------  -----------
RunResult  Process-level timing, memory, and throughput metrics.
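Two of the returned metrics are derived rather than parsed. The sketch below mirrors the arithmetic in the source listing on this page. The helper names events_per_sec and file_size_mb are introduced here for illustration and are not part of the module.

```python
from __future__ import annotations

from pathlib import Path


def events_per_sec(n_events: int, wall_time_s: float | None) -> float | None:
    """Throughput rounded to 4 decimals; None if the wall time is unusable."""
    if wall_time_s is None or wall_time_s <= 0:
        return None
    return round(n_events / wall_time_s, 4)


def file_size_mb(path: Path) -> float | None:
    """File size in units of 1024**2 bytes; None if the file does not exist."""
    if not path.exists():
        return None
    return path.stat().st_size / 1024**2


rate = events_per_sec(100, 8.0)
```

Both helpers return None instead of raising, so a failed run that produced no output file or no timing report still yields a complete result record.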

Source code in k4bench/runner/executor.py
def run_ddsim(
    *,
    xml_path: Path,
    label: str,
    n_events: int,
    output_file: Path,
    log_dir: Path,
    setup_script: Path | None = None,
    extra_args: list[str] | None = None,
    verbose: bool = False,
) -> RunResult:
    """Run ddsim for one geometry configuration and return collected metrics.

    The executor injects ``--compactFile``, ``--numberOfEvents``, and
    ``--outputFile`` automatically. All other ddsim options should be
    supplied via *extra_args*.

    Parameters
    ----------
    xml_path:
        Compact XML file passed to ``--compactFile``.

    label:
        Human-readable name for this run. Used as the log filename stem
        and stored in :attr:`RunResult.label`.

    n_events:
        Number of events; passed to ``--numberOfEvents`` and used to
        compute :attr:`RunResult.events_per_sec`.

    output_file:
        EDM4hep ROOT output path passed to ``--outputFile``.
        Its size is recorded after the run.

    log_dir:
        Directory where ``<label>.log`` is written.

    setup_script:
        Optional shell script sourced before ddsim.

    extra_args:
        Additional ddsim arguments passed through verbatim.

    verbose:
        Stream ddsim output live to stdout.

    Returns
    -------
    RunResult
        Process-level timing, memory, and throughput metrics.
    """
    log_dir.mkdir(parents=True, exist_ok=True)

    log_path = log_dir / f"{label}.log"

    # Optional plugin output artifacts
    event_json_path = log_dir / f"{label}_events.json"
    event_json_path.unlink(missing_ok=True)
    region_json_path = log_dir / f"{label}_regions.json"
    region_json_path.unlink(missing_ok=True)

    env = os.environ.copy()

    plugin_available = setup_plugin_environment(
        env=env,
        event_json_path=event_json_path,
        region_json_path=region_json_path,
    )

    cmd = _build_command(
        xml_path=xml_path,
        n_events=n_events,
        output_file=output_file,
        setup_script=setup_script,
        extra_args=extra_args,
        plugin_available=plugin_available,
    )

    try:
        proc = subprocess.Popen(
            cmd,
            shell=True,
            executable="/bin/bash",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            env=env,
            start_new_session=True,
        )

        time_output_lines = deque(maxlen=200)

        with log_path.open("w") as log_file:

            if proc.stdout is None:
                raise RuntimeError("Failed to capture ddsim stdout.")

            for line in proc.stdout:

                # Stream to terminal if requested
                if verbose:
                    print(line, end="", flush=True)

                # Stream immediately to logfile
                log_file.write(line)

                # Keep only a rolling tail in memory
                time_output_lines.append(line)

            proc.wait()  # ensure returncode is populated

    except KeyboardInterrupt:
        print("\nStopping ddsim...", flush=True)

        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
            proc.wait(timeout=5)

        except (subprocess.TimeoutExpired, ProcessLookupError):

            try:
                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)

            except ProcessLookupError:
                pass

        raise

    metrics = parse_time_output("".join(time_output_lines))

    output_size_mb: float | None = None

    if output_file.exists():
        output_size_mb = output_file.stat().st_size / 1024**2

    events_per_sec: float | None = None

    if metrics["wall_time_s"] is not None and metrics["wall_time_s"] > 0:
        events_per_sec = round(
            n_events / metrics["wall_time_s"],
            4,
        )

    if metrics["wall_time_raw"] is None or metrics["peak_rss_mb"] is None:
        _warn_unparsed(label, log_path)

    return RunResult(
        label=label,
        returncode=proc.returncode,
        n_events=n_events,
        wall_time_raw=metrics["wall_time_raw"],
        wall_time_s=metrics["wall_time_s"],
        user_cpu_s=metrics["user_cpu_s"],
        sys_cpu_s=metrics["sys_cpu_s"],
        peak_rss_mb=metrics["peak_rss_mb"],
        major_page_faults=metrics["major_page_faults"],
        voluntary_ctx_switches=metrics["voluntary_ctx_switches"],
        involuntary_ctx_switches=metrics["involuntary_ctx_switches"],
        output_size_mb=output_size_mb,
        events_per_sec=events_per_sec,
    )
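The streaming loop in the listing keeps only a rolling tail of the output in memory. GNU time prints its report to stderr after the wrapped command exits, so with stderr merged into stdout the report sits at the end of the stream. The standalone sketch below shows that pattern with a harmless child process in place of ddsim. The function name run_and_tail is hypothetical.

```python
from __future__ import annotations

import subprocess
import sys
from collections import deque


def run_and_tail(cmd: list[str], tail_lines: int = 200) -> tuple[int, list[str]]:
    """Run cmd, consume its merged output line by line, keep the last lines."""
    tail: deque[str] = deque(maxlen=tail_lines)
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so a trailing report is captured
        text=True,
    )
    if proc.stdout is None:
        raise RuntimeError("Failed to capture child stdout.")
    for line in proc.stdout:
        # A real runner would also write each line to a log file here.
        tail.append(line)
    returncode = proc.wait()  # ensure the exit status is collected
    return returncode, list(tail)


# Stand-in child process that prints ten numbered lines.
child = [sys.executable, "-c", "for i in range(10): print(i)"]
code, tail = run_and_tail(child, tail_lines=3)
```

A bounded deque keeps memory use constant no matter how much output a long simulation prints.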