diff --git a/src/con_duct/_constants.py b/src/con_duct/_constants.py index db84702f..787ea6bf 100644 --- a/src/con_duct/_constants.py +++ b/src/con_duct/_constants.py @@ -1,6 +1,14 @@ """Constants used throughout con-duct.""" -__schema_version__ = "0.2.2" +__schema_version__ = "0.3.0" + +# Skip pids whose ps `etime` reads "00:00" -- i.e. too young for ps +# to report a non-zero elapsed time. Mitigates con/duct#399's +# churn-of-short-lived-workers case at the cost of the whole +# record (cmd/rss/vsz/pmem) for that pid in that sample. Mirrors +# the smon ancestor's pattern. Module-level toggle for now; +# elevating to a CLI flag later is a one-line wire-up. +DROP_YOUNG_PIDS = True ENV_PREFIXES = ("PBS_", "SLURM_", "OSG") SUFFIXES = { diff --git a/src/con_duct/_models.py b/src/con_duct/_models.py index d3400893..414db429 100644 --- a/src/con_duct/_models.py +++ b/src/con_duct/_models.py @@ -66,12 +66,14 @@ class SystemInfo: @dataclass class ProcessStats: - pcpu: float # %CPU + pcpu: float # %CPU (corrected; equals pcpu_raw until calc is wired) + pcpu_raw: float # %CPU as reported by ps; preserved for audit pmem: float # %MEM rss: int # Memory Resident Set Size in Bytes vsz: int # Virtual Memory size in Bytes timestamp: str - etime: str + etime: str # raw ps etime string; legacy, see etimes + etimes: float # elapsed seconds (parsed from etime) stat: Counter cmd: str @@ -93,11 +95,13 @@ def aggregate(self, other: ProcessStats) -> ProcessStats: new_counter.update(other.stat) return ProcessStats( pcpu=max(self.pcpu, other.pcpu), + pcpu_raw=max(self.pcpu_raw, other.pcpu_raw), pmem=max(self.pmem, other.pmem), rss=max(self.rss, other.rss), vsz=max(self.vsz, other.vsz), timestamp=max(self.timestamp, other.timestamp), etime=other.etime, # For the aggregate always take the latest + etimes=other.etimes, stat=new_counter, cmd=cmd, ) @@ -111,7 +115,7 @@ def __post_init__(self) -> None: self._validate() def _validate(self) -> None: - assert_num(self.pcpu, self.pmem, self.rss, self.vsz) + assert_num(self.pcpu, self.pcpu_raw, self.pmem, self.rss, self.vsz, self.etimes) @dataclass diff --git a/src/con_duct/_sampling.py b/src/con_duct/_sampling.py index caa90517..baef2ccd 100644 --- a/src/con_duct/_sampling.py +++ b/src/con_duct/_sampling.py @@ -9,7 +9,9 @@ import subprocess import sys from typing import Callable, Optional +from con_duct._constants import DROP_YOUNG_PIDS from con_duct._models import Averages, ProcessStats, Sample +from con_duct._utils import etime_to_etimes, instantaneous_pcpu SYSTEM = platform.system() @@ -26,8 +28,11 @@ lgr = logging.getLogger("con-duct") -def _get_sample_linux(session_id: int) -> Sample: +def _get_sample_linux( + session_id: int, prev: dict[int, tuple[float, float]] +) -> tuple[Optional[Sample], dict[int, tuple[float, float]]]: sample = Sample() + new_prev: dict[int, tuple[float, float]] = {} ps_command = [ "ps", @@ -45,21 +50,52 @@ def _get_sample_linux(session_id: int) -> Sample: pid, pcpu, pmem, rss_kib, vsz_kib, etime, stat, cmd = line.split(maxsplit=7) + if DROP_YOUNG_PIDS and etime == "00:00": + # Pid too young for ps to report a non-zero elapsed + # time; skip the whole record (smon's pattern). Loses + # rss/vsz/cmd for this sample, but avoids inflating + # pcpu_raw via the first-observation fallback for the + # con/duct#399 churn case. + continue + + pid_int = int(pid) + pcpu_raw_value = float(pcpu) + etimes_value = etime_to_etimes(etime) + prev_entry = prev.get(pid_int) + if prev_entry is not None: + prev_pcpu_raw, prev_etimes = prev_entry + pcpu_value = instantaneous_pcpu( + prev_pcpu_raw, prev_etimes, pcpu_raw_value, etimes_value + ) + else: + # First observation of this pid; no prior state to delta against. + pcpu_value = pcpu_raw_value + new_prev[pid_int] = (pcpu_raw_value, etimes_value) + sample.add_pid( - pid=int(pid), + pid=pid_int, stats=ProcessStats( - pcpu=float(pcpu), + pcpu=pcpu_value, + pcpu_raw=pcpu_raw_value, pmem=float(pmem), rss=int(rss_kib) * 1024, vsz=int(vsz_kib) * 1024, timestamp=datetime.now().astimezone().isoformat(), etime=etime, + etimes=etimes_value, stat=Counter([stat]), cmd=cmd, ), ) + if not sample.stats: + # Every pid was filtered (e.g. DROP_YOUNG_PIDS dropped all + # the youngest observations) or ps returned no rows. Mirror + # _get_sample_mac's no-pids path so the monitor loop skips + # this round rather than crashing on Averages.from_sample's + # not-None assertions. + return None, new_prev sample.averages = Averages.from_sample(sample=sample) - return sample + return sample, new_prev def _try_to_get_sid(pid: int) -> int: @@ -92,22 +128,30 @@ def _add_pid_to_sample_from_line_mac( pid, pcpu, pmem, rss_kb, vsz_kb, etime, stat, cmd = line.split(maxsplit=7) if pid_to_matching_sid.get(int(pid)) is not None: + pcpu_value = float(pcpu) sample.add_pid( pid=int(pid), stats=ProcessStats( - pcpu=float(pcpu), + pcpu=pcpu_value, + pcpu_raw=pcpu_value, pmem=float(pmem), rss=int(rss_kb) * 1024, vsz=int(vsz_kb) * 1024, timestamp=datetime.now().astimezone().isoformat(), etime=etime, + etimes=etime_to_etimes(etime), stat=Counter([stat]), cmd=cmd, ), ) -def _get_sample_mac(session_id: int) -> Optional[Sample]: +def _get_sample_mac( + session_id: int, prev: dict[int, tuple[float, float]] +) -> tuple[Optional[Sample], dict[int, tuple[float, float]]]: + # Darwin's ps reports a 5/8-decayed EWMA, not a cumulative ratio, + # so the delta calc does not apply. prev is accepted for uniform + # signature with the Linux sampler and returned unchanged. sample = Sample() lines = _get_ps_lines_mac() @@ -120,7 +164,7 @@ def _get_sample_mac(session_id: int) -> Optional[Sample]: if not pid_to_matching_sid: lgr.debug(f"No processes found for session ID {session_id}.") - return None + return None, prev # collections.deque with maxlen=0 is used to approximate the # performance of list comprehension (superior to basic for-loop) @@ -136,11 +180,16 @@ def _get_sample_mac(session_id: int) -> Optional[Sample]: ) sample.averages = Averages.from_sample(sample=sample) - return sample + return sample, prev _get_sample_per_system = { "Linux": _get_sample_linux, "Darwin": _get_sample_mac, } -_get_sample: Callable[[int], Optional[Sample]] = _get_sample_per_system[SYSTEM] # type: ignore[assignment] +_get_sample: Callable[ + [int, dict[int, tuple[float, float]]], + tuple[Optional[Sample], dict[int, tuple[float, float]]], +] = _get_sample_per_system[ + SYSTEM +] # type: ignore[assignment] diff --git a/src/con_duct/_tracker.py b/src/con_duct/_tracker.py index 1d563b1a..e9577362 100644 --- a/src/con_duct/_tracker.py +++ b/src/con_duct/_tracker.py @@ -56,6 +56,10 @@ def __init__( self.system_info: SystemInfo | None = None self.full_run_stats = Sample() self.current_sample: Optional[Sample] = None + # Per-pid (pcpu_raw, etimes) carried forward across samples + # so the Linux sampler can compute instantaneous %CPU as a + # delta. Mac sampler accepts but does not consume it. + self.prev_raw: dict[int, tuple[float, float]] = {} self.end_time: float | None = None self.run_time_seconds: str | None = None self.usage_file: TextIO | None = None @@ -135,7 +139,7 @@ def get_system_info(self) -> None: def collect_sample(self) -> Optional[Sample]: assert self.session_id is not None try: - sample = _get_sample(self.session_id) + sample, self.prev_raw = _get_sample(self.session_id, self.prev_raw) return sample except subprocess.CalledProcessError as exc: # when session_id has no processes lgr.debug("Error collecting sample: %s", str(exc)) diff --git a/src/con_duct/_utils.py b/src/con_duct/_utils.py index 6ccd29c0..ac2d5148 100644 --- a/src/con_duct/_utils.py +++ b/src/con_duct/_utils.py @@ -1,5 +1,6 @@ """Utility functions for con-duct.""" +from __future__ import annotations from typing import Any @@ -8,6 +9,75 @@ def assert_num(*values: Any) -> None: assert isinstance(value, (float, int)) +# TODO: consider asking ps for `etimes` (seconds) directly via +# `-o etimes` instead of parsing `etime`. Even if we switch, this +# parser is worth keeping for backwards compatibility with existing +# usage.jsonl logs that persist `etime` as a string. +def etime_to_etimes(etime: str) -> float: + """Parse a ps ``etime`` string into seconds. + + ps's ``etime`` format is ``[[DD-]HH:]MM:SS``: ``MM:SS`` always, + with ``HH:`` prepended after one hour and ``DD-`` prepended after + one day. + + :param etime: elapsed-time string from ``ps -o etime``. + :returns: elapsed time in seconds. + :raises ValueError: if ``etime`` does not match the expected shape. + """ + if "-" in etime: + days_str, rest = etime.split("-", 1) + days = int(days_str) + else: + days = 0 + rest = etime + parts = rest.split(":") + if len(parts) == 2: + hours, minutes, seconds = 0, int(parts[0]), int(parts[1]) + elif len(parts) == 3: + hours, minutes, seconds = int(parts[0]), int(parts[1]), int(parts[2]) + else: + raise ValueError(f"Unparsable etime: {etime!r}") + return float(days * 86400 + hours * 3600 + minutes * 60 + seconds) + + +def instantaneous_pcpu( + prev_pcpu: float, + prev_etimes: float, + curr_pcpu: float, + curr_etimes: float, +) -> float: + """Instantaneous %CPU between two ps samples of the same pid. + + Inverts the procps identity ``pcpu = cputime / etime * 100`` to + recover cputime at each sample, takes the cputime delta, and + divides by the elapsed interval. The ``/100`` and ``*100`` + cancel, so the result is in the same units as ``pcpu``. + + Linux-only: assumes ``pcpu`` is the cumulative ``cputime/etime`` + ratio. Invalid on Darwin (decayed EWMA). + + Precision floor: ps reports ``etime`` at 1-second resolution, so + at sample intervals near 1s this function frequently falls back + to ``curr_pcpu`` (see PROBLEMS.md in the resource-measurement + notebook). + + :param prev_pcpu: %CPU from the earlier sample. + :param prev_etimes: elapsed seconds at the earlier sample. + :param curr_pcpu: %CPU from the later sample. + :param curr_etimes: elapsed seconds at the later sample. + :returns: instantaneous %CPU over the interval. Falls back to + ``curr_pcpu`` when the interval is non-positive (etimes did + not advance, or regressed -- suspected pid reuse). The + fallback keeps the value honest about "something is here" + rather than emitting a silent zero; a sophisticated consumer + can detect fallback by comparing against ``pcpu_raw``. + """ + interval = curr_etimes - prev_etimes + if interval <= 0: + return curr_pcpu + return (curr_pcpu * curr_etimes - prev_pcpu * prev_etimes) / interval + + def parse_version(version_str: str) -> tuple[int, int, int]: x_y_z = version_str.split(".") if len(x_y_z) != 3: diff --git a/src/con_duct/cli.py b/src/con_duct/cli.py index 258d00aa..5039da61 100644 --- a/src/con_duct/cli.py +++ b/src/con_duct/cli.py @@ -461,6 +461,12 @@ def _create_ls_parser() -> argparse.ArgumentParser: def run_command(args: argparse.Namespace) -> int: """Execute a command with duct monitoring.""" + if args.sample_interval < 1.0: + lgr.warning( + "sample-interval %ss is below ps's 1s etime resolution; " + "this can cause phase artifacts (false pcpu spikes) on Linux.", + args.sample_interval, + ) kwargs = vars(args).copy() # Remove arguments that are not for duct_execute for key in ("func", "log_level", "quiet"): diff --git a/test/conftest.py b/test/conftest.py index 04765bb3..b07eef51 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -23,6 +23,39 @@ def set_test_config() -> Generator: del os.environ[k] +@pytest.fixture(scope="session", autouse=True) +def disable_drop_young_pids_for_tests() -> Generator: + """Disable the DROP_YOUNG_PIDS sampler filter for in-process + tests. Affects only the test process's interpreter; subprocess + tests (e.g. test_e2e.py) inherit the production default and + must use commands that outlive ps's 1-second etime quantum. + """ + import con_duct._sampling as sampling_module + + orig = sampling_module.DROP_YOUNG_PIDS + sampling_module.DROP_YOUNG_PIDS = False + yield + sampling_module.DROP_YOUNG_PIDS = orig + + +@pytest.fixture +def enable_drop_young_pids(monkeypatch: pytest.MonkeyPatch) -> None: + """Opt back into the production DROP_YOUNG_PIDS filter for a + single in-process test (used by unit tests in test_sampling.py + that specifically exercise the drop behavior). + + Only effective for tests that exercise duct via in-process + Python imports (e.g. test_sampling.py with mocked ps, + test_execution.py via run_duct_command). Has no effect on + subprocess-based tests (test_e2e.py), which spawn fresh Python + interpreters that read DROP_YOUNG_PIDS from _constants.py + directly. Subprocess tests must instead use workloads that + outlive ps's 1-second etime quantum if they assert on + observability under the production default. + """ + monkeypatch.setattr("con_duct._sampling.DROP_YOUNG_PIDS", True) + + @pytest.fixture(autouse=True) def reset_logger_state() -> Generator: """Automatically reset logger state after each test. diff --git a/test/duct_main/test_aggregation.py b/test/duct_main/test_aggregation.py index fb6925ca..6c9e4dcf 100644 --- a/test/duct_main/test_aggregation.py +++ b/test/duct_main/test_aggregation.py @@ -10,54 +10,64 @@ stat0 = ProcessStats( pcpu=0.0, + pcpu_raw=0.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:09:37-04:00", etime="00:00", + etimes=0.0, cmd="cmd 0", stat=Counter(["stat0"]), ) stat1 = ProcessStats( pcpu=1.0, + pcpu_raw=1.0, pmem=1.0, rss=1, vsz=1, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 1", stat=Counter(["stat1"]), ) stat2 = ProcessStats( pcpu=2.0, + pcpu_raw=2.0, pmem=2.0, rss=2, vsz=2, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 2", stat=Counter(["stat2"]), ) stat100 = ProcessStats( pcpu=100.0, + pcpu_raw=100.0, pmem=100.0, rss=2, vsz=2, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 100", stat=Counter(["stat100"]), ) stat_big = ProcessStats( pcpu=20000.0, + pcpu_raw=20000.0, pmem=21234234.0, rss=43645634562, vsz=2345234523452342, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 2", stat=Counter(["statbig"]), ) diff --git a/test/duct_main/test_duct_utils.py b/test/duct_main/test_duct_utils.py index c25a4991..a14d890e 100644 --- a/test/duct_main/test_duct_utils.py +++ b/test/duct_main/test_duct_utils.py @@ -1,7 +1,7 @@ """Tests for utility functions in _duct_main.py""" import pytest -from con_duct._utils import assert_num +from con_duct._utils import assert_num, etime_to_etimes, instantaneous_pcpu @pytest.mark.parametrize("input_value", [0, 1, 2, -1, 100, 0.001, -1.68]) @@ -13,3 +13,72 @@ def test_assert_num_green(input_value: int) -> None: def test_assert_num_red(input_value: int) -> None: with pytest.raises(AssertionError): assert_num(input_value) + + +@pytest.mark.parametrize( + "etime,expected", + [ + ("00:42", 42.0), + ("01:30", 90.0), + ("01:00:00", 3600.0), + ("12:34:56", 12 * 3600 + 34 * 60 + 56), + ("02-03:04:05", 2 * 86400 + 3 * 3600 + 4 * 60 + 5), + ("100-00:00:00", 100 * 86400.0), + ], +) +def test_etime_to_etimes_green(etime: str, expected: float) -> None: + assert etime_to_etimes(etime) == expected + + +@pytest.mark.parametrize("etime", ["", "garbage", "12", "1:2:3:4", "ab:cd"]) +def test_etime_to_etimes_red(etime: str) -> None: + with pytest.raises(ValueError): + etime_to_etimes(etime) + + +@pytest.mark.parametrize( + "prev_pcpu,prev_etimes,curr_pcpu,curr_etimes,expected", + [ + # Motivating con/duct#399 case: 100% for 60s then idle for 60s. + # Lifetime pcpu still reads 50% at t=120 (60 cputime / 120 + # etime), but the corrected instantaneous reading is 0%. + (100.0, 60.0, 50.0, 120.0, 0.0), + # Constant 84% load across a 10s interval. + (80.0, 10.0, 82.0, 20.0, 84.0), + # Pid that ramps from 50% lifetime to 75% lifetime over 100s + # of new wall time -> 100% during the new interval. + (50.0, 100.0, 75.0, 200.0, 100.0), + ], +) +def test_instantaneous_pcpu_green( + prev_pcpu: float, + prev_etimes: float, + curr_pcpu: float, + curr_etimes: float, + expected: float, +) -> None: + assert ( + instantaneous_pcpu(prev_pcpu, prev_etimes, curr_pcpu, curr_etimes) == expected + ) + + +@pytest.mark.parametrize( + "prev_pcpu,prev_etimes,curr_pcpu,curr_etimes,expected", + [ + # etimes regressed -> suspected pid reuse; fall back to curr. + (80.0, 100.0, 10.0, 2.0, 10.0), + # Same instant -> interval is zero, no rate definable; fall + # back to curr. + (50.0, 100.0, 50.0, 100.0, 50.0), + ], +) +def test_instantaneous_pcpu_falls_back_to_curr( + prev_pcpu: float, + prev_etimes: float, + curr_pcpu: float, + curr_etimes: float, + expected: float, +) -> None: + assert ( + instantaneous_pcpu(prev_pcpu, prev_etimes, curr_pcpu, curr_etimes) == expected + ) diff --git a/test/duct_main/test_e2e.py b/test/duct_main/test_e2e.py index 62aefc39..2c483d8f 100644 --- a/test/duct_main/test_e2e.py +++ b/test/duct_main/test_e2e.py @@ -36,7 +36,9 @@ def test_spawn_children( ) -> None: duct_prefix = f"{temp_output_dir}log_" script_path = TEST_SCRIPT_DIR / "spawn_children.sh" - dur = "0.3" + # >=1s outlives ps's etime quantum so the production + # DROP_YOUNG_PIDS filter doesn't drop the spawned children. + dur = "2" command = ( f"{duct_cmd} -q --s-i 0.001 --r-i 0.01 " f"-p {duct_prefix} {script_path} {mode} {num_children} {dur}" @@ -64,7 +66,9 @@ def test_spawn_children( def test_session_modes(temp_output_dir: str, duct_cmd: str, session_mode: str) -> None: """Test that both session modes work correctly and collect appropriate data.""" duct_prefix = f"{temp_output_dir}log_" - command = f"{duct_cmd} -q --s-i 0.01 --r-i 0.05 --mode {session_mode} -p {duct_prefix} sleep 0.3" + # >=1s outlives ps's etime quantum so the production + # DROP_YOUNG_PIDS filter leaves observable samples. + command = f"{duct_cmd} -q --s-i 0.01 --r-i 0.05 --mode {session_mode} -p {duct_prefix} sleep 2" subprocess.check_output(command, shell=True) # Check that log files were created @@ -113,7 +117,9 @@ def test_session_mode_behavior_difference(temp_output_dir: str, duct_cmd: str) - new_session_prefix = f"{temp_output_dir}new_" current_session_prefix = f"{temp_output_dir}current_" - # Run duct with new-session mode - should NOT see background process + # Run duct with new-session mode - should NOT see background process. + # >=1s outlives ps's etime quantum so the production + # DROP_YOUNG_PIDS filter leaves observable samples. subprocess.check_output( f"{duct_cmd} -q --s-i 0.01 --r-i 0.05 --mode new-session -p {new_session_prefix} sleep 2", shell=True, diff --git a/test/duct_main/test_report.py b/test/duct_main/test_report.py index c4cb9206..fc0a9102 100644 --- a/test/duct_main/test_report.py +++ b/test/duct_main/test_report.py @@ -12,33 +12,39 @@ stat0 = ProcessStats( pcpu=0.0, + pcpu_raw=0.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:09:37-04:00", etime="00:00", + etimes=0.0, cmd="cmd 1", stat=Counter(["stat0"]), ) stat1 = ProcessStats( pcpu=1.0, + pcpu_raw=1.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 1", stat=Counter(["stat1"]), ) stat2 = ProcessStats( pcpu=1.1, + pcpu_raw=1.1, pmem=1.1, rss=11, vsz=11, timestamp="2024-06-11T10:13:23-04:00", etime="00:02", + etimes=2.0, cmd="cmd 1", stat=Counter(["stat2"]), ) @@ -178,11 +184,13 @@ def test_process_stats_green( # Assert does not raise ProcessStats( pcpu=pcpu, + pcpu_raw=pcpu, pmem=pmem, rss=rss, vsz=vsz, timestamp=datetime.now().astimezone().isoformat(), etime=etime, + etimes=0.0, cmd=cmd, stat=Counter(["stat0"]), ) @@ -204,11 +212,13 @@ def test_process_stats_red( with pytest.raises(AssertionError): ProcessStats( pcpu=pcpu, + pcpu_raw=0.0, pmem=pmem, rss=rss, vsz=vsz, timestamp=datetime.now().astimezone().isoformat(), etime=etime, + etimes=0.0, cmd=cmd, stat=Counter(["stat0"]), ) @@ -230,6 +240,49 @@ def test_system_info_sanity(mock_log_paths: mock.MagicMock) -> None: assert report.system_info.user == os.environ.get("USER") +@mock.patch("con_duct._tracker._get_sample") +@mock.patch("con_duct._tracker.LogPaths") +def test_collect_sample_round_trips_prev_raw( + mock_log_paths: mock.MagicMock, mock_get_sample: mock.MagicMock +) -> None: + """Report.collect_sample threads its prev_raw dict across calls. + + Each invocation of _get_sample receives the prev_raw returned by + the previous invocation (or {} on the first call) and the + returned new prev_raw lands on self.prev_raw for the next call. + Regression guard for the load-bearing decision that the Tracker + holds the cross-sample state. + """ + mock_log_paths.prefix = "mock_prefix" + cwd = os.getcwd() + report = Report( + "_cmd", [], mock_log_paths, EXECUTION_SUMMARY_FORMAT, cwd, clobber=False + ) + report.session_id = 42 + + received: list[dict] = [] + returned: list[dict] = [ + {1234: (80.0, 10.0)}, + {1234: (82.0, 20.0)}, + ] + + def fake_get_sample( + _session_id: int, prev: dict[int, tuple[float, float]] + ) -> tuple[Sample, dict[int, tuple[float, float]]]: + received.append(dict(prev)) + return Sample(), returned[len(received) - 1] + + mock_get_sample.side_effect = fake_get_sample + + report.collect_sample() + assert received[0] == {} + assert report.prev_raw == {1234: (80.0, 10.0)} + + report.collect_sample() + assert received[1] == {1234: (80.0, 10.0)} + assert report.prev_raw == {1234: (82.0, 20.0)} + + @mock.patch("con_duct._tracker.shutil.which") @mock.patch("con_duct._tracker.subprocess.check_output") @mock.patch("con_duct._tracker.LogPaths") diff --git a/test/duct_main/test_sampling.py b/test/duct_main/test_sampling.py new file mode 100644 index 00000000..8619d760 --- /dev/null +++ b/test/duct_main/test_sampling.py @@ -0,0 +1,85 @@ +"""Tests for the platform-specific samplers in _sampling.py.""" + +from __future__ import annotations +from unittest import mock +import pytest +from con_duct._sampling import _get_sample_linux + +_HEADER = " PID %CPU %MEM RSS VSZ ELAPSED STAT CMD\n" + + +@pytest.mark.usefixtures("enable_drop_young_pids") +@mock.patch("con_duct._sampling.subprocess.check_output") +def test_get_sample_linux_threads_prev_state( + mock_check_output: mock.MagicMock, +) -> None: + """Three successive calls round-trip the prev-state dict. + + Call 1: no prior -> pcpu falls back to pcpu_raw. + Call 2: prior present -> pcpu computed via instantaneous_pcpu. + Call 3: pid 1234 vanishes, pid 5678 appears -> new_prev is + rebuilt fresh, dropping stale entries automatically. + """ + out_1 = _HEADER + " 1234 80.0 1.0 1024 2048 00:10 R cmd\n" + # pcpu=82 at etimes=20 against pcpu=80 at etimes=10 yields a + # constant 84% load across the 10s interval (mirrors one of the + # test_instantaneous_pcpu_green parameterized cases). + out_2 = _HEADER + " 1234 82.0 1.0 1024 2048 00:20 R cmd\n" + # 9999 has etime "00:00" -> dropped per DROP_YOUNG_PIDS; + # 5678 first-observed with non-zero etime -> fallback path. + out_3 = ( + _HEADER + + " 5678 50.0 1.0 512 1024 00:05 R other\n" + + " 9999 99.0 1.0 256 512 00:00 R young\n" + ) + mock_check_output.side_effect = [out_1, out_2, out_3] + + sample1, prev1 = _get_sample_linux(session_id=42, prev={}) + assert sample1 is not None + assert prev1 == {1234: (80.0, 10.0)} + stats1 = sample1.stats[1234] + assert stats1.pcpu == 80.0 + assert stats1.pcpu_raw == 80.0 + assert stats1.etimes == 10.0 + assert stats1.rss == 1024 * 1024 # KiB -> bytes + assert stats1.vsz == 2048 * 1024 + + sample2, prev2 = _get_sample_linux(session_id=42, prev=prev1) + assert sample2 is not None + assert prev2 == {1234: (82.0, 20.0)} + stats2 = sample2.stats[1234] + assert stats2.pcpu_raw == 82.0 + assert stats2.etimes == 20.0 + assert stats2.pcpu == 84.0 + + sample3, prev3 = _get_sample_linux(session_id=42, prev=prev2) + assert sample3 is not None + assert prev3 == {5678: (50.0, 5.0)} + assert 1234 not in sample3.stats + # First observation of 5678 -> fallback to pcpu_raw. + assert sample3.stats[5678].pcpu == 50.0 + # 9999 had etime="00:00" -> dropped from sample and prev. + assert 9999 not in sample3.stats + assert 9999 not in prev3 + + +@pytest.mark.usefixtures("enable_drop_young_pids") +@mock.patch("con_duct._sampling.subprocess.check_output") +def test_get_sample_linux_returns_none_when_all_dropped( + mock_check_output: mock.MagicMock, +) -> None: + """When every pid in ps output is filtered (e.g. all young), + return (None, new_prev) instead of an empty Sample. Matches + _get_sample_mac's no-pids path; lets the monitor loop skip the + round rather than crash on Averages.from_sample's not-None + assertions. + """ + out = ( + _HEADER + + " 1111 99.0 1.0 256 512 00:00 R young_a\n" + + " 2222 99.0 1.0 256 512 00:00 R young_b\n" + ) + mock_check_output.side_effect = [out] + sample, prev = _get_sample_linux(session_id=42, prev={}) + assert sample is None + assert prev == {}