Skip to content
10 changes: 9 additions & 1 deletion src/con_duct/_constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
"""Constants used throughout con-duct."""

__schema_version__ = "0.2.2"
__schema_version__ = "0.3.0"

# Skip pids whose ps `etime` reads "00:00" -- i.e. too young for ps
# to report a non-zero elapsed time. Mitigates con/duct#399's
# churn-of-short-lived-workers case at the cost of the whole
# record (cmd/rss/vsz/pmem) for that pid in that sample. Mirrors
# the smon ancestor's pattern. Module-level toggle for now;
# elevating to a CLI flag later is a one-line wire-up.
DROP_YOUNG_PIDS = True

ENV_PREFIXES = ("PBS_", "SLURM_", "OSG")
SUFFIXES = {
Expand Down
10 changes: 7 additions & 3 deletions src/con_duct/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,14 @@ class SystemInfo:

@dataclass
class ProcessStats:
pcpu: float # %CPU
pcpu: float # %CPU (corrected; equals pcpu_raw until calc is wired)
pcpu_raw: float # %CPU as reported by ps; preserved for audit
pmem: float # %MEM
rss: int # Memory Resident Set Size in Bytes
vsz: int # Virtual Memory size in Bytes
timestamp: str
etime: str
etime: str # raw ps etime string; legacy, see etimes
etimes: float # elapsed seconds (parsed from etime)
stat: Counter
cmd: str

Expand All @@ -93,11 +95,13 @@ def aggregate(self, other: ProcessStats) -> ProcessStats:
new_counter.update(other.stat)
return ProcessStats(
pcpu=max(self.pcpu, other.pcpu),
pcpu_raw=max(self.pcpu_raw, other.pcpu_raw),
pmem=max(self.pmem, other.pmem),
rss=max(self.rss, other.rss),
vsz=max(self.vsz, other.vsz),
timestamp=max(self.timestamp, other.timestamp),
etime=other.etime, # For the aggregate always take the latest
etimes=other.etimes,
stat=new_counter,
cmd=cmd,
)
Expand All @@ -111,7 +115,7 @@ def __post_init__(self) -> None:
self._validate()

def _validate(self) -> None:
assert_num(self.pcpu, self.pmem, self.rss, self.vsz)
assert_num(self.pcpu, self.pcpu_raw, self.pmem, self.rss, self.vsz, self.etimes)


@dataclass
Expand Down
67 changes: 58 additions & 9 deletions src/con_duct/_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import subprocess
import sys
from typing import Callable, Optional
from con_duct._constants import DROP_YOUNG_PIDS
from con_duct._models import Averages, ProcessStats, Sample
from con_duct._utils import etime_to_etimes, instantaneous_pcpu

SYSTEM = platform.system()

Expand All @@ -26,8 +28,11 @@
lgr = logging.getLogger("con-duct")


def _get_sample_linux(session_id: int) -> Sample:
def _get_sample_linux(
session_id: int, prev: dict[int, tuple[float, float]]
) -> tuple[Optional[Sample], dict[int, tuple[float, float]]]:
sample = Sample()
new_prev: dict[int, tuple[float, float]] = {}

ps_command = [
"ps",
Expand All @@ -45,21 +50,52 @@ def _get_sample_linux(session_id: int) -> Sample:

pid, pcpu, pmem, rss_kib, vsz_kib, etime, stat, cmd = line.split(maxsplit=7)

if DROP_YOUNG_PIDS and etime == "00:00":
# Pid too young for ps to report a non-zero elapsed
# time; skip the whole record (smon's pattern). Loses
# rss/vsz/cmd for this sample, but avoids inflating
# pcpu_raw via the first-observation fallback for the
# con/duct#399 churn case.
continue

pid_int = int(pid)
pcpu_raw_value = float(pcpu)
etimes_value = etime_to_etimes(etime)
prev_entry = prev.get(pid_int)
if prev_entry is not None:
prev_pcpu_raw, prev_etimes = prev_entry
pcpu_value = instantaneous_pcpu(
prev_pcpu_raw, prev_etimes, pcpu_raw_value, etimes_value
)
else:
# First observation of this pid; no prior state to delta against.
pcpu_value = pcpu_raw_value
new_prev[pid_int] = (pcpu_raw_value, etimes_value)

sample.add_pid(
pid=int(pid),
pid=pid_int,
stats=ProcessStats(
pcpu=float(pcpu),
pcpu=pcpu_value,
pcpu_raw=pcpu_raw_value,
pmem=float(pmem),
rss=int(rss_kib) * 1024,
vsz=int(vsz_kib) * 1024,
timestamp=datetime.now().astimezone().isoformat(),
etime=etime,
etimes=etimes_value,
stat=Counter([stat]),
cmd=cmd,
),
)
if not sample.stats:
# Every pid was filtered (e.g. DROP_YOUNG_PIDS dropped all
# the youngest observations) or ps returned no rows. Mirror
# _get_sample_mac's no-pids path so the monitor loop skips
# this round rather than crashing on Averages.from_sample's
# not-None assertions.
return None, new_prev
sample.averages = Averages.from_sample(sample=sample)
return sample
return sample, new_prev


def _try_to_get_sid(pid: int) -> int:
Expand Down Expand Up @@ -92,22 +128,30 @@ def _add_pid_to_sample_from_line_mac(
pid, pcpu, pmem, rss_kb, vsz_kb, etime, stat, cmd = line.split(maxsplit=7)

if pid_to_matching_sid.get(int(pid)) is not None:
pcpu_value = float(pcpu)
sample.add_pid(
pid=int(pid),
stats=ProcessStats(
pcpu=float(pcpu),
pcpu=pcpu_value,
pcpu_raw=pcpu_value,
pmem=float(pmem),
rss=int(rss_kb) * 1024,
vsz=int(vsz_kb) * 1024,
timestamp=datetime.now().astimezone().isoformat(),
etime=etime,
etimes=etime_to_etimes(etime),
stat=Counter([stat]),
cmd=cmd,
),
)


def _get_sample_mac(session_id: int) -> Optional[Sample]:
def _get_sample_mac(
session_id: int, prev: dict[int, tuple[float, float]]
) -> tuple[Optional[Sample], dict[int, tuple[float, float]]]:
# Darwin's ps reports a 5/8-decayed EWMA, not a cumulative ratio,
# so the delta calc does not apply. prev is accepted for uniform
# signature with the Linux sampler and returned unchanged.
sample = Sample()

lines = _get_ps_lines_mac()
Expand All @@ -120,7 +164,7 @@ def _get_sample_mac(session_id: int) -> Optional[Sample]:

if not pid_to_matching_sid:
lgr.debug(f"No processes found for session ID {session_id}.")
return None
return None, prev

# collections.deque with maxlen=0 is used to approximate the
# performance of list comprehension (superior to basic for-loop)
Expand All @@ -136,11 +180,16 @@ def _get_sample_mac(session_id: int) -> Optional[Sample]:
)

sample.averages = Averages.from_sample(sample=sample)
return sample
return sample, prev


_get_sample_per_system = {
"Linux": _get_sample_linux,
"Darwin": _get_sample_mac,
}
_get_sample: Callable[[int], Optional[Sample]] = _get_sample_per_system[SYSTEM] # type: ignore[assignment]
_get_sample: Callable[
[int, dict[int, tuple[float, float]]],
tuple[Optional[Sample], dict[int, tuple[float, float]]],
] = _get_sample_per_system[
SYSTEM
] # type: ignore[assignment]
6 changes: 5 additions & 1 deletion src/con_duct/_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def __init__(
self.system_info: SystemInfo | None = None
self.full_run_stats = Sample()
self.current_sample: Optional[Sample] = None
# Per-pid (pcpu_raw, etimes) carried forward across samples
# so the Linux sampler can compute instantaneous %CPU as a
# delta. Mac sampler accepts but does not consume it.
self.prev_raw: dict[int, tuple[float, float]] = {}
self.end_time: float | None = None
self.run_time_seconds: str | None = None
self.usage_file: TextIO | None = None
Expand Down Expand Up @@ -135,7 +139,7 @@ def get_system_info(self) -> None:
def collect_sample(self) -> Optional[Sample]:
assert self.session_id is not None
try:
sample = _get_sample(self.session_id)
sample, self.prev_raw = _get_sample(self.session_id, self.prev_raw)
return sample
except subprocess.CalledProcessError as exc: # when session_id has no processes
lgr.debug("Error collecting sample: %s", str(exc))
Expand Down
70 changes: 70 additions & 0 deletions src/con_duct/_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Utility functions for con-duct."""

from __future__ import annotations
from typing import Any


Expand All @@ -8,6 +9,75 @@ def assert_num(*values: Any) -> None:
assert isinstance(value, (float, int))


# TODO: consider asking ps for `etimes` (seconds) directly via
# `-o etimes` instead of parsing `etime`. Even if we switch, this
# parser is worth keeping for backwards compatibility with existing
# usage.jsonl logs that persist `etime` as a string.
def etime_to_etimes(etime: str) -> float:
"""Parse a ps ``etime`` string into seconds.

ps's ``etime`` format is ``[[DD-]HH:]MM:SS``: ``MM:SS`` always,
with ``HH:`` prepended after one hour and ``DD-`` prepended after
one day.

:param etime: elapsed-time string from ``ps -o etime``.
:returns: elapsed time in seconds.
:raises ValueError: if ``etime`` does not match the expected shape.
"""
if "-" in etime:
days_str, rest = etime.split("-", 1)
days = int(days_str)
else:
days = 0
rest = etime
parts = rest.split(":")
if len(parts) == 2:
hours, minutes, seconds = 0, int(parts[0]), int(parts[1])
elif len(parts) == 3:
hours, minutes, seconds = int(parts[0]), int(parts[1]), int(parts[2])
else:
raise ValueError(f"Unparsable etime: {etime!r}")
return float(days * 86400 + hours * 3600 + minutes * 60 + seconds)


def instantaneous_pcpu(
prev_pcpu: float,
prev_etimes: float,
curr_pcpu: float,
curr_etimes: float,
) -> float:
"""Instantaneous %CPU between two ps samples of the same pid.

Inverts the procps identity ``pcpu = cputime / etime * 100`` to
recover cputime at each sample, takes the cputime delta, and
divides by the elapsed interval. The ``/100`` and ``*100``
cancel, so the result is in the same units as ``pcpu``.

Linux-only: assumes ``pcpu`` is the cumulative ``cputime/etime``
ratio. Invalid on Darwin (decayed EWMA).

Precision floor: ps reports ``etime`` at 1-second resolution, so
at sample intervals near 1s this function frequently falls back
to ``curr_pcpu`` (see PROBLEMS.md in the resource-measurement
notebook).

:param prev_pcpu: %CPU from the earlier sample.
:param prev_etimes: elapsed seconds at the earlier sample.
:param curr_pcpu: %CPU from the later sample.
:param curr_etimes: elapsed seconds at the later sample.
:returns: instantaneous %CPU over the interval. Falls back to
``curr_pcpu`` when the interval is non-positive (etimes did
not advance, or regressed -- suspected pid reuse). The
fallback keeps the value honest about "something is here"
rather than emitting a silent zero; a sophisticated consumer
can detect fallback by comparing against ``pcpu_raw``.
"""
interval = curr_etimes - prev_etimes
if interval <= 0:
return curr_pcpu
return (curr_pcpu * curr_etimes - prev_pcpu * prev_etimes) / interval


def parse_version(version_str: str) -> tuple[int, int, int]:
x_y_z = version_str.split(".")
if len(x_y_z) != 3:
Expand Down
6 changes: 6 additions & 0 deletions src/con_duct/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,12 @@ def _create_ls_parser() -> argparse.ArgumentParser:

def run_command(args: argparse.Namespace) -> int:
"""Execute a command with duct monitoring."""
if args.sample_interval < 1.0:
lgr.warning(
"sample-interval %ss is below ps's 1s etime resolution; "
"this can cause phase artifacts (false pcpu spikes) on Linux.",
args.sample_interval,
)
kwargs = vars(args).copy()
# Remove arguments that are not for duct_execute
for key in ("func", "log_level", "quiet"):
Expand Down
33 changes: 33 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,39 @@ def set_test_config() -> Generator:
del os.environ[k]


@pytest.fixture(scope="session", autouse=True)
def disable_drop_young_pids_for_tests() -> Generator:
"""Disable the DROP_YOUNG_PIDS sampler filter for in-process
tests. Affects only the test process's interpreter; subprocess
tests (e.g. test_e2e.py) inherit the production default and
must use commands that outlive ps's 1-second etime quantum.
"""
import con_duct._sampling as sampling_module

orig = sampling_module.DROP_YOUNG_PIDS
sampling_module.DROP_YOUNG_PIDS = False
yield
sampling_module.DROP_YOUNG_PIDS = orig


@pytest.fixture
def enable_drop_young_pids(monkeypatch: pytest.MonkeyPatch) -> None:
"""Opt back into the production DROP_YOUNG_PIDS filter for a
single in-process test (used by unit tests in test_sampling.py
that specifically exercise the drop behavior).

Only effective for tests that exercise duct via in-process
Python imports (e.g. test_sampling.py with mocked ps,
test_execution.py via run_duct_command). Has no effect on
subprocess-based tests (test_e2e.py), which spawn fresh Python
interpreters that read DROP_YOUNG_PIDS from _constants.py
directly. Subprocess tests must instead use workloads that
outlive ps's 1-second etime quantum if they assert on
observability under the production default.
"""
monkeypatch.setattr("con_duct._sampling.DROP_YOUNG_PIDS", True)


@pytest.fixture(autouse=True)
def reset_logger_state() -> Generator:
"""Automatically reset logger state after each test.
Expand Down
Loading
Loading