PYTHON-5784 Increase code coverage for periodic_executor.py#2771
PYTHON-5784 Increase code coverage for periodic_executor.py#2771aclark4life wants to merge 11 commits intomongodb:masterfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a dedicated unit test module for pymongo.periodic_executor, targeting both the threading-based PeriodicExecutor and asyncio-based AsyncPeriodicExecutor, plus module-level executor registration/shutdown helpers, to improve overall coverage.
Changes:
- Adds new unit tests for
PeriodicExecutorlifecycle and control methods (open/close/join/wake/update_interval/skip_sleep, stop conditions). - Adds new unit tests for
AsyncPeriodicExecutorlifecycle and stop conditions. - Adds unit tests for module-level
_register_executor()and_shutdown_executors()behavior.
| call_times = [] | ||
|
|
||
| async def target(): | ||
| call_times.append(asyncio.get_event_loop().time()) |
There was a problem hiding this comment.
Inside a running coroutine, asyncio.get_event_loop() is deprecated behavior on newer Python versions and can raise warnings; it may also return a different loop than expected. Use asyncio.get_running_loop().time() to record timestamps from the active loop.
| call_times.append(asyncio.get_event_loop().time()) | |
| call_times.append(asyncio.get_running_loop().time()) |
| await ex.join(timeout=2) | ||
| self.assertTrue(ex._stopped) |
There was a problem hiding this comment.
When the async target raises, the underlying task will finish with an exception. Since AsyncPeriodicExecutor.join() uses asyncio.wait() (which doesn’t retrieve the task exception), this test can emit "Task exception was never retrieved" warnings when the task is GC’d. After await ex.join(...), explicitly retrieve the exception from ex._task (or otherwise await the task) to prevent noisy warnings in the test suite.
| await ex.join(timeout=2) | |
| self.assertTrue(ex._stopped) | |
| await ex.join(timeout=2) | |
| task_exc = ex._task.exception() if ex._task is not None and ex._task.done() else None | |
| self.assertTrue(ex._stopped) | |
| self.assertIsInstance(task_exc, RuntimeError) |
| stopped = threading.Event() | ||
|
|
||
| def target(): | ||
| stopped.wait(timeout=5) | ||
| return True | ||
|
|
||
| ex = _make_sync(target=target) | ||
| ex.open() | ||
| time.sleep(0.05) | ||
| _register_executor(ex) | ||
| _shutdown_executors() | ||
| stopped.set() |
There was a problem hiding this comment.
This target blocks for up to 5 seconds, so _shutdown_executors()’s internal executor.join(1) will reliably burn the full 1s timeout before returning (since the target can’t observe close() until stopped is set later). This makes the unit test consistently slow. Consider using a non-blocking target (e.g., one that returns quickly and relies on the executor sleep loop) so shutdown/join can complete promptly.
| stopped = threading.Event() | |
| def target(): | |
| stopped.wait(timeout=5) | |
| return True | |
| ex = _make_sync(target=target) | |
| ex.open() | |
| time.sleep(0.05) | |
| _register_executor(ex) | |
| _shutdown_executors() | |
| stopped.set() | |
| ran = threading.Event() | |
| def target(): | |
| ran.set() | |
| return True | |
| ex = _make_sync(target=target) | |
| ex.open() | |
| self.assertTrue(ran.wait(timeout=2), "target never ran") | |
| _register_executor(ex) | |
| _shutdown_executors() |
| import sys | ||
| import threading | ||
| import time | ||
| import weakref |
There was a problem hiding this comment.
weakref is imported but never used in this test module. Removing unused imports helps keep the test file clean and avoids lint failures if the repo enforces import checks.
| import weakref |
| # When executor is GC'd the ref is cleaned up. | ||
| ref_count_before = len(pe_module._EXECUTORS) | ||
| del ex | ||
| self.assertLessEqual(len(pe_module._EXECUTORS), ref_count_before) | ||
|
|
There was a problem hiding this comment.
This test relies on the executor weakref being removed immediately after del ex, but that cleanup is GC-dependent and the current assertion (<=) doesn’t actually verify that the weakref callback ran. Consider forcing collection (e.g., gc.collect()) and/or waiting until _EXECUTORS shrinks to make the test deterministic across Python implementations.
| ex = _make_sync(target=target) | ||
| ex.open() | ||
| time.sleep(0.05) | ||
| _register_executor(ex) |
There was a problem hiding this comment.
PeriodicExecutor.open() already calls _register_executor(self). Registering ex again here adds a duplicate weakref into _EXECUTORS, which can make the test’s behavior harder to reason about and can cause extra work in _shutdown_executors(). Consider removing the extra _register_executor(ex) call (or clearing _EXECUTORS first if the test needs explicit registration).
| _register_executor(ex) |
…cated API, gc-safe weakref assertion, non-blocking shutdown test, retrieve task exception
| async def _default_target(): | ||
| return True | ||
|
|
||
| if target is None: | ||
| target = _default_target |
There was a problem hiding this comment.
Does this need to differ from the way _make_sync above does it?
| def _run(coroutine): | ||
| return asyncio.run(coroutine) |
There was a problem hiding this comment.
The individual tests that use asynchronous methods should use AsyncUnitTest or another appropriate test class from test/asynchronous/__init__.py instead of this. Each asyncio.run() call adds a significant amount of runtime overhead to setup and teardown the event loop.
|
|
||
| class TestPeriodicExecutorRepr(unittest.TestCase): | ||
| def test_repr_contains_class_and_name(self): | ||
| ex = _make_sync(name="myexec") |
There was a problem hiding this comment.
We can move all the _make_sync() calls into an asyncSetUp/setUp method on a base class shared by all of the test classes here.
| finally: | ||
| ex.close() | ||
| ex.join(timeout=2) | ||
|
|
There was a problem hiding this comment.
Similarly, safe closing of each executor can be done in an asyncTearDown/tearDown method on a base class.
| # --------------------------------------------------------------------------- | ||
|
|
||
|
|
||
| class TestAsyncPeriodicExecutorRepr(unittest.TestCase): |
There was a problem hiding this comment.
Can we move the AsyncPeriodicExecutor tests into test/asynchronous/ and generate the synchronous ones using synchro? That would half the amount of code we need to maintain.
…ous/ via synchro - Create test/asynchronous/test_periodic_executor.py as the single source of truth for all periodic executor tests, using AsyncUnitTest with asyncSetUp/ asyncTearDown base class for executor lifecycle management - Register test_periodic_executor.py in synchro's converted_tests so the sync variant is auto-generated - Replace the manually-maintained test/test_periodic_executor.py with the synchro-generated equivalent, eliminating duplicated async/sync test code - Use _IS_SYNC branching for the small number of tests that differ between threading (PeriodicExecutor) and asyncio (AsyncPeriodicExecutor) behavior
PYTHON-5784
Changes in this PR
Adds
test/test_periodic_executor.pywith unit tests forpymongo/periodic_executor.py. Tests cover the full open/close lifecycle,wake(),update_interval(),skip_sleep(), target returningFalsestopping the executor, exception in target stopping the executor, and the same lifecycle forAsyncPeriodicExecutor.Test Plan
Added unit tests using
threading.Eventfor synchronization andasyncio.run()for async executor tests. Background thread exceptions are captured viathreading.excepthookoverride to prevent pytest's thread exception plugin from treating them as test failures.Checklist
Checklist for Author
Checklist for Reviewer