From d6054d6e7b2a3150fb75f3d4695372035e5f2f32 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Mon, 27 Apr 2026 17:55:00 -0500 Subject: [PATCH 01/10] utils: add etime_to_etimes parser Parses ps's elapsed-time string format ([[DD-]HH:]MM:SS) into float seconds. The downstream instantaneous-pcpu calculation needs a numeric elapsed time, but ps -o etime emits only the formatted string. A future change may switch to ps -o etimes (which ps can produce directly as seconds); the parser stays useful either way for reading existing usage.jsonl logs that persist the string form. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/con_duct/_utils.py | 32 +++++++++++++++++++++++++++++++ test/duct_main/test_duct_utils.py | 23 +++++++++++++++++++++- 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/src/con_duct/_utils.py b/src/con_duct/_utils.py index 6ccd29c0..60d92811 100644 --- a/src/con_duct/_utils.py +++ b/src/con_duct/_utils.py @@ -1,5 +1,6 @@ """Utility functions for con-duct.""" +from __future__ import annotations from typing import Any @@ -8,6 +9,37 @@ def assert_num(*values: Any) -> None: assert isinstance(value, (float, int)) +# TODO: consider asking ps for `etimes` (seconds) directly via +# `-o etimes` instead of parsing `etime`. Even if we switch, this +# parser is worth keeping for backwards compatibility with existing +# usage.jsonl logs that persist `etime` as a string. +def etime_to_etimes(etime: str) -> float: + """Parse a ps ``etime`` string into seconds. + + ps's ``etime`` format is ``[[DD-]HH:]MM:SS``: ``MM:SS`` always, + with ``HH:`` prepended after one hour and ``DD-`` prepended after + one day. + + :param etime: elapsed-time string from ``ps -o etime``. + :returns: elapsed time in seconds. + :raises ValueError: if ``etime`` does not match the expected shape. + """ + if "-" in etime: + days_str, rest = etime.split("-", 1) + days = int(days_str) + else: + days = 0 + rest = etime + parts = rest.split(":") + if len(parts) == 2: + hours, minutes, seconds = 0, int(parts[0]), int(parts[1]) + elif len(parts) == 3: + hours, minutes, seconds = int(parts[0]), int(parts[1]), int(parts[2]) + else: + raise ValueError(f"Unparsable etime: {etime!r}") + return float(days * 86400 + hours * 3600 + minutes * 60 + seconds) + + def parse_version(version_str: str) -> tuple[int, int, int]: x_y_z = version_str.split(".") if len(x_y_z) != 3: diff --git a/test/duct_main/test_duct_utils.py b/test/duct_main/test_duct_utils.py index c25a4991..39435d51 100644 --- a/test/duct_main/test_duct_utils.py +++ b/test/duct_main/test_duct_utils.py @@ -1,7 +1,7 @@ """Tests for utility functions in _duct_main.py""" import pytest -from con_duct._utils import assert_num +from con_duct._utils import assert_num, etime_to_etimes @pytest.mark.parametrize("input_value", [0, 1, 2, -1, 100, 0.001, -1.68]) @@ -13,3 +13,24 @@ def test_assert_num_green(input_value: int) -> None: def test_assert_num_red(input_value: int) -> None: with pytest.raises(AssertionError): assert_num(input_value) + + +@pytest.mark.parametrize( + "etime,expected", + [ + ("00:42", 42.0), + ("01:30", 90.0), + ("01:00:00", 3600.0), + ("12:34:56", 12 * 3600 + 34 * 60 + 56), + ("02-03:04:05", 2 * 86400 + 3 * 3600 + 4 * 60 + 5), + ("100-00:00:00", 100 * 86400.0), + ], +) +def test_etime_to_etimes_green(etime: str, expected: float) -> None: + assert etime_to_etimes(etime) == expected + + +@pytest.mark.parametrize("etime", ["", "garbage", "12", "1:2:3:4", "ab:cd"]) +def test_etime_to_etimes_red(etime: str) -> None: + with pytest.raises(ValueError): + etime_to_etimes(etime) From 56bae660a612ddad5c9a30adbec1ecfb69e091b4 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Mon, 27 Apr 2026 17:56:53 -0500 Subject: [PATCH 02/10] utils: add instantaneous_pcpu calculator Pure function that recovers instantaneous %CPU between two ps samples of the same pid by inverting the procps cumulative-pcpu identity: cputime = pcpu * etime / 100. Subtracting cputime at two samples and dividing by the interval gives a value in the same units as the input pcpu. Linux-only by assumption (Darwin's ps reports a 5/8-decayed EWMA, not a cumulative ratio, so the identity does not hold there). Returns None when the elapsed-time interval is non-positive -- either the pid has not aged a full ps tick, or etimes regressed (suspected pid reuse). This commit only adds the function and its tests; wiring into the sampler is a separate change. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/con_duct/_utils.py | 35 +++++++++++++++++++++++ test/duct_main/test_duct_utils.py | 46 ++++++++++++++++++++++++++++++- 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/src/con_duct/_utils.py b/src/con_duct/_utils.py index 60d92811..f358223c 100644 --- a/src/con_duct/_utils.py +++ b/src/con_duct/_utils.py @@ -40,6 +40,41 @@ def etime_to_etimes(etime: str) -> float: return float(days * 86400 + hours * 3600 + minutes * 60 + seconds) +def instantaneous_pcpu( + prev_pcpu: float, + prev_etimes: float, + curr_pcpu: float, + curr_etimes: float, +) -> float | None: + """Instantaneous %CPU between two ps samples of the same pid. + + Inverts the procps identity ``pcpu = cputime / etime * 100`` to + recover cputime at each sample, takes the cputime delta, and + divides by the elapsed interval. The ``/100`` and ``*100`` + cancel, so the result is in the same units as ``pcpu``. + + Linux-only: assumes ``pcpu`` is the cumulative ``cputime/etime`` + ratio. Invalid on Darwin (decayed EWMA). + + Precision floor: ps reports ``etime`` at 1-second resolution, so + at sample intervals near 1s this function is noisy or returns + ``None`` frequently (see PROBLEMS.md in the resource-measurement + notebook). + + :param prev_pcpu: %CPU from the earlier sample. + :param prev_etimes: elapsed seconds at the earlier sample. + :param curr_pcpu: %CPU from the later sample. + :param curr_etimes: elapsed seconds at the later sample. + :returns: instantaneous %CPU over the interval, or ``None`` if + the interval is non-positive (etimes did not advance, or + regressed -- suspected pid reuse). + """ + interval = curr_etimes - prev_etimes + if interval <= 0: + return None + return (curr_pcpu * curr_etimes - prev_pcpu * prev_etimes) / interval + + def parse_version(version_str: str) -> tuple[int, int, int]: x_y_z = version_str.split(".") if len(x_y_z) != 3: diff --git a/test/duct_main/test_duct_utils.py b/test/duct_main/test_duct_utils.py index 39435d51..482994d9 100644 --- a/test/duct_main/test_duct_utils.py +++ b/test/duct_main/test_duct_utils.py @@ -1,7 +1,7 @@ """Tests for utility functions in _duct_main.py""" import pytest -from con_duct._utils import assert_num, etime_to_etimes +from con_duct._utils import assert_num, etime_to_etimes, instantaneous_pcpu @pytest.mark.parametrize("input_value", [0, 1, 2, -1, 100, 0.001, -1.68]) @@ -34,3 +34,47 @@ def test_etime_to_etimes_green(etime: str, expected: float) -> None: def test_etime_to_etimes_red(etime: str) -> None: with pytest.raises(ValueError): etime_to_etimes(etime) + + +@pytest.mark.parametrize( + "prev_pcpu,prev_etimes,curr_pcpu,curr_etimes,expected", + [ + # Motivating con/duct#399 case: 100% for 60s then idle for 60s. + # Lifetime pcpu still reads 50% at t=120 (60 cputime / 120 + # etime), but the corrected instantaneous reading is 0%. + (100.0, 60.0, 50.0, 120.0, 0.0), + # Constant 84% load across a 10s interval. + (80.0, 10.0, 82.0, 20.0, 84.0), + # Pid that ramps from 50% lifetime to 75% lifetime over 100s + # of new wall time -> 100% during the new interval. + (50.0, 100.0, 75.0, 200.0, 100.0), + ], +) +def test_instantaneous_pcpu_green( + prev_pcpu: float, + prev_etimes: float, + curr_pcpu: float, + curr_etimes: float, + expected: float, +) -> None: + assert ( + instantaneous_pcpu(prev_pcpu, prev_etimes, curr_pcpu, curr_etimes) == expected + ) + + +@pytest.mark.parametrize( + "prev_pcpu,prev_etimes,curr_pcpu,curr_etimes", + [ + # etimes regressed -> suspected pid reuse. + (80.0, 100.0, 10.0, 2.0), + # Same instant -> interval is zero, no rate definable. + (50.0, 100.0, 50.0, 100.0), + ], +) +def test_instantaneous_pcpu_none( + prev_pcpu: float, + prev_etimes: float, + curr_pcpu: float, + curr_etimes: float, +) -> None: + assert instantaneous_pcpu(prev_pcpu, prev_etimes, curr_pcpu, curr_etimes) is None From ab382adcbf645a27e2f5833048d2f5915370fc9e Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Mon, 27 Apr 2026 17:57:11 -0500 Subject: [PATCH 03/10] models: persist pcpu_raw and etimes per ProcessStats Add two required fields to ProcessStats so the sampler keeps a parsed numeric elapsed time alongside ps's raw outputs: - etimes: float seconds derived from etime via etime_to_etimes. Used by the upcoming delta-pcpu calc; etime (string) is retained in this commit for back-compat with existing usage.jsonl readers. - pcpu_raw: copy of the value ps reported. Equal to pcpu in this commit; pcpu will diverge once the delta calc is wired and pcpu_raw becomes the audit trail for the derivation. Both samplers (Linux and Darwin) populate the new fields. Aggregate takes max for pcpu_raw and the later sample's etimes, mirroring existing behavior for pcpu and etime. Test fixtures across test_aggregation and test_report pick up matching values. A backwards-compat layer for existing usage.jsonl logs that pre-date these fields is intentionally deferred. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/con_duct/_models.py | 10 +++++++--- src/con_duct/_sampling.py | 11 +++++++++-- test/duct_main/test_aggregation.py | 10 ++++++++++ test/duct_main/test_report.py | 10 ++++++++++ 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/src/con_duct/_models.py b/src/con_duct/_models.py index d3400893..414db429 100644 --- a/src/con_duct/_models.py +++ b/src/con_duct/_models.py @@ -66,12 +66,14 @@ class SystemInfo: @dataclass class ProcessStats: - pcpu: float # %CPU + pcpu: float # %CPU (corrected; equals pcpu_raw until calc is wired) + pcpu_raw: float # %CPU as reported by ps; preserved for audit pmem: float # %MEM rss: int # Memory Resident Set Size in Bytes vsz: int # Virtual Memory size in Bytes timestamp: str - etime: str + etime: str # raw ps etime string; legacy, see etimes + etimes: float # elapsed seconds (parsed from etime) stat: Counter cmd: str @@ -93,11 +95,13 @@ def aggregate(self, other: ProcessStats) -> ProcessStats: new_counter.update(other.stat) return ProcessStats( pcpu=max(self.pcpu, other.pcpu), + pcpu_raw=max(self.pcpu_raw, other.pcpu_raw), pmem=max(self.pmem, other.pmem), rss=max(self.rss, other.rss), vsz=max(self.vsz, other.vsz), timestamp=max(self.timestamp, other.timestamp), etime=other.etime, # For the aggregate always take the latest + etimes=other.etimes, stat=new_counter, cmd=cmd, ) @@ -111,7 +115,7 @@ def __post_init__(self) -> None: self._validate() def _validate(self) -> None: - assert_num(self.pcpu, self.pmem, self.rss, self.vsz) + assert_num(self.pcpu, self.pcpu_raw, self.pmem, self.rss, self.vsz, self.etimes) @dataclass diff --git a/src/con_duct/_sampling.py b/src/con_duct/_sampling.py index caa90517..a0b9c1b1 100644 --- a/src/con_duct/_sampling.py +++ b/src/con_duct/_sampling.py @@ -10,6 +10,7 @@ import sys from typing import Callable, Optional from con_duct._models import Averages, ProcessStats, Sample +from con_duct._utils import etime_to_etimes SYSTEM = platform.system() @@ -45,15 +46,18 @@ def _get_sample_linux(session_id: int) -> Sample: pid, pcpu, pmem, rss_kib, vsz_kib, etime, stat, cmd = line.split(maxsplit=7) + pcpu_value = float(pcpu) sample.add_pid( pid=int(pid), stats=ProcessStats( - pcpu=float(pcpu), + pcpu=pcpu_value, + pcpu_raw=pcpu_value, pmem=float(pmem), rss=int(rss_kib) * 1024, vsz=int(vsz_kib) * 1024, timestamp=datetime.now().astimezone().isoformat(), etime=etime, + etimes=etime_to_etimes(etime), stat=Counter([stat]), cmd=cmd, ), @@ -92,15 +96,18 @@ def _add_pid_to_sample_from_line_mac( pid, pcpu, pmem, rss_kb, vsz_kb, etime, stat, cmd = line.split(maxsplit=7) if pid_to_matching_sid.get(int(pid)) is not None: + pcpu_value = float(pcpu) sample.add_pid( pid=int(pid), stats=ProcessStats( - pcpu=float(pcpu), + pcpu=pcpu_value, + pcpu_raw=pcpu_value, pmem=float(pmem), rss=int(rss_kb) * 1024, vsz=int(vsz_kb) * 1024, timestamp=datetime.now().astimezone().isoformat(), etime=etime, + etimes=etime_to_etimes(etime), stat=Counter([stat]), cmd=cmd, ), diff --git a/test/duct_main/test_aggregation.py b/test/duct_main/test_aggregation.py index fb6925ca..6c9e4dcf 100644 --- a/test/duct_main/test_aggregation.py +++ b/test/duct_main/test_aggregation.py @@ -10,54 +10,64 @@ stat0 = ProcessStats( pcpu=0.0, + pcpu_raw=0.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:09:37-04:00", etime="00:00", + etimes=0.0, cmd="cmd 0", stat=Counter(["stat0"]), ) stat1 = ProcessStats( pcpu=1.0, + pcpu_raw=1.0, pmem=1.0, rss=1, vsz=1, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 1", stat=Counter(["stat1"]), ) stat2 = ProcessStats( pcpu=2.0, + pcpu_raw=2.0, pmem=2.0, rss=2, vsz=2, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 2", stat=Counter(["stat2"]), ) stat100 = ProcessStats( pcpu=100.0, + pcpu_raw=100.0, pmem=100.0, rss=2, vsz=2, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 100", stat=Counter(["stat100"]), ) stat_big = ProcessStats( pcpu=20000.0, + pcpu_raw=20000.0, pmem=21234234.0, rss=43645634562, vsz=2345234523452342, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 2", stat=Counter(["statbig"]), ) diff --git a/test/duct_main/test_report.py b/test/duct_main/test_report.py index c4cb9206..9982ca7a 100644 --- a/test/duct_main/test_report.py +++ b/test/duct_main/test_report.py @@ -12,33 +12,39 @@ stat0 = ProcessStats( pcpu=0.0, + pcpu_raw=0.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:09:37-04:00", etime="00:00", + etimes=0.0, cmd="cmd 1", stat=Counter(["stat0"]), ) stat1 = ProcessStats( pcpu=1.0, + pcpu_raw=1.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 1", stat=Counter(["stat1"]), ) stat2 = ProcessStats( pcpu=1.1, + pcpu_raw=1.1, pmem=1.1, rss=11, vsz=11, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 1", stat=Counter(["stat2"]), ) @@ -178,11 +184,13 @@ def test_process_stats_green( # Assert does not raise ProcessStats( pcpu=pcpu, + pcpu_raw=pcpu, pmem=pmem, rss=rss, vsz=vsz, timestamp=datetime.now().astimezone().isoformat(), etime=etime, + etimes=0.0, cmd=cmd, stat=Counter(["stat0"]), ) @@ -204,11 +212,13 @@ def test_process_stats_red( with pytest.raises(AssertionError): ProcessStats( pcpu=pcpu, + pcpu_raw=0.0, pmem=pmem, rss=rss, vsz=vsz, timestamp=datetime.now().astimezone().isoformat(), etime=etime, + etimes=0.0, cmd=cmd, stat=Counter(["stat0"]), ) From 7ec330f8e5edf9bdf7515d9bb7180e37b91ff54a Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Mon, 27 Apr 2026 19:00:34 -0500 Subject: [PATCH 04/10] utils: instantaneous_pcpu falls back to curr_pcpu Previous semantics returned None when the elapsed-time interval was non-positive (etimes did not advance, or regressed). That forced every caller into the same fallback boilerplate. New semantics: return curr_pcpu in the non-positive case. A sophisticated consumer can still detect "fallback was used" by comparing the result against pcpu_raw post-hoc; the audit data is unchanged. The function's return type tightens from float | None to float, and the only existing caller (the test suite) is updated. Rationale: returning curr_pcpu is the conservative answer that keeps the value honest about "something is here" rather than emitting a silent zero -- avoiding the false-zero failure mode of under-reporting churning workloads as idle. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/con_duct/_utils.py | 17 ++++++++++------- test/duct_main/test_duct_utils.py | 18 +++++++++++------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/con_duct/_utils.py b/src/con_duct/_utils.py index f358223c..ac2d5148 100644 --- a/src/con_duct/_utils.py +++ b/src/con_duct/_utils.py @@ -45,7 +45,7 @@ def instantaneous_pcpu( prev_etimes: float, curr_pcpu: float, curr_etimes: float, -) -> float | None: +) -> float: """Instantaneous %CPU between two ps samples of the same pid. Inverts the procps identity ``pcpu = cputime / etime * 100`` to @@ -57,21 +57,24 @@ def instantaneous_pcpu( ratio. Invalid on Darwin (decayed EWMA). Precision floor: ps reports ``etime`` at 1-second resolution, so - at sample intervals near 1s this function is noisy or returns - ``None`` frequently (see PROBLEMS.md in the resource-measurement + at sample intervals near 1s this function frequently falls back + to ``curr_pcpu`` (see PROBLEMS.md in the resource-measurement notebook). :param prev_pcpu: %CPU from the earlier sample. :param prev_etimes: elapsed seconds at the earlier sample. :param curr_pcpu: %CPU from the later sample. :param curr_etimes: elapsed seconds at the later sample. - :returns: instantaneous %CPU over the interval, or ``None`` if - the interval is non-positive (etimes did not advance, or - regressed -- suspected pid reuse). + :returns: instantaneous %CPU over the interval. Falls back to + ``curr_pcpu`` when the interval is non-positive (etimes did + not advance, or regressed -- suspected pid reuse). The + fallback keeps the value honest about "something is here" + rather than emitting a silent zero; a sophisticated consumer + can detect fallback by comparing against ``pcpu_raw``. """ interval = curr_etimes - prev_etimes if interval <= 0: - return None + return curr_pcpu return (curr_pcpu * curr_etimes - prev_pcpu * prev_etimes) / interval diff --git a/test/duct_main/test_duct_utils.py b/test/duct_main/test_duct_utils.py index 482994d9..a14d890e 100644 --- a/test/duct_main/test_duct_utils.py +++ b/test/duct_main/test_duct_utils.py @@ -63,18 +63,22 @@ def test_instantaneous_pcpu_green( @pytest.mark.parametrize( - "prev_pcpu,prev_etimes,curr_pcpu,curr_etimes", + "prev_pcpu,prev_etimes,curr_pcpu,curr_etimes,expected", [ - # etimes regressed -> suspected pid reuse. - (80.0, 100.0, 10.0, 2.0), - # Same instant -> interval is zero, no rate definable. - (50.0, 100.0, 50.0, 100.0), + # etimes regressed -> suspected pid reuse; fall back to curr. + (80.0, 100.0, 10.0, 2.0, 10.0), + # Same instant -> interval is zero, no rate definable; fall + # back to curr. + (50.0, 100.0, 50.0, 100.0, 50.0), ], ) -def test_instantaneous_pcpu_none( +def test_instantaneous_pcpu_falls_back_to_curr( prev_pcpu: float, prev_etimes: float, curr_pcpu: float, curr_etimes: float, + expected: float, ) -> None: - assert instantaneous_pcpu(prev_pcpu, prev_etimes, curr_pcpu, curr_etimes) is None + assert ( + instantaneous_pcpu(prev_pcpu, prev_etimes, curr_pcpu, curr_etimes) == expected + ) From a1e61ae3a881197080ef1a6e131d927fd71607ad Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Mon, 27 Apr 2026 19:13:38 -0500 Subject: [PATCH 05/10] sampling: wire delta-pcpu into Linux sampler Linux _get_sample_linux now accepts a prev-state dict mapping pid -> (pcpu_raw, etimes), looks up each pid's prior raw inputs, and computes corrected pcpu via instantaneous_pcpu. First-observed pids fall back to pcpu_raw (no prior to delta against). The dict is rebuilt fresh from the current sample, so pids that disappear between samples are dropped automatically -- no unbounded growth across long runs. Mac sampler matches the new signature for uniform dispatch but returns prev unchanged; Darwin's ps reports a 5/8-decayed EWMA, not a cumulative ratio, so the delta calc does not apply there. Report holds the prev_raw dict across collect_sample calls, initialized empty in __init__. No public-API changes; tests that construct Sample/ProcessStats directly are untouched. Limitations (also captured in PROBLEMS.md in the resource-measurement-notebook worktree): - For workloads where no pid survives long enough for etimes to advance past the previous sample's value (the con/duct#399 trigger case of sequentially-churning short-lived workers), every observation falls back to pcpu_raw and the calc provides no improvement. - Sub-second sample intervals frequently land both samples in the same etime quantum, yielding the fallback path. - The persisted pcpu_raw alongside the corrected pcpu lets a consumer detect "fallback was used" by comparing the two. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/con_duct/_sampling.py | 48 ++++++++++++++++++++++++++++++--------- src/con_duct/_tracker.py | 6 ++++- 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/src/con_duct/_sampling.py b/src/con_duct/_sampling.py index a0b9c1b1..a95480f4 100644 --- a/src/con_duct/_sampling.py +++ b/src/con_duct/_sampling.py @@ -10,7 +10,7 @@ import sys from typing import Callable, Optional from con_duct._models import Averages, ProcessStats, Sample -from con_duct._utils import etime_to_etimes +from con_duct._utils import etime_to_etimes, instantaneous_pcpu SYSTEM = platform.system() @@ -27,8 +27,11 @@ lgr = logging.getLogger("con-duct") -def _get_sample_linux(session_id: int) -> Sample: +def _get_sample_linux( + session_id: int, prev: dict[int, tuple[float, float]] +) -> tuple[Sample, dict[int, tuple[float, float]]]: sample = Sample() + new_prev: dict[int, tuple[float, float]] = {} ps_command = [ "ps", @@ -46,24 +49,37 @@ def _get_sample_linux(session_id: int) -> Sample: pid, pcpu, pmem, rss_kib, vsz_kib, etime, stat, cmd = line.split(maxsplit=7) - pcpu_value = float(pcpu) + pid_int = int(pid) + pcpu_raw_value = float(pcpu) + etimes_value = etime_to_etimes(etime) + prev_entry = prev.get(pid_int) + if prev_entry is not None: + prev_pcpu_raw, prev_etimes = prev_entry + pcpu_value = instantaneous_pcpu( + prev_pcpu_raw, prev_etimes, pcpu_raw_value, etimes_value + ) + else: + # First observation of this pid; no prior state to delta against. + pcpu_value = pcpu_raw_value + new_prev[pid_int] = (pcpu_raw_value, etimes_value) + sample.add_pid( - pid=int(pid), + pid=pid_int, stats=ProcessStats( pcpu=pcpu_value, - pcpu_raw=pcpu_value, + pcpu_raw=pcpu_raw_value, pmem=float(pmem), rss=int(rss_kib) * 1024, vsz=int(vsz_kib) * 1024, timestamp=datetime.now().astimezone().isoformat(), etime=etime, - etimes=etime_to_etimes(etime), + etimes=etimes_value, stat=Counter([stat]), cmd=cmd, ), ) sample.averages = Averages.from_sample(sample=sample) - return sample + return sample, new_prev def _try_to_get_sid(pid: int) -> int: @@ -114,7 +130,12 @@ def _add_pid_to_sample_from_line_mac( ) -def _get_sample_mac(session_id: int) -> Optional[Sample]: +def _get_sample_mac( + session_id: int, prev: dict[int, tuple[float, float]] +) -> tuple[Optional[Sample], dict[int, tuple[float, float]]]: + # Darwin's ps reports a 5/8-decayed EWMA, not a cumulative ratio, + # so the delta calc does not apply. prev is accepted for uniform + # signature with the Linux sampler and returned unchanged. sample = Sample() lines = _get_ps_lines_mac() @@ -127,7 +148,7 @@ def _get_sample_mac(session_id: int) -> Optional[Sample]: if not pid_to_matching_sid: lgr.debug(f"No processes found for session ID {session_id}.") - return None + return None, prev # collections.deque with maxlen=0 is used to approximate the # performance of list comprehension (superior to basic for-loop) @@ -143,11 +164,16 @@ def _get_sample_mac(session_id: int) -> Optional[Sample]: ) sample.averages = Averages.from_sample(sample=sample) - return sample + return sample, prev _get_sample_per_system = { "Linux": _get_sample_linux, "Darwin": _get_sample_mac, } -_get_sample: Callable[[int], Optional[Sample]] = _get_sample_per_system[SYSTEM] # type: ignore[assignment] +_get_sample: Callable[ + [int, dict[int, tuple[float, float]]], + tuple[Optional[Sample], dict[int, tuple[float, float]]], +] = _get_sample_per_system[ + SYSTEM +] # type: ignore[assignment] diff --git a/src/con_duct/_tracker.py b/src/con_duct/_tracker.py index 1d563b1a..e9577362 100644 --- a/src/con_duct/_tracker.py +++ b/src/con_duct/_tracker.py @@ -56,6 +56,10 @@ def __init__( self.system_info: SystemInfo | None = None self.full_run_stats = Sample() self.current_sample: Optional[Sample] = None + # Per-pid (pcpu_raw, etimes) carried forward across samples + # so the Linux sampler can compute instantaneous %CPU as a + # delta. Mac sampler accepts but does not consume it. + self.prev_raw: dict[int, tuple[float, float]] = {} self.end_time: float | None = None self.run_time_seconds: str | None = None self.usage_file: TextIO | None = None @@ -135,7 +139,7 @@ def get_system_info(self) -> None: def collect_sample(self) -> Optional[Sample]: assert self.session_id is not None try: - sample = _get_sample(self.session_id) + sample, self.prev_raw = _get_sample(self.session_id, self.prev_raw) return sample except subprocess.CalledProcessError as exc: # when session_id has no processes lgr.debug("Error collecting sample: %s", str(exc)) From 3953290860173ff1ac483b1c97cdf47ca53f7a71 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Tue, 28 Apr 2026 09:34:13 -0500 Subject: [PATCH 06/10] constants: bump schema_version to 0.3.0 ProcessStats gained two required fields (pcpu_raw, etimes) and pcpu's semantics shifted on Linux from "lifetime average" to "corrected when computable, equals pcpu_raw at fallback." The schema is additive on the JSON wire (existing keys keep their names), but consumers reading new files with old expectations should know the meaning of pcpu/total_pcpu has changed -- hence the minor bump. ls.py's ensure_compliant_schema patches info.json fields only (working_directory at 0.2.1, message at 0.2.2). No 0.3.0 entry is needed there: the additions live on usage.jsonl ProcessStats, and no consumer in this codebase reconstructs ProcessStats from parsed JSON (all read dicts directly). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/con_duct/_constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/con_duct/_constants.py b/src/con_duct/_constants.py index db84702f..07457347 100644 --- a/src/con_duct/_constants.py +++ b/src/con_duct/_constants.py @@ -1,6 +1,6 @@ """Constants used throughout con-duct.""" -__schema_version__ = "0.2.2" +__schema_version__ = "0.3.0" ENV_PREFIXES = ("PBS_", "SLURM_", "OSG") SUFFIXES = { From dd18f13f4224c01a2f04392cc58634c0c05190a9 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Tue, 28 Apr 2026 09:42:51 -0500 Subject: [PATCH 07/10] tests: cover prev-state threading in _get_sample_linux The unit tests in test_duct_utils.py exercise instantaneous_pcpu in isolation, but nothing covers the wiring layer between it and the live sampler -- the dict-keyed prev lookup, the first-observation fallback, and the rebuild-from-scratch behavior of new_prev that drops vanished pids. Three-call test with a mocked ps subprocess: - Call 1: empty prev -> pcpu falls back to pcpu_raw. - Call 2: corrected via instantaneous_pcpu (matches the constant 84% case from the parameterized green tests). - Call 3: pid disappears, new pid appears -> new_prev contains only the surviving pid. Catches regressions where the (pcpu_raw, etimes) tuple order is swapped, the prev lookup is dropped, or new_prev accumulates stale entries. Co-Authored-By: Claude Opus 4.7 (1M context) --- test/duct_main/test_sampling.py | 49 +++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 test/duct_main/test_sampling.py diff --git a/test/duct_main/test_sampling.py b/test/duct_main/test_sampling.py new file mode 100644 index 00000000..8eec9db8 --- /dev/null +++ b/test/duct_main/test_sampling.py @@ -0,0 +1,49 @@ +"""Tests for the platform-specific samplers in _sampling.py.""" + +from __future__ import annotations +from unittest import mock +from con_duct._sampling import _get_sample_linux + +_HEADER = " PID %CPU %MEM RSS VSZ ELAPSED STAT CMD\n" + + +@mock.patch("con_duct._sampling.subprocess.check_output") +def test_get_sample_linux_threads_prev_state( + mock_check_output: mock.MagicMock, +) -> None: + """Three successive calls round-trip the prev-state dict. + + Call 1: no prior -> pcpu falls back to pcpu_raw. + Call 2: prior present -> pcpu computed via instantaneous_pcpu. + Call 3: pid 1234 vanishes, pid 5678 appears -> new_prev is + rebuilt fresh, dropping stale entries automatically. + """ + out_1 = _HEADER + " 1234 80.0 1.0 1024 2048 00:10 R cmd\n" + # pcpu=82 at etimes=20 against pcpu=80 at etimes=10 yields a + # constant 84% load across the 10s interval (mirrors one of the + # test_instantaneous_pcpu_green parameterized cases). + out_2 = _HEADER + " 1234 82.0 1.0 1024 2048 00:20 R cmd\n" + out_3 = _HEADER + " 5678 50.0 1.0 512 1024 00:05 R other\n" + mock_check_output.side_effect = [out_1, out_2, out_3] + + sample1, prev1 = _get_sample_linux(session_id=42, prev={}) + assert prev1 == {1234: (80.0, 10.0)} + stats1 = sample1.stats[1234] + assert stats1.pcpu == 80.0 + assert stats1.pcpu_raw == 80.0 + assert stats1.etimes == 10.0 + assert stats1.rss == 1024 * 1024 # KiB -> bytes + assert stats1.vsz == 2048 * 1024 + + sample2, prev2 = _get_sample_linux(session_id=42, prev=prev1) + assert prev2 == {1234: (82.0, 20.0)} + stats2 = sample2.stats[1234] + assert stats2.pcpu_raw == 82.0 + assert stats2.etimes == 20.0 + assert stats2.pcpu == 84.0 + + sample3, prev3 = _get_sample_linux(session_id=42, prev=prev2) + assert prev3 == {5678: (50.0, 5.0)} + assert 1234 not in sample3.stats + # First observation of 5678 -> fallback to pcpu_raw. + assert sample3.stats[5678].pcpu == 50.0 From ba244003f934e2afa33c8607674fcdc2e5518483 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Tue, 28 Apr 2026 09:43:36 -0500 Subject: [PATCH 08/10] tests: cover prev_raw round-trip in Report.collect_sample One of the load-bearing decisions in the delta-pcpu work is that the Tracker holds the cross-sample state (rather than a module-level dict or a sampler-as-class). Without a regression test, refactoring self.prev_raw to a local would silently disable the calc on every sample after the first. Two-call test mocking _get_sample with a side_effect that inspects each invocation's input prev. Asserts the first call gets {} and the second gets the dict returned from the first; asserts report.prev_raw lands on the latest returned value after each call. Co-Authored-By: Claude Opus 4.7 (1M context) --- test/duct_main/test_report.py | 43 +++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/test/duct_main/test_report.py b/test/duct_main/test_report.py index 9982ca7a..fc0a9102 100644 --- a/test/duct_main/test_report.py +++ b/test/duct_main/test_report.py @@ -240,6 +240,49 @@ def test_system_info_sanity(mock_log_paths: mock.MagicMock) -> None: assert report.system_info.user == os.environ.get("USER") +@mock.patch("con_duct._tracker._get_sample") +@mock.patch("con_duct._tracker.LogPaths") +def test_collect_sample_round_trips_prev_raw( + mock_log_paths: mock.MagicMock, mock_get_sample: mock.MagicMock +) -> None: + """Report.collect_sample threads its prev_raw dict across calls. + + Each invocation of _get_sample receives the prev_raw returned by + the previous invocation (or {} on the first call) and the + returned new prev_raw lands on self.prev_raw for the next call. + Regression guard for the load-bearing decision that the Tracker + holds the cross-sample state. + """ + mock_log_paths.prefix = "mock_prefix" + cwd = os.getcwd() + report = Report( + "_cmd", [], mock_log_paths, EXECUTION_SUMMARY_FORMAT, cwd, clobber=False + ) + report.session_id = 42 + + received: list[dict] = [] + returned: list[dict] = [ + {1234: (80.0, 10.0)}, + {1234: (82.0, 20.0)}, + ] + + def fake_get_sample( + _session_id: int, prev: dict[int, tuple[float, float]] + ) -> tuple[Sample, dict[int, tuple[float, float]]]: + received.append(dict(prev)) + return Sample(), returned[len(received) - 1] + + mock_get_sample.side_effect = fake_get_sample + + report.collect_sample() + assert received[0] == {} + assert report.prev_raw == {1234: (80.0, 10.0)} + + report.collect_sample() + assert received[1] == {1234: (80.0, 10.0)} + assert report.prev_raw == {1234: (82.0, 20.0)} + + @mock.patch("con_duct._tracker.shutil.which") @mock.patch("con_duct._tracker.subprocess.check_output") @mock.patch("con_duct._tracker.LogPaths") From a945ea6005b8eea08d11d8cc552fb681cadab392 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Tue, 28 Apr 2026 13:09:49 -0500 Subject: [PATCH 09/10] sampling: drop young pids (etime=="00:00") by default Adopts smon's pattern of skipping pids whose ps elapsed-time reads "00:00" -- pids too young for ps to report a meaningful etime. Mitigates the con/duct#399 churn-of-short-lived-workers case more aggressively than the pcpu_raw fallback alone, by preventing inflated lifetime ratios from ever entering the sample. Tradeoff: loses the entire record (cmd/rss/vsz/pmem) for any pid ps reports with etime=="00:00", not just the pcpu field. Pids that survive long enough for etime to tick past 1s are observed normally on subsequent samples. Toggled by DROP_YOUNG_PIDS in _constants.py rather than a CLI flag for now. Promoting it to an env-var + CLI flag (matching the DUCT_SAMPLE_INTERVAL / DUCT_REPORT_INTERVAL pattern) is tracked as a follow-up; see PR description. Empty-sample handling: _get_sample_linux now returns (None, new_prev) when every pid in a ps round is filtered (common at process startup before etime ticks past 1s). Mirrors the existing _get_sample_mac no-pids path so the monitor loop skips the round rather than crashing on Averages.from_sample's not-None assertions. The return type narrows from Sample to Optional[Sample] to match. Tests: - test_sampling.py grows a young-pid line in the threading test (asserts both sample and new_prev exclude it) and a dedicated all-young-dropped test (asserts the (None, new_prev) return). Both opt into DROP_YOUNG_PIDS=True via enable_drop_young_pids applied as a usefixtures mark. - test/conftest.py gains a session-autouse fixture that flips DROP_YOUNG_PIDS=False for in-process tests (so pre-existing fixtures asserting on sub-second workloads keep working) and a function-scoped enable_drop_young_pids opt-in for unit tests that specifically exercise the drop. The session override reaches in-process tests only; subprocess tests in test_e2e.py inherit the production default. - test_e2e.py: sample-asserting tests (test_spawn_children, test_session_modes, test_session_mode_behavior_difference) bumped from sub-second to >=1s sleeps. Tests that don't assert on samples (test_sanity, test_logging_levels) keep their short sleeps -- the empty-sample handling makes them exit cleanly. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/con_duct/_constants.py | 8 +++++++ src/con_duct/_sampling.py | 18 +++++++++++++++- test/conftest.py | 33 ++++++++++++++++++++++++++++ test/duct_main/test_e2e.py | 12 ++++++++--- test/duct_main/test_sampling.py | 38 ++++++++++++++++++++++++++++++++- 5 files changed, 104 insertions(+), 5 deletions(-) diff --git a/src/con_duct/_constants.py b/src/con_duct/_constants.py index 07457347..787ea6bf 100644 --- a/src/con_duct/_constants.py +++ b/src/con_duct/_constants.py @@ -2,6 +2,14 @@ __schema_version__ = "0.3.0" +# Skip pids whose ps `etime` reads "00:00" -- i.e. too young for ps +# to report a non-zero elapsed time. Mitigates con/duct#399's +# churn-of-short-lived-workers case at the cost of the whole +# record (cmd/rss/vsz/pmem) for that pid in that sample. Mirrors +# the smon ancestor's pattern. Module-level toggle for now; +# elevating to a CLI flag later is a one-line wire-up. +DROP_YOUNG_PIDS = True + ENV_PREFIXES = ("PBS_", "SLURM_", "OSG") SUFFIXES = { "stdout": "stdout", diff --git a/src/con_duct/_sampling.py b/src/con_duct/_sampling.py index a95480f4..baef2ccd 100644 --- a/src/con_duct/_sampling.py +++ b/src/con_duct/_sampling.py @@ -9,6 +9,7 @@ import subprocess import sys from typing import Callable, Optional +from con_duct._constants import DROP_YOUNG_PIDS from con_duct._models import Averages, ProcessStats, Sample from con_duct._utils import etime_to_etimes, instantaneous_pcpu @@ -29,7 +30,7 @@ def _get_sample_linux( session_id: int, prev: dict[int, tuple[float, float]] -) -> tuple[Sample, dict[int, tuple[float, float]]]: +) -> tuple[Optional[Sample], dict[int, tuple[float, float]]]: sample = Sample() new_prev: dict[int, tuple[float, float]] = {} @@ -49,6 +50,14 @@ def _get_sample_linux( pid, pcpu, pmem, rss_kib, vsz_kib, etime, stat, cmd = line.split(maxsplit=7) + if DROP_YOUNG_PIDS and etime == "00:00": + # Pid too young for ps to report a non-zero elapsed + # time; skip the whole record (smon's pattern). Loses + # rss/vsz/cmd for this sample, but avoids inflating + # pcpu_raw via the first-observation fallback for the + # con/duct#399 churn case. + continue + pid_int = int(pid) pcpu_raw_value = float(pcpu) etimes_value = etime_to_etimes(etime) @@ -78,6 +87,13 @@ def _get_sample_linux( cmd=cmd, ), ) + if not sample.stats: + # Every pid was filtered (e.g. DROP_YOUNG_PIDS dropped all + # the youngest observations) or ps returned no rows. Mirror + # _get_sample_mac's no-pids path so the monitor loop skips + # this round rather than crashing on Averages.from_sample's + # not-None assertions. + return None, new_prev sample.averages = Averages.from_sample(sample=sample) return sample, new_prev diff --git a/test/conftest.py b/test/conftest.py index 04765bb3..b07eef51 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -23,6 +23,39 @@ def set_test_config() -> Generator: del os.environ[k] +@pytest.fixture(scope="session", autouse=True) +def disable_drop_young_pids_for_tests() -> Generator: + """Disable the DROP_YOUNG_PIDS sampler filter for in-process + tests. Affects only the test process's interpreter; subprocess + tests (e.g. test_e2e.py) inherit the production default and + must use commands that outlive ps's 1-second etime quantum. + """ + import con_duct._sampling as sampling_module + + orig = sampling_module.DROP_YOUNG_PIDS + sampling_module.DROP_YOUNG_PIDS = False + yield + sampling_module.DROP_YOUNG_PIDS = orig + + +@pytest.fixture +def enable_drop_young_pids(monkeypatch: pytest.MonkeyPatch) -> None: + """Opt back into the production DROP_YOUNG_PIDS filter for a + single in-process test (used by unit tests in test_sampling.py + that specifically exercise the drop behavior). + + Only effective for tests that exercise duct via in-process + Python imports (e.g. test_sampling.py with mocked ps, + test_execution.py via run_duct_command). Has no effect on + subprocess-based tests (test_e2e.py), which spawn fresh Python + interpreters that read DROP_YOUNG_PIDS from _constants.py + directly. Subprocess tests must instead use workloads that + outlive ps's 1-second etime quantum if they assert on + observability under the production default. + """ + monkeypatch.setattr("con_duct._sampling.DROP_YOUNG_PIDS", True) + + @pytest.fixture(autouse=True) def reset_logger_state() -> Generator: """Automatically reset logger state after each test. diff --git a/test/duct_main/test_e2e.py b/test/duct_main/test_e2e.py index 62aefc39..2c483d8f 100644 --- a/test/duct_main/test_e2e.py +++ b/test/duct_main/test_e2e.py @@ -36,7 +36,9 @@ def test_spawn_children( ) -> None: duct_prefix = f"{temp_output_dir}log_" script_path = TEST_SCRIPT_DIR / "spawn_children.sh" - dur = "0.3" + # >=1s outlives ps's etime quantum so the production + # DROP_YOUNG_PIDS filter doesn't drop the spawned children. + dur = "2" command = ( f"{duct_cmd} -q --s-i 0.001 --r-i 0.01 " f"-p {duct_prefix} {script_path} {mode} {num_children} {dur}" @@ -64,7 +66,9 @@ def test_spawn_children( def test_session_modes(temp_output_dir: str, duct_cmd: str, session_mode: str) -> None: """Test that both session modes work correctly and collect appropriate data.""" duct_prefix = f"{temp_output_dir}log_" - command = f"{duct_cmd} -q --s-i 0.01 --r-i 0.05 --mode {session_mode} -p {duct_prefix} sleep 0.3" + # >=1s outlives ps's etime quantum so the production + # DROP_YOUNG_PIDS filter leaves observable samples. + command = f"{duct_cmd} -q --s-i 0.01 --r-i 0.05 --mode {session_mode} -p {duct_prefix} sleep 2" subprocess.check_output(command, shell=True) # Check that log files were created @@ -113,7 +117,9 @@ def test_session_mode_behavior_difference(temp_output_dir: str, duct_cmd: str) - new_session_prefix = f"{temp_output_dir}new_" current_session_prefix = f"{temp_output_dir}current_" - # Run duct with new-session mode - should NOT see background process + # Run duct with new-session mode - should NOT see background process. + # >=1s outlives ps's etime quantum so the production + # DROP_YOUNG_PIDS filter leaves observable samples. subprocess.check_output( f"{duct_cmd} -q --s-i 0.01 --r-i 0.05 --mode new-session -p {new_session_prefix} sleep 2", shell=True, diff --git a/test/duct_main/test_sampling.py b/test/duct_main/test_sampling.py index 8eec9db8..8619d760 100644 --- a/test/duct_main/test_sampling.py +++ b/test/duct_main/test_sampling.py @@ -2,11 +2,13 @@ from __future__ import annotations from unittest import mock +import pytest from con_duct._sampling import _get_sample_linux _HEADER = " PID %CPU %MEM RSS VSZ ELAPSED STAT CMD\n" +@pytest.mark.usefixtures("enable_drop_young_pids") @mock.patch("con_duct._sampling.subprocess.check_output") def test_get_sample_linux_threads_prev_state( mock_check_output: mock.MagicMock, @@ -23,10 +25,17 @@ def test_get_sample_linux_threads_prev_state( # constant 84% load across the 10s interval (mirrors one of the # test_instantaneous_pcpu_green parameterized cases). out_2 = _HEADER + " 1234 82.0 1.0 1024 2048 00:20 R cmd\n" - out_3 = _HEADER + " 5678 50.0 1.0 512 1024 00:05 R other\n" + # 9999 has etime "00:00" -> dropped per DROP_YOUNG_PIDS; + # 5678 first-observed with non-zero etime -> fallback path. + out_3 = ( + _HEADER + + " 5678 50.0 1.0 512 1024 00:05 R other\n" + + " 9999 99.0 1.0 256 512 00:00 R young\n" + ) mock_check_output.side_effect = [out_1, out_2, out_3] sample1, prev1 = _get_sample_linux(session_id=42, prev={}) + assert sample1 is not None assert prev1 == {1234: (80.0, 10.0)} stats1 = sample1.stats[1234] assert stats1.pcpu == 80.0 @@ -36,6 +45,7 @@ def test_get_sample_linux_threads_prev_state( assert stats1.vsz == 2048 * 1024 sample2, prev2 = _get_sample_linux(session_id=42, prev=prev1) + assert sample2 is not None assert prev2 == {1234: (82.0, 20.0)} stats2 = sample2.stats[1234] assert stats2.pcpu_raw == 82.0 @@ -43,7 +53,33 @@ def test_get_sample_linux_threads_prev_state( assert stats2.pcpu == 84.0 sample3, prev3 = _get_sample_linux(session_id=42, prev=prev2) + assert sample3 is not None assert prev3 == {5678: (50.0, 5.0)} assert 1234 not in sample3.stats # First observation of 5678 -> fallback to pcpu_raw. assert sample3.stats[5678].pcpu == 50.0 + # 9999 had etime="00:00" -> dropped from sample and prev. + assert 9999 not in sample3.stats + assert 9999 not in prev3 + + +@pytest.mark.usefixtures("enable_drop_young_pids") +@mock.patch("con_duct._sampling.subprocess.check_output") +def test_get_sample_linux_returns_none_when_all_dropped( + mock_check_output: mock.MagicMock, +) -> None: + """When every pid in ps output is filtered (e.g. all young), + return (None, new_prev) instead of an empty Sample. Matches + _get_sample_mac's no-pids path; lets the monitor loop skip the + round rather than crash on Averages.from_sample's not-None + assertions. + """ + out = ( + _HEADER + + " 1111 99.0 1.0 256 512 00:00 R young_a\n" + + " 2222 99.0 1.0 256 512 00:00 R young_b\n" + ) + mock_check_output.side_effect = [out] + sample, prev = _get_sample_linux(session_id=42, prev={}) + assert sample is None + assert prev == {} From 0d742f51a8cea3cb42e9fab27e7e1ea598943025 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Tue, 28 Apr 2026 14:35:03 -0500 Subject: [PATCH 10/10] cli: warn on sub-1s sample-interval MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ps reports etime at 1-second resolution, so configured sample-interval below 1s produces phase artifacts in the corrected pcpu calc -- adjacent sample pairs frequently land in the same etime quantum, the calc falls back to pcpu_raw (lifetime ratio), and short-lived pids with high startup CPU show as false spikes. At the default 1.0s with positive sampling overhead the issue does not occur in practice (actual interval >= 1s, Δetime >= 1 in steady state). The warning fires only when the user explicitly configures a sub-1s interval; --quiet and --log-level NONE suppress it through the normal log machinery. Bumping the default to 2.0s for an extra safety margin is captured as a TODO in the PR description; not a correctness fix, just a conservatism call. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/con_duct/cli.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/con_duct/cli.py b/src/con_duct/cli.py index 258d00aa..5039da61 100644 --- a/src/con_duct/cli.py +++ b/src/con_duct/cli.py @@ -461,6 +461,12 @@ def _create_ls_parser() -> argparse.ArgumentParser: def run_command(args: argparse.Namespace) -> int: """Execute a command with duct monitoring.""" + if args.sample_interval < 1.0: + lgr.warning( + "sample-interval %ss is below ps's 1s etime resolution; " + "this can cause phase artifacts (false pcpu spikes) on Linux.", + args.sample_interval, + ) kwargs = vars(args).copy() # Remove arguments that are not for duct_execute for key in ("func", "log_level", "quiet"):