feat(platform-wallet): instrument wallet_manager RwLock behind lock-stats feature #3557
QuantumExplorer wants to merge 1 commit into v3.1-dev
Conversation
📝 Walkthrough: This pull request introduces optional lock instrumentation for the wallet manager synchronization primitive.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~30 minutes
🚥 Pre-merge checks: ✅ 5 passed
✅ Review complete (commit 952ded1)
763149b to 47fdf9a (force-push)
feat(platform-wallet): instrument wallet_manager RwLock behind lock-stats feature
Adds an `InstrumentedRwLock<T>` wrapper around `tokio::sync::RwLock<T>`
that, when the new `lock-stats` Cargo feature is enabled, records
per-call-site acquisition counts plus wait and hold durations. Off by
default — with the feature off the wrapper is a thin newtype whose
methods are `#[inline]` delegations and whose guards are zero-overhead.
Motivation: SPV holds the `wallet_manager` write lock across the full
duration of upstream's `process_block_for_wallets`. Anything platform-
side holding a read or write lock on the same mutex blocks SPV. We
already audited statically (109 acquisition sites, 52 of them writes)
but had no way to measure runtime contention. This wrapper makes that
measurable on demand without paying the cost when we're not measuring.
Wrapper layout
--------------
* `InstrumentedRwLock<T>` holds `inner: Arc<TokioRwLock<T>>` plus, with
`lock-stats` on, an `Arc<LockStats>`. The inner-Arc shape lets us
hand a real `Arc<TokioRwLock<T>>` to upstream APIs that take it
literally (e.g. `dash_spv::DashSpvClient::new`) via `raw_arc()`.
Acquisitions made through that handed-out Arc bypass the wrapper's
stats — the intentional trade is that SPV's own `process_block`
write isn't tracked, but every platform-side `wallet_manager.read()`
/ `.write()` IS.
* `read_at("tag") / write_at("tag")` (plus `try_*_at` and `blocking_*_at`)
attribute the acquisition to a per-call-site bucket. Untagged calls
bucket into `UNTAGGED` so no acquisition vanishes from the snapshot.
* `LockStats::snapshot()` produces a `Snapshot { total, per_tag }` of
cumulative counters: read/write acquired counts, contention counts
(try_* failures), wait_ns_total, hold_ns_total, plus mean helpers
(`read_wait_ns_mean`, etc.). Cheap enough to call on every refresh
of a debug UI.
* Per-tag map lives behind a `parking_lot::Mutex<BTreeMap<&'static str,
Arc<SiteCounters>>>` — touched only on the lock acquire / release
boundary, never across an `.await`, so a sync mutex is appropriate.
* `parking_lot` is an optional dep gated on the feature; nothing pulls
it in the default build.
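
To make the intended usage concrete, here is a minimal sketch of a tagged call site and a
snapshot readout. The tag strings, the `WalletState` stand-in type, and the `stats()`
accessor are illustrative assumptions, not names from this patch; only `read_at` /
`write_at`, the `UNTAGGED` bucket, `LockStats::snapshot()`, and the `SiteStats` mean
helpers are described above.

```rust
use std::sync::Arc;
// Path per the module layout described above; adjust to the actual re-export.
use crate::diagnostics::InstrumentedRwLock;

// Stand-in for WalletManager<PlatformWalletInfo>, purely for illustration.
struct WalletState {
    balance: u64,
}

async fn demo(lock: Arc<InstrumentedRwLock<WalletState>>) {
    // Tagged read: attributed to the "demo::read_balance" bucket.
    let balance = lock.read_at("demo::read_balance").await.balance;

    // Untagged write: existing call sites keep this form and land in UNTAGGED.
    {
        let mut guard = lock.write().await;
        guard.balance = balance + 1;
    }

    // With lock-stats on, a debug UI can poll the cumulative counters.
    // `stats()` is an assumed accessor for the wrapper's Arc<LockStats>;
    // the patch itself only specifies LockStats::snapshot().
    #[cfg(feature = "lock-stats")]
    {
        let snap = lock.stats().snapshot();
        for (tag, site) in &snap.per_tag {
            println!(
                "{tag}: reads={} writes={} mean_read_wait_ns={:?}",
                site.read_acquired,
                site.write_acquired,
                site.read_wait_ns_mean()
            );
        }
    }
}
```

With the feature off, the same call sites compile unchanged; the cfg'd block disappears
and `read_at` / `write_at` are plain delegations.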
Migration & call-site tagging
-----------------------------
* All 21 declarations of `Arc<RwLock<WalletManager<PlatformWalletInfo>>>`
across `manager`, `spv`, `wallet`, `changeset` modules switched to
`Arc<crate::diagnostics::InstrumentedRwLock<...>>`. The two structs
that named `RwLockReadGuard` / `RwLockWriteGuard` directly
(`WalletStateReadGuard` / `WalletStateWriteGuard` and
`IdentityWallet::wallet_manager_read` / `_write`) switched to the
wrapper guards.
* Existing call sites continue to use `lock.read().await` / `.write()
.await` — they bucket into `UNTAGGED`. Tagging is opt-in per site.
* Two sites tagged as worked examples in
`core_bridge::spawn_wallet_event_adapter`: the one-shot
`event_adapter::subscribe` read at task start, and the
`event_adapter::is_chain_locked` per-event probe that the audit
flagged as the primary new contention point introduced by the
event-bus migration.
Tests
-----
5 unit tests in `diagnostics::instrumented_lock::tests` cover the
no-stats path (read/write smoke, contended try_read) and the stats
path (per-tag attribution, untagged bucket routing, contention
counter increments).
Verified: `cargo check --workspace` clean, `cargo check -p
platform-wallet --features lock-stats` clean, `cargo test -p
platform-wallet --features lock-stats --lib diagnostics` passes 5/5,
`cargo fmt --all -- --check` clean.
No FFI surface in this PR — `LockStats::snapshot()` is Rust-only for
the first cut. Adding an FFI accessor is the natural next step once
we know what the iOS debug UI wants from the snapshot shape.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
47fdf9a to 952ded1 (force-push)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/rs-platform-wallet/src/wallet/core/wallet.rs (1)
19-20: ⚠️ Potential issue | 🟡 Minor: Update stale type wording in docs.
The struct docs still say mutable state is behind `Arc<RwLock<…>>`, but this handle now uses `Arc<InstrumentedRwLock<…>>`. Please align the comment to avoid confusion.
Also applies to: 27-28
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/rs-platform-wallet/src/wallet/core/wallet.rs` around lines 19 - 20, Update the struct documentation to replace references to `Arc<RwLock<…>>` with `Arc<InstrumentedRwLock<…>>` (and likewise update any wording about "mutable state is behind Arc<RwLock<…>>") for the handle type (the `WalletManager<PlatformWalletInfo>`/handle docs in this file), ensuring the comment on the handle (including the occurrences around lines 27-28) accurately describes that the handle holds `Arc` references to an `InstrumentedRwLock` and remains cheaply `Clone`able.
🧹 Nitpick comments (1)
packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs (1)
154-155: Consider tagging this provider’s hot lock paths. Now that the wrapper is in place, using `read_at(...)` / `write_at(...)` in key paths here would make lock-stats significantly more actionable than accumulating mostly under `UNTAGGED`.
Also applies to: 188-189, 271-272
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs` around lines 154 - 155, The InstrumentedRwLock usage for wallet_manager is currently untagged; replace hot-path .read()/.write() calls on the Arc<InstrumentedRwLock<WalletManager<PlatformWalletInfo>>> (the wallet_manager field) with the instrumented .read_at(...)/.write_at(...) variants and supply descriptive tags per call site (e.g., "provider::get_addresses_read", "provider::commit_tracked_state_write", "provider::update_addresses_write") so lock stats break out by operation; update every occurrence in this file (including the sites around the earlier read/write uses) to use those tagged methods and choose concise labels reflecting the operation performed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs`:
- Around line 253-370: The async and blocking acquisition paths (read_at,
write_at, blocking_read_at, blocking_write_at) only record the wait duration but
never increment contention counters, so add a contention check: after computing
wait_ns (in read_at, write_at, blocking_read_at, blocking_write_at) call
self.stats.record_read_contended(tag) or self.stats.record_write_contended(tag)
when wait_ns > 0 (or otherwise indicates blocking), then continue to call
record_read_acquired/record_write_acquired as now; update those four functions
accordingly to ensure contended counts increment for awaited/blocking waits as
well as try_* paths.
In `@packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs`:
- Around line 128-188: The hold-time means are computed by dividing
read_hold_ns_total/write_hold_ns_total by the acquired counts, but holds are
only recorded on guard drop so this can undercount; add release counters (e.g.,
read_released: AtomicU64 and write_released: AtomicU64) to SiteCounters and
corresponding u64 fields to SiteStats, increment those release counters where
read/write guards are dropped (the code that currently adds to
read_hold_ns_total/write_hold_ns_total), and update SiteCounters::snapshot to
expose read_released/write_released and ensure any mean calculations use the
release counts (read_released/write_released) instead of
read_acquired/write_acquired.
---
Outside diff comments:
In `@packages/rs-platform-wallet/src/wallet/core/wallet.rs`:
- Around line 19-20: Update the struct documentation to replace references to
`Arc<RwLock<…>>` with `Arc<InstrumentedRwLock<…>>` (and likewise update any
wording about "mutable state is behind Arc<RwLock<…>>") for the handle type (the
`WalletManager<PlatformWalletInfo>`/handle docs in this file), ensuring the
comment on the handle (including the occurrences around lines 27-28) accurately
describes that the handle holds `Arc` references to an `InstrumentedRwLock` and
remains cheaply `Clone`able.
---
Nitpick comments:
In `@packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs`:
- Around line 154-155: The InstrumentedRwLock usage for wallet_manager is
currently untagged; replace hot-path .read()/.write() calls on the
Arc<InstrumentedRwLock<WalletManager<PlatformWalletInfo>>> (the wallet_manager
field) with the instrumented .read_at(...)/.write_at(...) variants and supply
descriptive tags per call site (e.g., "provider::get_addresses_read",
"provider::commit_tracked_state_write", "provider::update_addresses_write") so
lock stats break out by operation; update every occurrence in this file
(including the sites around the earlier read/write uses) to use those tagged
methods and choose concise labels reflecting the operation performed.
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (15)
packages/rs-platform-wallet/Cargo.toml
packages/rs-platform-wallet/src/changeset/core_bridge.rs
packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs
packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs
packages/rs-platform-wallet/src/diagnostics/mod.rs
packages/rs-platform-wallet/src/lib.rs
packages/rs-platform-wallet/src/manager/mod.rs
packages/rs-platform-wallet/src/spv/runtime.rs
packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs
packages/rs-platform-wallet/src/wallet/core/wallet.rs
packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs
packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs
packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs
packages/rs-platform-wallet/src/wallet/platform_wallet.rs
packages/rs-platform-wallet/src/wallet/tokens/wallet.rs
| pub async fn read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.read().await; | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| self.stats.record_read_acquired(tag, wait_ns); | ||
| ReadGuard { | ||
| inner, | ||
| stats: Arc::clone(&self.stats), | ||
| tag, | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } | ||
|
|
||
| /// Acquire an exclusive lock with a per-call-site tag. | ||
| pub async fn write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.write().await; | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| self.stats.record_write_acquired(tag, wait_ns); | ||
| WriteGuard { | ||
| inner, | ||
| stats: Arc::clone(&self.stats), | ||
| tag, | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } | ||
|
|
||
| /// Try to acquire a shared lock without waiting. | ||
| #[inline] | ||
| pub fn try_read(&self) -> Result<ReadGuard<'_, T>, TryLockError> { | ||
| self.try_read_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Try to acquire an exclusive lock without waiting. | ||
| #[inline] | ||
| pub fn try_write(&self) -> Result<WriteGuard<'_, T>, TryLockError> { | ||
| self.try_write_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Tagged variant of [`try_read`](Self::try_read). | ||
| pub fn try_read_at(&self, tag: &'static str) -> Result<ReadGuard<'_, T>, TryLockError> { | ||
| match self.inner.try_read() { | ||
| Ok(inner) => { | ||
| self.stats.record_read_acquired(tag, 0); | ||
| Ok(ReadGuard { | ||
| inner, | ||
| stats: Arc::clone(&self.stats), | ||
| tag, | ||
| acquired_at: Instant::now(), | ||
| }) | ||
| } | ||
| Err(e) => { | ||
| self.stats.record_read_contended(tag); | ||
| Err(e) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Tagged variant of [`try_write`](Self::try_write). | ||
| pub fn try_write_at(&self, tag: &'static str) -> Result<WriteGuard<'_, T>, TryLockError> { | ||
| match self.inner.try_write() { | ||
| Ok(inner) => { | ||
| self.stats.record_write_acquired(tag, 0); | ||
| Ok(WriteGuard { | ||
| inner, | ||
| stats: Arc::clone(&self.stats), | ||
| tag, | ||
| acquired_at: Instant::now(), | ||
| }) | ||
| } | ||
| Err(e) => { | ||
| self.stats.record_write_contended(tag); | ||
| Err(e) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Synchronously acquire a shared lock — must NOT be called from | ||
| /// a tokio runtime thread. | ||
| #[inline] | ||
| pub fn blocking_read(&self) -> ReadGuard<'_, T> { | ||
| self.blocking_read_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Synchronously acquire an exclusive lock — must NOT be called | ||
| /// from a tokio runtime thread. | ||
| #[inline] | ||
| pub fn blocking_write(&self) -> WriteGuard<'_, T> { | ||
| self.blocking_write_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Tagged variant of [`blocking_read`](Self::blocking_read). | ||
| pub fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.blocking_read(); | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| self.stats.record_read_acquired(tag, wait_ns); | ||
| ReadGuard { | ||
| inner, | ||
| stats: Arc::clone(&self.stats), | ||
| tag, | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } | ||
|
|
||
| /// Tagged variant of [`blocking_write`](Self::blocking_write). | ||
| pub fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.blocking_write(); | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| self.stats.record_write_acquired(tag, wait_ns); | ||
| WriteGuard { | ||
| inner, | ||
| stats: Arc::clone(&self.stats), | ||
| tag, | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } |
Count contention on the awaited/blocking paths too.
Line 253, Line 267, Line 345, and Line 359 only record wait time after the lock resolves. If those calls actually block, read_contended / write_contended still stay at zero because only the try_*_at() paths increment them today. That makes the new contention counters miss the normal code paths this PR is instrumenting.
🩹 Suggested direction
pub async fn read_at(&self, tag: &'static str) -> ReadGuard<'_, T> {
+ if let Ok(inner) = self.inner.try_read() {
+ self.stats.record_read_acquired(tag, 0);
+ return ReadGuard {
+ inner,
+ stats: Arc::clone(&self.stats),
+ tag,
+ acquired_at: Instant::now(),
+ };
+ }
+
+ self.stats.record_read_contended(tag);
let wait_start = Instant::now();
let inner = self.inner.read().await;
let wait_ns = wait_start.elapsed().as_nanos() as u64;
self.stats.record_read_acquired(tag, wait_ns);
ReadGuard {
@@
pub async fn write_at(&self, tag: &'static str) -> WriteGuard<'_, T> {
+ if let Ok(inner) = self.inner.try_write() {
+ self.stats.record_write_acquired(tag, 0);
+ return WriteGuard {
+ inner,
+ stats: Arc::clone(&self.stats),
+ tag,
+ acquired_at: Instant::now(),
+ };
+ }
+
+ self.stats.record_write_contended(tag);
let wait_start = Instant::now();
let inner = self.inner.write().await;
let wait_ns = wait_start.elapsed().as_nanos() as u64;
self.stats.record_write_acquired(tag, wait_ns);
WriteGuard {
Apply the same pattern to blocking_read_at / blocking_write_at.
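For reference, a sketch of that same pattern applied to the blocking read variant. It mirrors the suggested direction above rather than the code as merged, and omits the `#[cfg(feature = "lock-stats")]` gating present in the final commit for brevity; the write variant would be symmetric.

```rust
/// Tagged variant of blocking_read, with the "try first, then count
/// contention" pattern from the suggestion above.
pub fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> {
    // Fast path: uncontended acquisition, zero recorded wait.
    if let Ok(inner) = self.inner.try_read() {
        self.stats.record_read_acquired(tag, 0);
        return ReadGuard {
            inner,
            stats: Arc::clone(&self.stats),
            tag,
            acquired_at: Instant::now(),
        };
    }

    // Slow path: we are about to block, so count this acquisition as contended.
    self.stats.record_read_contended(tag);
    let wait_start = Instant::now();
    let inner = self.inner.blocking_read();
    let wait_ns = wait_start.elapsed().as_nanos() as u64;
    self.stats.record_read_acquired(tag, wait_ns);
    ReadGuard {
        inner,
        stats: Arc::clone(&self.stats),
        tag,
        acquired_at: Instant::now(),
    }
}
```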
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs` around
lines 253 - 370, The async and blocking acquisition paths (read_at, write_at,
blocking_read_at, blocking_write_at) only record the wait duration but never
increment contention counters, so add a contention check: after computing
wait_ns (in read_at, write_at, blocking_read_at, blocking_write_at) call
self.stats.record_read_contended(tag) or self.stats.record_write_contended(tag)
when wait_ns > 0 (or otherwise indicates blocking), then continue to call
record_read_acquired/record_write_acquired as now; update those four functions
accordingly to ensure contended counts increment for awaited/blocking waits as
well as try_* paths.
| struct SiteCounters { | ||
| read_acquired: AtomicU64, | ||
| write_acquired: AtomicU64, | ||
| read_contended: AtomicU64, | ||
| write_contended: AtomicU64, | ||
| read_wait_ns_total: AtomicU64, | ||
| write_wait_ns_total: AtomicU64, | ||
| read_hold_ns_total: AtomicU64, | ||
| write_hold_ns_total: AtomicU64, | ||
| } | ||
|
|
||
| impl SiteCounters { | ||
| fn new() -> Self { | ||
| Self { | ||
| read_acquired: AtomicU64::new(0), | ||
| write_acquired: AtomicU64::new(0), | ||
| read_contended: AtomicU64::new(0), | ||
| write_contended: AtomicU64::new(0), | ||
| read_wait_ns_total: AtomicU64::new(0), | ||
| write_wait_ns_total: AtomicU64::new(0), | ||
| read_hold_ns_total: AtomicU64::new(0), | ||
| write_hold_ns_total: AtomicU64::new(0), | ||
| } | ||
| } | ||
|
|
||
| fn snapshot(&self) -> SiteStats { | ||
| SiteStats { | ||
| read_acquired: self.read_acquired.load(Ordering::Relaxed), | ||
| write_acquired: self.write_acquired.load(Ordering::Relaxed), | ||
| read_contended: self.read_contended.load(Ordering::Relaxed), | ||
| write_contended: self.write_contended.load(Ordering::Relaxed), | ||
| read_wait_ns_total: self.read_wait_ns_total.load(Ordering::Relaxed), | ||
| write_wait_ns_total: self.write_wait_ns_total.load(Ordering::Relaxed), | ||
| read_hold_ns_total: self.read_hold_ns_total.load(Ordering::Relaxed), | ||
| write_hold_ns_total: self.write_hold_ns_total.load(Ordering::Relaxed), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Plain-old-data snapshot of a single bucket (the global total or a | ||
| /// single tag). All durations are in nanoseconds; cumulative across | ||
| /// every acquisition since the lock was created. | ||
| #[derive(Debug, Clone, Default, PartialEq, Eq)] | ||
| pub struct SiteStats { | ||
| /// Number of times a read guard was successfully acquired. | ||
| pub read_acquired: u64, | ||
| /// Number of times a write guard was successfully acquired. | ||
| pub write_acquired: u64, | ||
| /// Number of times a `try_read` returned `Err(TryLockError)`. | ||
| pub read_contended: u64, | ||
| /// Number of times a `try_write` returned `Err(TryLockError)`. | ||
| pub write_contended: u64, | ||
| /// Cumulative wait time before read acquisitions resolved, in ns. | ||
| pub read_wait_ns_total: u64, | ||
| /// Cumulative wait time before write acquisitions resolved, in ns. | ||
| pub write_wait_ns_total: u64, | ||
| /// Cumulative time read guards were held before drop, in ns. | ||
| pub read_hold_ns_total: u64, | ||
| /// Cumulative time write guards were held before drop, in ns. | ||
| pub write_hold_ns_total: u64, | ||
| } |
Use release counts for the hold-time means.
Line 215 and Line 224 divide hold totals by *_acquired, but read_hold_ns_total / write_hold_ns_total only move on guard drop. A snapshot taken while guards are still live can therefore report Some(0) or materially understate the mean even though no hold has completed yet.
🩹 Suggested direction
struct SiteCounters {
read_acquired: AtomicU64,
write_acquired: AtomicU64,
+ read_released: AtomicU64,
+ write_released: AtomicU64,
read_contended: AtomicU64,
write_contended: AtomicU64,
read_wait_ns_total: AtomicU64,
write_wait_ns_total: AtomicU64,
read_hold_ns_total: AtomicU64,
@@
pub(super) fn record_read_released(&self, tag: &'static str, held_ns: u64) {
+ self.total.read_released.fetch_add(1, Ordering::Relaxed);
self.total
.read_hold_ns_total
.fetch_add(held_ns, Ordering::Relaxed);
let site = self.site(tag);
+ site.read_released.fetch_add(1, Ordering::Relaxed);
site.read_hold_ns_total
.fetch_add(held_ns, Ordering::Relaxed);
}
@@
pub(super) fn record_write_released(&self, tag: &'static str, held_ns: u64) {
+ self.total.write_released.fetch_add(1, Ordering::Relaxed);
self.total
.write_hold_ns_total
.fetch_add(held_ns, Ordering::Relaxed);
let site = self.site(tag);
+ site.write_released.fetch_add(1, Ordering::Relaxed);
site.write_hold_ns_total
.fetch_add(held_ns, Ordering::Relaxed);
}
@@
pub struct SiteStats {
pub read_acquired: u64,
pub write_acquired: u64,
+ pub read_released: u64,
+ pub write_released: u64,
pub read_contended: u64,
pub write_contended: u64,
pub read_wait_ns_total: u64,
pub write_wait_ns_total: u64,
pub read_hold_ns_total: u64,
@@
pub fn read_hold_ns_mean(&self) -> Option<u64> {
- if self.read_acquired == 0 {
+ if self.read_released == 0 {
None
} else {
- Some(self.read_hold_ns_total / self.read_acquired)
+ Some(self.read_hold_ns_total / self.read_released)
}
}
@@
pub fn write_hold_ns_mean(&self) -> Option<u64> {
- if self.write_acquired == 0 {
+ if self.write_released == 0 {
None
} else {
- Some(self.write_hold_ns_total / self.write_acquired)
+ Some(self.write_hold_ns_total / self.write_released)
}
}
}
Also applies to: 190-227
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs`
around lines 128 - 188, The hold-time means are computed by dividing
read_hold_ns_total/write_hold_ns_total by the acquired counts, but holds are
only recorded on guard drop so this can undercount; add release counters (e.g.,
read_released: AtomicU64 and write_released: AtomicU64) to SiteCounters and
corresponding u64 fields to SiteStats, increment those release counters where
read/write guards are dropped (the code that currently adds to
read_hold_ns_total/write_hold_ns_total), and update SiteCounters::snapshot to
expose read_released/write_released and ensure any mean calculations use the
release counts (read_released/write_released) instead of
read_acquired/write_acquired.
thepastaclaw left a comment
Code Review
Implementation is clean, off-by-default, and correctly scoped to a client crate (no consensus impact). All blocking gates (versioning, dependency direction, feature gating) are satisfied. Findings target the lock-stats=ON path: a hold-time mean bias and self-perturbation from holding the per-tag mutex inside the wallet_manager critical section, plus minor docs/inline/test-coverage refinements. Nothing rises to blocking severity.
Reviewed commit: 47fdf9a
🟡 4 suggestion(s) | 💬 4 nitpick(s)
1 additional finding
🟡 suggestion: No test verifies that hold-time is recorded via guard Drop
packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs (lines 664-715)
The lock-stats tests verify acquire-path counters (read_acquired, write_acquired, read_contended) and tag attribution but never assert that record_*_released is invoked on Drop. If a future refactor breaks the Drop impl on ReadGuard/WriteGuard (panic-on-drop, removed wiring, conditional return), *_hold_ns_total would silently stay at 0 with every existing assertion still passing. Adding tokio::time::sleep(Duration::from_micros(1)) while a tagged guard is held, then asserting ours.read_hold_ns_total > 0 after Drop, costs a single line and locks in the contract.
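A sketch of that assertion as a standalone test. The module name, test name, tag string, and the direct `lock.stats` field access (which works from a child module of `instrumented_lock`, where the existing tests live) are illustrative assumptions, not code from this PR.

```rust
#[cfg(all(test, feature = "lock-stats"))]
mod hold_time_drop_test_sketch {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn read_hold_time_is_recorded_on_guard_drop() {
        let lock = InstrumentedRwLock::new(0u32);
        {
            let _guard = lock.read_at("test::hold").await;
            // Keep the guard alive long enough for elapsed() to be non-zero.
            tokio::time::sleep(Duration::from_micros(1)).await;
        } // guard dropped here; record_read_released should run

        let snap = lock.stats.snapshot();
        let ours = snap.per_tag.get("test::hold").expect("tag bucket exists");
        assert!(
            ours.read_hold_ns_total > 0,
            "hold time must be recorded on guard Drop"
        );
    }
}
```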
🤖 Prompt for all review comments with AI agents
These findings are from an automated code review. Verify each finding against the current code and only fix it if needed.
In `packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs`:
- [SUGGESTION] lines 210-226: Hold-time means divide by acquisitions instead of completed releases, biasing the metric downward
`read_hold_ns_mean` and `write_hold_ns_mean` divide `*_hold_ns_total` (only incremented on guard Drop) by `*_acquired` (incremented on acquire). Whenever a snapshot is sampled while N guards are still live, the divisor is too large by N relative to the dividend, and the reported mean is biased low — in the simplest case, one live guard yields `Some(0)` instead of `None` or a completed-sample mean. For long-held guards (e.g. wallet_manager held during sync) the bias can halve the apparent hold time, exactly when this metric is being read to investigate a stall. Either (a) maintain separate `read_released` / `write_released` counters incremented on Drop and divide hold totals by those, or (b) explicitly document that the mean reflects only completed acquisitions and may understate when many guards are live.
In `packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs`:
- [SUGGESTION] lines 158-416: Stats bookkeeping runs while the Tokio guard is held, perturbing the timings the feature is meant to measure
On the acquire path (`read_at`/`write_at`, mod.rs:161-186), the code grabs the inner `tokio::sync::RwLock` and only then calls `record_*_acquired`, which goes through `LockStats::site()` and takes the shared `parking_lot::Mutex<BTreeMap<…>>` behind `per_tag`. On the release path (`Drop` for `ReadGuard`/`WriteGuard`, mod.rs:371-376 and 412-416), the custom `Drop` body executes before the inner `TokioReadGuard`/`TokioWriteGuard` field is dropped, so `record_*_released` again takes that same mutex while the wallet_manager lock is still held.
Two concrete consequences for the diagnostic itself:
1. Concurrent unlocks/locks serialize on the per-tag mutex (and on `snapshot()`, which holds it for the full BTreeMap iteration with 8 atomic loads per tag at stats.rs:46-57). The instrument becomes a global serialization point that lengthens the wallet_manager critical section and inflates the very wait/hold numbers the user is reading.
2. The Drop path re-runs the BTreeMap lookup (`site(tag)`) even though the acquire path already produced the matching `Arc<SiteCounters>`. That Arc could be stored on the guard alongside `acquired_at`, eliminating the per-Drop mutex acquisition entirely.
A combined fix: return the `Arc<SiteCounters>` from `record_*_acquired`, store it on the guard, and have `record_*_released` take `&SiteCounters` directly so the Drop path skips both the BTreeMap and the per-tag mutex. This both halves the global-mutex traffic and (because the Drop body finishes before the inner guard is released) shrinks the window where stats bookkeeping overlaps the wallet_manager critical section. If the goal is to remove the overlap entirely on the release side, wrap `inner` in `Option`/`ManuallyDrop` and explicitly release it before recording.
- [SUGGESTION] lines 664-715: No test verifies that hold-time is recorded via guard Drop
The lock-stats tests verify acquire-path counters (`read_acquired`, `write_acquired`, `read_contended`) and tag attribution but never assert that `record_*_released` is invoked on Drop. If a future refactor breaks the `Drop` impl on `ReadGuard`/`WriteGuard` (panic-on-drop, removed wiring, conditional return), `*_hold_ns_total` would silently stay at 0 with every existing assertion still passing. Adding `tokio::time::sleep(Duration::from_micros(1))` while a tagged guard is held, then asserting `ours.read_hold_ns_total > 0` after Drop, costs a single line and locks in the contract.
- [SUGGESTION] lines 158-328: `*_at` variants are not `#[inline]`, weakening the zero-overhead claim in non-LTO builds
The PR documents that with `lock-stats` off, call-site code is byte-for-byte identical to direct `tokio::sync::RwLock` use. That depends on the compiler inlining `read_at`/`write_at`/`try_*_at`/`blocking_*_at`, but only the no-arg wrappers (`read`, `write`, `try_read`, `try_write`, `blocking_read`, `blocking_write`) carry `#[inline]`. The `*_at` variants — which are the workhorse, since the unmarked variants delegate to them — do not. With the feature off, those bodies collapse to a single `self.inner.read().await`-equivalent, so adding `#[inline]` makes the zero-overhead guarantee hold across all profiles (including non-LTO debug). Mechanical, no-cost change.
| /// Mean hold time for read acquisitions, in nanoseconds. | ||
| pub fn read_hold_ns_mean(&self) -> Option<u64> { | ||
| if self.read_acquired == 0 { | ||
| None | ||
| } else { | ||
| Some(self.read_hold_ns_total / self.read_acquired) | ||
| } | ||
| } | ||
|
|
||
| /// Mean hold time for write acquisitions, in nanoseconds. | ||
| pub fn write_hold_ns_mean(&self) -> Option<u64> { | ||
| if self.write_acquired == 0 { | ||
| None | ||
| } else { | ||
| Some(self.write_hold_ns_total / self.write_acquired) | ||
| } | ||
| } |
🟡 Suggestion: Hold-time means divide by acquisitions instead of completed releases, biasing the metric downward
read_hold_ns_mean and write_hold_ns_mean divide *_hold_ns_total (only incremented on guard Drop) by *_acquired (incremented on acquire). Whenever a snapshot is sampled while N guards are still live, the divisor is too large by N relative to the dividend, and the reported mean is biased low — in the simplest case, one live guard yields Some(0) instead of None or a completed-sample mean. For long-held guards (e.g. wallet_manager held during sync) the bias can halve the apparent hold time, exactly when this metric is being read to investigate a stall. Either (a) maintain separate read_released / write_released counters incremented on Drop and divide hold totals by those, or (b) explicitly document that the mean reflects only completed acquisitions and may understate when many guards are live.
source: ['claude', 'codex']
🤖 Fix this with AI agents
These findings are from an automated code review. Verify each finding against the current code and only fix it if needed.
In `packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs`:
- [SUGGESTION] lines 210-226: Hold-time means divide by acquisitions instead of completed releases, biasing the metric downward
`read_hold_ns_mean` and `write_hold_ns_mean` divide `*_hold_ns_total` (only incremented on guard Drop) by `*_acquired` (incremented on acquire). Whenever a snapshot is sampled while N guards are still live, the divisor is too large by N relative to the dividend, and the reported mean is biased low — in the simplest case, one live guard yields `Some(0)` instead of `None` or a completed-sample mean. For long-held guards (e.g. wallet_manager held during sync) the bias can halve the apparent hold time, exactly when this metric is being read to investigate a stall. Either (a) maintain separate `read_released` / `write_released` counters incremented on Drop and divide hold totals by those, or (b) explicitly document that the mean reflects only completed acquisitions and may understate when many guards are live.
| pub async fn read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.read().await; | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_read_acquired(tag, wait_ns); | ||
| ReadGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } | ||
|
|
||
| /// Acquire an exclusive (write) lock with a per-call-site tag. | ||
| /// See [`read_at`](Self::read_at) for the tag semantics. | ||
| pub async fn write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.write().await; | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_write_acquired(tag, wait_ns); | ||
| WriteGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } | ||
|
|
||
| // ----------------------------------------------------------------- | ||
| // Non-blocking try-acquire | ||
| // ----------------------------------------------------------------- | ||
|
|
||
| /// Try to acquire a shared lock without waiting. | ||
| /// Returns `Err(TryLockError)` if the lock is currently held | ||
| /// exclusively. Equivalent to [`tokio::sync::RwLock::try_read`] when | ||
| /// `lock-stats` is off; otherwise also records under [`UNTAGGED`]. | ||
| #[inline] | ||
| pub fn try_read(&self) -> Result<ReadGuard<'_, T>, TryLockError> { | ||
| self.try_read_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Try to acquire an exclusive lock without waiting. | ||
| #[inline] | ||
| pub fn try_write(&self) -> Result<WriteGuard<'_, T>, TryLockError> { | ||
| self.try_write_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Tagged variant of [`try_read`](Self::try_read). On | ||
| /// `Err(TryLockError)` the contention is recorded under `tag` | ||
| /// (when `lock-stats` is on). | ||
| pub fn try_read_at(&self, tag: &'static str) -> Result<ReadGuard<'_, T>, TryLockError> { | ||
| match self.inner.try_read() { | ||
| Ok(inner) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_read_acquired(tag, 0); | ||
| Ok(ReadGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| }) | ||
| } | ||
| Err(e) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_read_contended(tag); | ||
| #[cfg(not(feature = "lock-stats"))] | ||
| let _ = tag; | ||
| Err(e) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Tagged variant of [`try_write`](Self::try_write). | ||
| pub fn try_write_at(&self, tag: &'static str) -> Result<WriteGuard<'_, T>, TryLockError> { | ||
| match self.inner.try_write() { | ||
| Ok(inner) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_write_acquired(tag, 0); | ||
| Ok(WriteGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| }) | ||
| } | ||
| Err(e) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_write_contended(tag); | ||
| #[cfg(not(feature = "lock-stats"))] | ||
| let _ = tag; | ||
| Err(e) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // ----------------------------------------------------------------- | ||
| // Sync (blocking) acquire — for FFI bridges and Drop paths | ||
| // ----------------------------------------------------------------- | ||
|
|
||
| /// Synchronously acquire a shared lock. Must NOT be called from a | ||
| /// tokio runtime thread (will panic). Mirrors | ||
| /// [`tokio::sync::RwLock::blocking_read`]. | ||
| #[inline] | ||
| pub fn blocking_read(&self) -> ReadGuard<'_, T> { | ||
| self.blocking_read_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Synchronously acquire an exclusive lock. Must NOT be called from | ||
| /// a tokio runtime thread. | ||
| #[inline] | ||
| pub fn blocking_write(&self) -> WriteGuard<'_, T> { | ||
| self.blocking_write_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Tagged variant of [`blocking_read`](Self::blocking_read). | ||
| pub fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.blocking_read(); | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_read_acquired(tag, wait_ns); | ||
| ReadGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } | ||
|
|
||
| /// Tagged variant of [`blocking_write`](Self::blocking_write). | ||
| pub fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.blocking_write(); | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_write_acquired(tag, wait_ns); | ||
| WriteGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<T: Default> Default for InstrumentedRwLock<T> { | ||
| fn default() -> Self { | ||
| Self::new(T::default()) | ||
| } | ||
| } | ||
|
|
||
| impl<T: std::fmt::Debug> std::fmt::Debug for InstrumentedRwLock<T> { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| f.debug_struct("InstrumentedRwLock") | ||
| .field("inner", &self.inner) | ||
| .finish() | ||
| } | ||
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Read guard | ||
| // --------------------------------------------------------------------------- | ||
|
|
||
| /// Shared read guard. `Deref<Target = T>`. With `lock-stats` enabled, | ||
| /// records the hold time of this acquisition on `Drop`. | ||
| pub struct ReadGuard<'a, T> { | ||
| inner: TokioReadGuard<'a, T>, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc<LockStats>, | ||
| #[cfg(feature = "lock-stats")] | ||
| tag: &'static str, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant, | ||
| } | ||
|
|
||
| impl<T> Deref for ReadGuard<'_, T> { | ||
| type Target = T; | ||
|
|
||
| #[inline] | ||
| fn deref(&self) -> &T { | ||
| &self.inner | ||
| } | ||
| } | ||
|
|
||
| #[cfg(feature = "lock-stats")] | ||
| impl<T> Drop for ReadGuard<'_, T> { | ||
| fn drop(&mut self) { | ||
| let held_ns = self.acquired_at.elapsed().as_nanos() as u64; | ||
| self.stats.record_read_released(self.tag, held_ns); | ||
| } | ||
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Write guard | ||
| // --------------------------------------------------------------------------- | ||
|
|
||
| /// Exclusive write guard. `Deref<Target = T>` + `DerefMut<Target = T>`. | ||
| /// With `lock-stats` enabled, records the hold time of this acquisition | ||
| /// on `Drop`. | ||
| pub struct WriteGuard<'a, T> { | ||
| inner: TokioWriteGuard<'a, T>, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc<LockStats>, | ||
| #[cfg(feature = "lock-stats")] | ||
| tag: &'static str, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant, | ||
| } | ||
|
|
||
| impl<T> Deref for WriteGuard<'_, T> { | ||
| type Target = T; | ||
|
|
||
| #[inline] | ||
| fn deref(&self) -> &T { | ||
| &self.inner | ||
| } | ||
| } | ||
|
|
||
| impl<T> DerefMut for WriteGuard<'_, T> { | ||
| #[inline] | ||
| fn deref_mut(&mut self) -> &mut T { | ||
| &mut self.inner | ||
| } | ||
| } | ||
|
|
||
| #[cfg(feature = "lock-stats")] | ||
| impl<T> Drop for WriteGuard<'_, T> { | ||
| fn drop(&mut self) { | ||
| let held_ns = self.acquired_at.elapsed().as_nanos() as u64; | ||
| self.stats.record_write_released(self.tag, held_ns); | ||
| } |
🟡 Suggestion: Stats bookkeeping runs while the Tokio guard is held, perturbing the timings the feature is meant to measure
On the acquire path (read_at/write_at, mod.rs:161-186), the code grabs the inner tokio::sync::RwLock and only then calls record_*_acquired, which goes through LockStats::site() and takes the shared parking_lot::Mutex<BTreeMap<…>> behind per_tag. On the release path (Drop for ReadGuard/WriteGuard, mod.rs:371-376 and 412-416), the custom Drop body executes before the inner TokioReadGuard/TokioWriteGuard field is dropped, so record_*_released again takes that same mutex while the wallet_manager lock is still held.
Two concrete consequences for the diagnostic itself:
- Concurrent unlocks/locks serialize on the per-tag mutex (and on `snapshot()`, which holds it for the full BTreeMap iteration with 8 atomic loads per tag at stats.rs:46-57). The instrument becomes a global serialization point that lengthens the wallet_manager critical section and inflates the very wait/hold numbers the user is reading.
- The Drop path re-runs the BTreeMap lookup (`site(tag)`) even though the acquire path already produced the matching `Arc<SiteCounters>`. That Arc could be stored on the guard alongside `acquired_at`, eliminating the per-Drop mutex acquisition entirely.
A combined fix: return the Arc<SiteCounters> from record_*_acquired, store it on the guard, and have record_*_released take &SiteCounters directly so the Drop path skips both the BTreeMap and the per-tag mutex. This both halves the global-mutex traffic and (because the Drop body finishes before the inner guard is released) shrinks the window where stats bookkeeping overlaps the wallet_manager critical section. If the goal is to remove the overlap entirely on the release side, wrap inner in Option/ManuallyDrop and explicitly release it before recording.
source: ['claude', 'codex']
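A sketch of what that combined change could look like for the read guard. It reuses the module's existing `TokioReadGuard` alias and `SiteCounters` type; the cached `counters` field, the `read_released` counter, the ManuallyDrop wrapping, and direct field access on `SiteCounters` (which would need `pub(super)` visibility or a small helper) are assumptions drawn from the suggestions above, not the code as merged. The write guard would mirror it, and the `lock-stats` cfg gating shown earlier still applies.

```rust
use std::mem::ManuallyDrop;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;

pub struct ReadGuard<'a, T> {
    // ManuallyDrop lets Drop release the Tokio guard before any bookkeeping.
    inner: ManuallyDrop<TokioReadGuard<'a, T>>,
    // Cached at acquire time, so the release path never touches the
    // per-tag parking_lot::Mutex or walks the BTreeMap again.
    counters: Arc<SiteCounters>,
    acquired_at: Instant,
}

impl<T> Drop for ReadGuard<'_, T> {
    fn drop(&mut self) {
        let held_ns = self.acquired_at.elapsed().as_nanos() as u64;
        // 1. Release the wallet_manager lock first.
        // SAFETY: `inner` is never used again after this point.
        unsafe { ManuallyDrop::drop(&mut self.inner) };
        // 2. Record the hold outside the critical section, on the per-site
        //    counters cached at acquire time (release counter per the other
        //    suggestion in this review).
        self.counters.read_released.fetch_add(1, Ordering::Relaxed);
        self.counters
            .read_hold_ns_total
            .fetch_add(held_ns, Ordering::Relaxed);
    }
}
```

`Deref`/`DerefMut` would then forward through the `ManuallyDrop`, which derefs transparently to the inner guard.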
🤖 Fix this with AI agents
These findings are from an automated code review. Verify each finding against the current code and only fix it if needed.
In `packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs`:
- [SUGGESTION] lines 158-416: Stats bookkeeping runs while the Tokio guard is held, perturbing the timings the feature is meant to measure
On the acquire path (`read_at`/`write_at`, mod.rs:161-186), the code grabs the inner `tokio::sync::RwLock` and only then calls `record_*_acquired`, which goes through `LockStats::site()` and takes the shared `parking_lot::Mutex<BTreeMap<…>>` behind `per_tag`. On the release path (`Drop` for `ReadGuard`/`WriteGuard`, mod.rs:371-376 and 412-416), the custom `Drop` body executes before the inner `TokioReadGuard`/`TokioWriteGuard` field is dropped, so `record_*_released` again takes that same mutex while the wallet_manager lock is still held.
Two concrete consequences for the diagnostic itself:
1. Concurrent unlocks/locks serialize on the per-tag mutex (and on `snapshot()`, which holds it for the full BTreeMap iteration with 8 atomic loads per tag at stats.rs:46-57). The instrument becomes a global serialization point that lengthens the wallet_manager critical section and inflates the very wait/hold numbers the user is reading.
2. The Drop path re-runs the BTreeMap lookup (`site(tag)`) even though the acquire path already produced the matching `Arc<SiteCounters>`. That Arc could be stored on the guard alongside `acquired_at`, eliminating the per-Drop mutex acquisition entirely.
A combined fix: return the `Arc<SiteCounters>` from `record_*_acquired`, store it on the guard, and have `record_*_released` take `&SiteCounters` directly so the Drop path skips both the BTreeMap and the per-tag mutex. This both halves the global-mutex traffic and (because the Drop body finishes before the inner guard is released) shrinks the window where stats bookkeeping overlaps the wallet_manager critical section. If the goal is to remove the overlap entirely on the release side, wrap `inner` in `Option`/`ManuallyDrop` and explicitly release it before recording.
| pub async fn read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.read().await; | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_read_acquired(tag, wait_ns); | ||
| ReadGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } | ||
|
|
||
| /// Acquire an exclusive (write) lock with a per-call-site tag. | ||
| /// See [`read_at`](Self::read_at) for the tag semantics. | ||
| pub async fn write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.write().await; | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_write_acquired(tag, wait_ns); | ||
| WriteGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } | ||
|
|
||
| // ----------------------------------------------------------------- | ||
| // Non-blocking try-acquire | ||
| // ----------------------------------------------------------------- | ||
|
|
||
| /// Try to acquire a shared lock without waiting. | ||
| /// Returns `Err(TryLockError)` if the lock is currently held | ||
| /// exclusively. Equivalent to [`tokio::sync::RwLock::try_read`] when | ||
| /// `lock-stats` is off; otherwise also records under [`UNTAGGED`]. | ||
| #[inline] | ||
| pub fn try_read(&self) -> Result<ReadGuard<'_, T>, TryLockError> { | ||
| self.try_read_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Try to acquire an exclusive lock without waiting. | ||
| #[inline] | ||
| pub fn try_write(&self) -> Result<WriteGuard<'_, T>, TryLockError> { | ||
| self.try_write_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Tagged variant of [`try_read`](Self::try_read). On | ||
| /// `Err(TryLockError)` the contention is recorded under `tag` | ||
| /// (when `lock-stats` is on). | ||
| pub fn try_read_at(&self, tag: &'static str) -> Result<ReadGuard<'_, T>, TryLockError> { | ||
| match self.inner.try_read() { | ||
| Ok(inner) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_read_acquired(tag, 0); | ||
| Ok(ReadGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| }) | ||
| } | ||
| Err(e) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_read_contended(tag); | ||
| #[cfg(not(feature = "lock-stats"))] | ||
| let _ = tag; | ||
| Err(e) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Tagged variant of [`try_write`](Self::try_write). | ||
| pub fn try_write_at(&self, tag: &'static str) -> Result<WriteGuard<'_, T>, TryLockError> { | ||
| match self.inner.try_write() { | ||
| Ok(inner) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_write_acquired(tag, 0); | ||
| Ok(WriteGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| }) | ||
| } | ||
| Err(e) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_write_contended(tag); | ||
| #[cfg(not(feature = "lock-stats"))] | ||
| let _ = tag; | ||
| Err(e) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // ----------------------------------------------------------------- | ||
| // Sync (blocking) acquire — for FFI bridges and Drop paths | ||
| // ----------------------------------------------------------------- | ||
|
|
||
| /// Synchronously acquire a shared lock. Must NOT be called from a | ||
| /// tokio runtime thread (will panic). Mirrors | ||
| /// [`tokio::sync::RwLock::blocking_read`]. | ||
| #[inline] | ||
| pub fn blocking_read(&self) -> ReadGuard<'_, T> { | ||
| self.blocking_read_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Synchronously acquire an exclusive lock. Must NOT be called from | ||
| /// a tokio runtime thread. | ||
| #[inline] | ||
| pub fn blocking_write(&self) -> WriteGuard<'_, T> { | ||
| self.blocking_write_at(UNTAGGED) | ||
| } | ||
|
|
||
| /// Tagged variant of [`blocking_read`](Self::blocking_read). | ||
| pub fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.blocking_read(); | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_read_acquired(tag, wait_ns); | ||
| ReadGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } | ||
|
|
||
| /// Tagged variant of [`blocking_write`](Self::blocking_write). | ||
| pub fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_start = Instant::now(); | ||
| let inner = self.inner.blocking_write(); | ||
| #[cfg(feature = "lock-stats")] | ||
| let wait_ns = wait_start.elapsed().as_nanos() as u64; | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_write_acquired(tag, wait_ns); | ||
| WriteGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| } | ||
| } |
🟡 Suggestion: *_at variants are not #[inline], weakening the zero-overhead claim in non-LTO builds
The PR documents that with lock-stats off, call-site code is byte-for-byte identical to direct tokio::sync::RwLock use. That depends on the compiler inlining read_at/write_at/try_*_at/blocking_*_at, but only the no-arg wrappers (read, write, try_read, try_write, blocking_read, blocking_write) carry #[inline]. The *_at variants — which are the workhorse, since the unmarked variants delegate to them — do not. With the feature off, those bodies collapse to a single self.inner.read().await-equivalent, so adding #[inline] makes the zero-overhead guarantee hold across all profiles (including non-LTO debug). Mechanical, no-cost change.
source: ['claude']
🤖 Fix this with AI agents
These findings are from an automated code review. Verify each finding against the current code and only fix it if needed.
In `packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs`:
- [SUGGESTION] lines 158-328: `*_at` variants are not `#[inline]`, weakening the zero-overhead claim in non-LTO builds
The PR documents that with `lock-stats` off, call-site code is byte-for-byte identical to direct `tokio::sync::RwLock` use. That depends on the compiler inlining `read_at`/`write_at`/`try_*_at`/`blocking_*_at`, but only the no-arg wrappers (`read`, `write`, `try_read`, `try_write`, `blocking_read`, `blocking_write`) carry `#[inline]`. The `*_at` variants — which are the workhorse, since the unmarked variants delegate to them — do not. With the feature off, those bodies collapse to a single `self.inner.read().await`-equivalent, so adding `#[inline]` makes the zero-overhead guarantee hold across all profiles (including non-LTO debug). Mechanical, no-cost change.
| # `write_at("tag")`). With the feature off the wrapper is a transparent | ||
| # type alias for `tokio::sync::RwLock` and there is zero runtime cost. |
💬 Nitpick: Cargo.toml comment misstates the off-mode shape as a 'transparent type alias'
The comment says: 'With the feature off the wrapper is a transparent type alias for tokio::sync::RwLock and there is zero runtime cost.' InstrumentedRwLock<T> is always a struct holding inner: Arc<TokioRwLock<T>>, even with the feature off — it's a thin newtype, not a type alias, and Arc<InstrumentedRwLock<T>> adds an extra Arc indirection vs. plain Arc<TokioRwLock<T>>. The PR description's 'byte-for-byte identical after inlining' framing is accurate; the Cargo.toml summary should mirror it (e.g. 'a thin newtype around tokio::sync::RwLock that compiles down to direct delegation').
source: ['claude']
| pub fn snapshot(&self) -> Snapshot { | ||
| let per_tag: BTreeMap<&'static str, SiteStats> = self | ||
| .per_tag | ||
| .lock() | ||
| .iter() | ||
| .map(|(tag, counters)| (*tag, counters.snapshot())) | ||
| .collect(); | ||
| Snapshot { | ||
| total: self.total.snapshot(), | ||
| per_tag, | ||
| } | ||
| } |
💬 Nitpick: snapshot() holds the per_tag mutex while doing 8 atomic loads per tag
LockStats::snapshot iterates the BTreeMap with the per_tag mutex held, calling counters.snapshot() (8 atomic loads) inside the iteration. With many tags every live acquire/release stalls behind the snapshot. A two-pass implementation — clone the Arc<SiteCounters> values into a Vec under the lock, call .snapshot() outside it — fixes this cheaply. Flagging only because the docs claim the snapshot is 'cheap enough to call from a debug UI on every refresh', a claim that weakens as tag count grows.
source: ['claude']
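A sketch of the two-pass variant the nitpick describes, reusing the names from the quoted `snapshot()` above (it would replace that method body inside `impl LockStats`):

```rust
pub fn snapshot(&self) -> Snapshot {
    // Pass 1: clone the Arc handles while holding the per-tag mutex
    // (cheap: one Arc::clone per tag, no atomic loads under the lock).
    let sites: Vec<(&'static str, Arc<SiteCounters>)> = self
        .per_tag
        .lock()
        .iter()
        .map(|(tag, counters)| (*tag, Arc::clone(counters)))
        .collect();

    // Pass 2: read the per-tag atomics with the mutex already released,
    // so concurrent acquires/releases never stall behind a snapshot.
    let per_tag: BTreeMap<&'static str, SiteStats> = sites
        .into_iter()
        .map(|(tag, counters)| (tag, counters.snapshot()))
        .collect();

    Snapshot {
        total: self.total.snapshot(),
        per_tag,
    }
}
```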
| pub use instrumented_lock::{InstrumentedRwLock, ReadGuard, WriteGuard}; | ||
|
|
||
| #[cfg(feature = "lock-stats")] | ||
| pub use instrumented_lock::{LockStats, SiteStats}; |
💬 Nitpick: Snapshot is not re-exported alongside LockStats/SiteStats in the diagnostics module root
diagnostics::mod.rs re-exports LockStats and SiteStats but not Snapshot, even though Snapshot is the public return type of LockStats::snapshot(). External callers can still reach it via diagnostics::instrumented_lock::Snapshot, but the asymmetry is awkward. Either add Snapshot to the pub use list under #[cfg(feature = "lock-stats")], or document the deliberate scoping.
source: ['claude']
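If the re-export route is taken, the change is a one-liner on the quoted lines above (sketch):

```rust
pub use instrumented_lock::{InstrumentedRwLock, ReadGuard, WriteGuard};

#[cfg(feature = "lock-stats")]
pub use instrumented_lock::{LockStats, SiteStats, Snapshot};
```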
| pub fn try_read_at(&self, tag: &'static str) -> Result<ReadGuard<'_, T>, TryLockError> { | ||
| match self.inner.try_read() { | ||
| Ok(inner) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_read_acquired(tag, 0); | ||
| Ok(ReadGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| }) | ||
| } | ||
| Err(e) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_read_contended(tag); | ||
| #[cfg(not(feature = "lock-stats"))] | ||
| let _ = tag; | ||
| Err(e) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Tagged variant of [`try_write`](Self::try_write). | ||
| pub fn try_write_at(&self, tag: &'static str) -> Result<WriteGuard<'_, T>, TryLockError> { | ||
| match self.inner.try_write() { | ||
| Ok(inner) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_write_acquired(tag, 0); | ||
| Ok(WriteGuard { | ||
| inner, | ||
| #[cfg(feature = "lock-stats")] | ||
| stats: Arc::clone(&self.stats), | ||
| #[cfg(feature = "lock-stats")] | ||
| tag, | ||
| #[cfg(feature = "lock-stats")] | ||
| acquired_at: Instant::now(), | ||
| }) | ||
| } | ||
| Err(e) => { | ||
| #[cfg(feature = "lock-stats")] | ||
| self.stats.record_write_contended(tag); | ||
| #[cfg(not(feature = "lock-stats"))] | ||
| let _ = tag; | ||
| Err(e) | ||
| } | ||
| } |
💬 Nitpick: try_*_at records wait_ns: 0 on success — clarify the metric semantics
On a successful try_read_at/try_write_at the recorded wait_ns is 0. Whether this is correct depends on what 'wait' is intended to capture: 'time spent contended' makes 0 right, 'time inside the API call' makes it slightly low. A one-line doc-comment clarifying the intent would prevent users from mis-aggregating wait_ns_mean across a workload heavy in try_*_at successes.
source: ['claude']
thepastaclaw
left a comment
Code Review
PR #3557 adds an opt-in InstrumentedRwLock<T> wrapper gated on a new lock-stats Cargo feature. With the feature off, the wrapper is a literal type alias for tokio::sync::RwLock<T>, so the zero-cost claim is genuine and the 21 call-site migrations are mechanical. With lock-stats on, the bookkeeping path itself perturbs the measurement (stats mutex re-acquired in guard Drop while the inner Tokio guard is still alive), and read_hold_ns_mean() divides by acquisitions rather than completed releases — biased low precisely when the feature is needed. None are consensus-critical (diagnostics-only, opt-in), but several materially weaken the trustworthiness of the metric the PR was written to surface.
Reviewed commit: 952ded1
🟡 5 suggestion(s) | 💬 2 nitpick(s)
```rust
impl<T> Drop for ReadGuard<'_, T> {
    fn drop(&mut self) {
        let held_ns = self.acquired_at.elapsed().as_nanos() as u64;
        self.stats.record_read_released(self.tag, held_ns);
    }
}

/// Exclusive write guard. `Deref<Target = T>` + `DerefMut`. Records
/// hold time on `Drop`.
pub struct WriteGuard<'a, T> {
    inner: TokioWriteGuard<'a, T>,
    stats: Arc<LockStats>,
    tag: &'static str,
    acquired_at: Instant,
}

impl<T> Deref for WriteGuard<'_, T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &T {
        &self.inner
    }
}

impl<T> DerefMut for WriteGuard<'_, T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut T {
        &mut self.inner
    }
}

impl<T> Drop for WriteGuard<'_, T> {
    fn drop(&mut self) {
        let held_ns = self.acquired_at.elapsed().as_nanos() as u64;
        self.stats.record_write_released(self.tag, held_ns);
    }
}
```
🟡 Suggestion: Guard Drop records stats while the inner Tokio guard is still held
Both ReadGuard::drop (lines 405–410) and WriteGuard::drop (lines 437–442) call self.stats.record_*_released(...) before self.inner: TokioReadGuard/TokioWriteGuard is dropped — Rust runs the user-defined Drop::drop body first and only destructs fields after it returns. record_*_released calls LockStats::site(tag), which takes parking_lot::Mutex<BTreeMap<…>> (and may insert under contention). That extends the wallet_manager critical section by a sync mutex acquire plus a BTreeMap walk on every guard release, and concurrent unlocks serialize on the stats mutex while the Tokio rwlock is still held — self-inflicted contention indistinguishable from real wallet pressure in the very metric this feature exists to measure. Releasing the inner Tokio guard before touching LockStats (e.g. wrap inner in ManuallyDrop/Option, take it explicitly, then record stats) restores faithful measurement.
source: ['claude', 'codex']
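A minimal sketch of one shape the fix could take, reusing the field and type names from the excerpt above (`TokioReadGuard`, `LockStats`) and assuming constructors wrap the guard with `ManuallyDrop::new(...)`; this is illustrative, not the PR's code:

```rust
use std::mem::ManuallyDrop;

pub struct ReadGuard<'a, T> {
    // Wrapped so Drop can release the Tokio guard explicitly, first.
    inner: ManuallyDrop<TokioReadGuard<'a, T>>,
    stats: Arc<LockStats>,
    tag: &'static str,
    acquired_at: Instant,
}

impl<T> Drop for ReadGuard<'_, T> {
    fn drop(&mut self) {
        let held_ns = self.acquired_at.elapsed().as_nanos() as u64;
        // SAFETY: `inner` is dropped exactly once and never used afterwards.
        unsafe { ManuallyDrop::drop(&mut self.inner) };
        // The rwlock is already free here, so the stats mutex below no
        // longer extends the wallet_manager critical section.
        self.stats.record_read_released(self.tag, held_ns);
    }
}
```

Deref coercion still reaches `T` through the `ManuallyDrop` layer, so the accessor impls would not need to change.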
```rust
    pub(super) fn record_read_released(&self, tag: &'static str, held_ns: u64) {
        self.total
            .read_hold_ns_total
            .fetch_add(held_ns, Ordering::Relaxed);
        let site = self.site(tag);
        site.read_hold_ns_total
            .fetch_add(held_ns, Ordering::Relaxed);
    }

    pub(super) fn record_read_contended(&self, tag: &'static str) {
        self.total.read_contended.fetch_add(1, Ordering::Relaxed);
        let site = self.site(tag);
        site.read_contended.fetch_add(1, Ordering::Relaxed);
    }

    pub(super) fn record_write_acquired(&self, tag: &'static str, wait_ns: u64) {
        self.total.write_acquired.fetch_add(1, Ordering::Relaxed);
        self.total
            .write_wait_ns_total
            .fetch_add(wait_ns, Ordering::Relaxed);
        let site = self.site(tag);
        site.write_acquired.fetch_add(1, Ordering::Relaxed);
        site.write_wait_ns_total
            .fetch_add(wait_ns, Ordering::Relaxed);
    }

    pub(super) fn record_write_released(&self, tag: &'static str, held_ns: u64) {
        self.total
            .write_hold_ns_total
            .fetch_add(held_ns, Ordering::Relaxed);
        let site = self.site(tag);
        site.write_hold_ns_total
            .fetch_add(held_ns, Ordering::Relaxed);
    }
```
🟡 Suggestion: Drop path re-acquires the per-tag mutex; cache the Arc<SiteCounters> on the guard
Acquire paths already call site(tag) (lines 77, 103) to bump the per-tag atomics, taking the parking_lot::Mutex<BTreeMap<…>> and walking the map. The acquire path discards the Arc<SiteCounters> it produced, so record_read_released/record_write_released (lines 87, 113) re-take the same mutex and re-walk the BTreeMap on every guard drop. That doubles per-acquisition global-mutex traffic and puts a BTreeMap lookup on the hot release path. Since ReadGuard/WriteGuard already carry Arc<LockStats> and tag, return the Arc<SiteCounters> from record_*_acquired and store it on the guard so the release path can bump atomics on &SiteCounters directly without touching the global mutex. This compounds with the previous finding — fixing both is what makes the metric trustworthy.
source: ['claude', 'codex']
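A sketch of the suggested shape for the read side, assuming the `SiteCounters` atomic fields mirror the write-side excerpt above (`read_acquired`, `read_wait_ns_total`, `read_hold_ns_total`); the acquire path returns the `Arc` it already looked up and the release path takes it by reference:

```rust
pub(super) fn record_read_acquired(
    &self,
    tag: &'static str,
    wait_ns: u64,
) -> Arc<SiteCounters> {
    self.total.read_acquired.fetch_add(1, Ordering::Relaxed);
    self.total
        .read_wait_ns_total
        .fetch_add(wait_ns, Ordering::Relaxed);
    let site = self.site(tag); // the only per-tag mutex walk for this acquisition
    site.read_acquired.fetch_add(1, Ordering::Relaxed);
    site.read_wait_ns_total
        .fetch_add(wait_ns, Ordering::Relaxed);
    site // the guard keeps this Arc and hands it back on release
}

/// Release-side counterpart: works on the cached counters and never
/// touches the per-tag mutex or walks the BTreeMap.
pub(super) fn record_read_released(&self, site: &SiteCounters, held_ns: u64) {
    self.total
        .read_hold_ns_total
        .fetch_add(held_ns, Ordering::Relaxed);
    site.read_hold_ns_total
        .fetch_add(held_ns, Ordering::Relaxed);
}
```

`ReadGuard`/`WriteGuard` would then store `site: Arc<SiteCounters>` set at construction and call the new release signature from `Drop`; the write-side pair changes the same way.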
```rust
    /// Mean hold time for read acquisitions, in nanoseconds.
    pub fn read_hold_ns_mean(&self) -> Option<u64> {
        if self.read_acquired == 0 {
            None
        } else {
            Some(self.read_hold_ns_total / self.read_acquired)
        }
    }

    /// Mean hold time for write acquisitions, in nanoseconds.
    pub fn write_hold_ns_mean(&self) -> Option<u64> {
        if self.write_acquired == 0 {
            None
        } else {
            Some(self.write_hold_ns_total / self.write_acquired)
        }
    }
```
🟡 Suggestion: Hold-time means divide by acquisitions, not completed releases — biased low under load
read_hold_ns_total/write_hold_ns_total are only incremented in record_*_released (lines 89, 115), but read_hold_ns_mean() (line 211) and write_hold_ns_mean() (line 220) divide by read_acquired/write_acquired, which are bumped on acquire (lines 78, 104). Any snapshot taken with N still-held guards therefore over-counts the divisor by N relative to the dividend, biasing the reported mean downward. In the simplest case — a single live read guard held for one second when the snapshot fires — read_hold_ns_mean() returns Some(0) instead of None. For a guard held during sync (multiple seconds), this can roughly halve the apparent mean — and snapshots taken mid-burst, the very situation the diagnostic exists to surface, are exactly when the bias is largest. Either (a) maintain read_released/write_released counters incremented in record_*_released and divide by those, or (b) document explicitly that the mean reflects only completed acquisitions and may understate while guards remain live.
source: ['claude', 'codex']
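Option (a) is a small diff; a sketch of the read side, assuming a `read_released` counter is added to `SiteCounters`/`SiteStats` and bumped inside `record_read_released` (writes change symmetrically):

```rust
/// Mean hold time across *completed* read acquisitions, in nanoseconds.
/// Returns `None` until at least one read guard has been released, so a
/// single long-lived live guard reads as "no data yet" rather than 0.
pub fn read_hold_ns_mean(&self) -> Option<u64> {
    if self.read_released == 0 {
        None
    } else {
        Some(self.read_hold_ns_total / self.read_released)
    }
}
```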
```rust
    pub fn snapshot(&self) -> Snapshot {
        let per_tag: BTreeMap<&'static str, SiteStats> = self
            .per_tag
            .lock()
            .iter()
            .map(|(tag, counters)| (*tag, counters.snapshot()))
            .collect();
        Snapshot {
            total: self.total.snapshot(),
            per_tag,
        }
    }
```
🟡 Suggestion: snapshot() holds the per-tag mutex across all atomic loads per tag
LockStats::snapshot keeps per_tag.lock() held for the full iteration and calls counters.snapshot() (8 atomic loads per SiteCounters) inside the iteration. Every concurrent acquire/release that needs site() stalls behind the snapshot for tags × 8 atomic loads. The doc comment claims snapshot is "cheap enough to call from a debug UI on every refresh" (line 43) — that holds for a handful of tags but degrades linearly with tag count under contention. A two-pass shape — clone Arc<SiteCounters> values into a Vec under the lock, drop the lock, then call .snapshot() outside — preserves the public API while removing the atomic loads from the critical section.
source: ['claude', 'codex']
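A sketch of the two-pass shape, using the names from the excerpt above; the only behavioral change is where the mutex is released:

```rust
pub fn snapshot(&self) -> Snapshot {
    // Pass 1: clone the Arc handles while holding the per-tag mutex (cheap).
    let sites: Vec<(&'static str, Arc<SiteCounters>)> = self
        .per_tag
        .lock()
        .iter()
        .map(|(tag, counters)| (*tag, Arc::clone(counters)))
        .collect();
    // Pass 2: do the atomic loads with the mutex already released.
    let per_tag: BTreeMap<&'static str, SiteStats> = sites
        .into_iter()
        .map(|(tag, counters)| (tag, counters.snapshot()))
        .collect();
    Snapshot {
        total: self.total.snapshot(),
        per_tag,
    }
}
```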
| #[cfg(feature = "lock-stats")] | ||
| #[tokio::test] | ||
| async fn stats_count_and_attribute_to_tag() { | ||
| let lock = InstrumentedRwLock::new(0u32); | ||
|
|
||
| // Two reads tagged "ours", one write tagged "theirs". | ||
| { | ||
| let _r1 = lock.read_at("ours").await; | ||
| let _r2 = lock.read_at("ours").await; | ||
| } | ||
| { | ||
| let _w = lock.write_at("theirs").await; | ||
| } | ||
|
|
||
| let snap = lock.stats().snapshot(); | ||
| let ours = snap.per_tag.get("ours").expect("ours tag present"); | ||
| assert_eq!(ours.read_acquired, 2); | ||
| assert_eq!(ours.write_acquired, 0); | ||
| let theirs = snap.per_tag.get("theirs").expect("theirs tag present"); | ||
| assert_eq!(theirs.read_acquired, 0); | ||
| assert_eq!(theirs.write_acquired, 1); | ||
| assert_eq!(snap.total.read_acquired, 2); | ||
| assert_eq!(snap.total.write_acquired, 1); | ||
| } | ||
|
|
||
| // Untagged calls go to the UNTAGGED bucket so the snapshot still | ||
| // accounts for them — we don't want acquisitions to vanish. | ||
| #[cfg(feature = "lock-stats")] | ||
| #[tokio::test] | ||
| async fn untagged_calls_go_to_untagged_bucket() { | ||
| let lock = InstrumentedRwLock::new(0u32); | ||
| { | ||
| let _r = lock.read().await; | ||
| } | ||
| let snap = lock.stats().snapshot(); | ||
| let untagged = snap.per_tag.get(UNTAGGED).expect("UNTAGGED bucket"); | ||
| assert_eq!(untagged.read_acquired, 1); | ||
| } | ||
|
|
||
| #[cfg(feature = "lock-stats")] | ||
| #[tokio::test] | ||
| async fn try_read_failure_records_contention() { | ||
| let lock = InstrumentedRwLock::new(0u32); | ||
| let _w = lock.write_at("holder").await; | ||
| let r = lock.try_read_at("contender"); | ||
| assert!(r.is_err()); | ||
| let snap = lock.stats().snapshot(); | ||
| let contender = snap.per_tag.get("contender").expect("contender tag"); | ||
| assert_eq!(contender.read_contended, 1); | ||
| assert_eq!(contender.read_acquired, 0); | ||
| } |
🟡 Suggestion: No test asserts that record_*_released fires on guard Drop
The lock-stats tests cover acquire-side counters (read_acquired, write_acquired, read_contended) and tag attribution, but nothing asserts that record_*_released actually runs on Drop. If a future refactor of ReadGuard/WriteGuard re-orders fields, removes the custom Drop, or wraps the inner guard in a way that suppresses the impl, every existing assertion still passes while read_hold_ns_total/write_hold_ns_total silently stay at 0 — the diagnostic loses half its signal with no test failure. A single-line addition to stats_count_and_attribute_to_tag — tokio::time::sleep(Duration::from_micros(1)).await while holding a guard, then assert!(ours.read_hold_ns_total > 0) after drop — catches that regression cheaply. Worth doing because hold time is the headline metric this feature was built to expose.
source: ['claude', 'codex']
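A standalone sketch of such a test; the test name and the `"held"` tag are illustrative, and it only asserts that the counter moved, not its magnitude:

```rust
#[cfg(feature = "lock-stats")]
#[tokio::test]
async fn guard_drop_records_hold_time() {
    let lock = InstrumentedRwLock::new(0u32);
    {
        let _r = lock.read_at("held").await;
        // Any measurable delay works; the assertion below is only "non-zero".
        tokio::time::sleep(std::time::Duration::from_micros(1)).await;
    } // ReadGuard drops here; record_read_released must fire.
    let snap = lock.stats().snapshot();
    let held = snap.per_tag.get("held").expect("held tag present");
    assert!(held.read_hold_ns_total > 0, "Drop did not record hold time");
    assert!(snap.total.read_hold_ns_total > 0);
}
```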
```rust
    pub fn try_read_at(&self, tag: &'static str) -> Result<ReadGuard<'_, T>, TryLockError> {
        match self.inner.try_read() {
            Ok(inner) => {
                self.stats.record_read_acquired(tag, 0);
                Ok(ReadGuard {
                    inner,
                    stats: Arc::clone(&self.stats),
                    tag,
                    acquired_at: Instant::now(),
                })
            }
            Err(e) => {
                self.stats.record_read_contended(tag);
                Err(e)
            }
        }
    }

    /// Tagged variant of [`try_write`](Self::try_write).
    pub fn try_write_at(&self, tag: &'static str) -> Result<WriteGuard<'_, T>, TryLockError> {
        match self.inner.try_write() {
            Ok(inner) => {
                self.stats.record_write_acquired(tag, 0);
                Ok(WriteGuard {
                    inner,
                    stats: Arc::clone(&self.stats),
                    tag,
                    acquired_at: Instant::now(),
                })
            }
            Err(e) => {
                self.stats.record_write_contended(tag);
                Err(e)
            }
        }
    }
```
💬 Nitpick: try_*_at records wait_ns: 0 on success — semantic worth documenting
On a successful try_read_at (line 296) / try_write_at (line 315) the recorded wait_ns is hard-coded to 0. Any workload that mixes try_* successes with read_at/write_at will see wait_ns_mean biased toward zero proportional to the try_* ratio. The choice is defensible if wait_ns is meant to mean "time spent contended waiting," but the docstrings on read_wait_ns_total (stats.rs:181) and read_wait_ns_mean (stats.rs:193) say only "wait time before … resolved" without a qualifier. A one-line clarification — "try_* successes contribute 0 to wait totals; only awaiting acquisitions are timed" — keeps consumers from misreading aggregates that mix try_*_at and read_at.
source: ['claude']
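If "time spent contended waiting" is the intended meaning, one hedged way to word it on the public wait field (suggested phrasing only, not the current docstring; the field shape is assumed from the snapshot assertions above):

```rust
/// Total wait time before read acquisitions resolved, in nanoseconds.
///
/// Only awaited or blocking acquisitions (`read_at`, `blocking_read_at`,
/// and their untagged variants) are timed. Successful `try_read_at`
/// calls contribute 0 here; failed ones increment `read_contended`
/// instead. Means derived from this total are therefore pulled toward
/// zero by workloads with many successful `try_*` calls.
pub read_wait_ns_total: u64,
```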
```rust
        let wait_start = Instant::now();
        let inner = self.inner.read().await;
        let wait_ns = wait_start.elapsed().as_nanos() as u64;
        self.stats.record_read_acquired(tag, wait_ns);
        ReadGuard {
            inner,
            stats: Arc::clone(&self.stats),
            tag,
            acquired_at: Instant::now(),
        }
    }

    /// Acquire an exclusive lock with a per-call-site tag.
    pub async fn write_at(&self, tag: &'static str) -> WriteGuard<'_, T> {
        let wait_start = Instant::now();
        let inner = self.inner.write().await;
        let wait_ns = wait_start.elapsed().as_nanos() as u64;
        self.stats.record_write_acquired(tag, wait_ns);
        WriteGuard {
            inner,
            stats: Arc::clone(&self.stats),
            tag,
            acquired_at: Instant::now(),
        }
    }

    /// Try to acquire a shared lock without waiting.
    #[inline]
    pub fn try_read(&self) -> Result<ReadGuard<'_, T>, TryLockError> {
        self.try_read_at(UNTAGGED)
    }

    /// Try to acquire an exclusive lock without waiting.
    #[inline]
    pub fn try_write(&self) -> Result<WriteGuard<'_, T>, TryLockError> {
        self.try_write_at(UNTAGGED)
    }

    /// Tagged variant of [`try_read`](Self::try_read).
    pub fn try_read_at(&self, tag: &'static str) -> Result<ReadGuard<'_, T>, TryLockError> {
        match self.inner.try_read() {
            Ok(inner) => {
                self.stats.record_read_acquired(tag, 0);
                Ok(ReadGuard {
                    inner,
                    stats: Arc::clone(&self.stats),
                    tag,
                    acquired_at: Instant::now(),
                })
            }
            Err(e) => {
                self.stats.record_read_contended(tag);
                Err(e)
            }
        }
    }

    /// Tagged variant of [`try_write`](Self::try_write).
    pub fn try_write_at(&self, tag: &'static str) -> Result<WriteGuard<'_, T>, TryLockError> {
        match self.inner.try_write() {
            Ok(inner) => {
                self.stats.record_write_acquired(tag, 0);
                Ok(WriteGuard {
                    inner,
                    stats: Arc::clone(&self.stats),
                    tag,
                    acquired_at: Instant::now(),
                })
            }
            Err(e) => {
                self.stats.record_write_contended(tag);
                Err(e)
            }
        }
    }

    /// Synchronously acquire a shared lock — must NOT be called from
    /// a tokio runtime thread.
    #[inline]
    pub fn blocking_read(&self) -> ReadGuard<'_, T> {
        self.blocking_read_at(UNTAGGED)
    }

    /// Synchronously acquire an exclusive lock — must NOT be called
    /// from a tokio runtime thread.
    #[inline]
    pub fn blocking_write(&self) -> WriteGuard<'_, T> {
        self.blocking_write_at(UNTAGGED)
    }

    /// Tagged variant of [`blocking_read`](Self::blocking_read).
    pub fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> {
        let wait_start = Instant::now();
        let inner = self.inner.blocking_read();
        let wait_ns = wait_start.elapsed().as_nanos() as u64;
        self.stats.record_read_acquired(tag, wait_ns);
        ReadGuard {
            inner,
            stats: Arc::clone(&self.stats),
            tag,
            acquired_at: Instant::now(),
        }
    }

    /// Tagged variant of [`blocking_write`](Self::blocking_write).
    pub fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T> {
        let wait_start = Instant::now();
        let inner = self.inner.blocking_write();
        let wait_ns = wait_start.elapsed().as_nanos() as u64;
        self.stats.record_write_acquired(tag, wait_ns);
```
💬 Nitpick: Instant::elapsed().as_nanos() as u64 silently truncates on overflow
Six sites — read_at (line 256), write_at (line 270), blocking_read_at (line 348), blocking_write_at (line 362), and the two Drop impls (lines 407, 439) — convert u128 nanoseconds to u64 via unchecked as. Truncation only matters above 2⁶⁴ ns (~584 years), so for any realistic acquire/hold this is fine. Worth flagging only because saturating conversion (u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX)) costs nothing and removes a class-of-bug worry — and a hung-guard pathology where elapsed grows unboundedly is exactly what this instrument is built to surface; truncating to a small wraparound value would bury it.
source: ['claude']
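A sketch of the saturating form as a small shared helper (the helper name is illustrative, not in the PR):

```rust
use std::time::Instant;

/// Nanoseconds elapsed since `start`, clamped to `u64::MAX` instead of
/// wrapping, so a pathological multi-century hold shows up as a pegged
/// counter rather than a misleadingly small one.
#[inline]
fn elapsed_ns_saturating(start: Instant) -> u64 {
    u64::try_from(start.elapsed().as_nanos()).unwrap_or(u64::MAX)
}

// e.g. in read_at:
// let wait_ns = elapsed_ns_saturating(wait_start);
```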
Issue being fixed or feature implemented
After the event-bus migration (#3556), we audited wallet_manager lock pressure statically — 109 acquisition sites in platform-wallet, 52 of them writes. None hold the lock across an unrelated `.await`, but we had no way to measure runtime contention. This PR adds an opt-in instrumentation layer so we can. The next time a perf question comes up ("is this site eating sync throughput?", "how often does the event adapter probe finality?"), the answer is one feature flag away.
What was done?
New `InstrumentedRwLock<T>` wrapper (in `platform_wallet::diagnostics::instrumented_lock`) around `tokio::sync::RwLock<T>`. The wrapper holds its inner lock as `Arc<TokioRwLock<T>>` so it can hand the same `Arc<RwLock<T>>` to APIs that take that type literally (`dash_spv::DashSpvClient::new`) via `raw_arc()`.

Off by default. With the `lock-stats` Cargo feature off:

- methods are `#[inline]` delegations
- guards carry no stats fields (no `Drop` glue)
- call sites compile down to plain `tokio::sync::RwLock` usage
- `parking_lot` is an optional dep gated on the feature; nothing pulls it in the default build

With `lock-stats` on:

- per-call-site acquisition counts plus wait and hold durations are recorded (hold time on `Drop`)
- `read_at("tag")` / `write_at("tag")` (plus `try_*_at` and `blocking_*_at`) attribute the acquisition to a per-call-site bucket. Untagged calls bucket into `UNTAGGED` — no acquisition vanishes from the snapshot.
- `LockStats::snapshot()` produces a `Snapshot { total, per_tag }`: cumulative read/write counts, contention counts (`try_*` failures), wait_ns_total, hold_ns_total, plus mean helpers.
- the per-tag map lives behind a `parking_lot::Mutex<BTreeMap<&'static str, Arc<SiteCounters>>>` — touched only on acquire/release, never across an `.await`, so a sync mutex is fine.

Migration
- All 21 declarations of `Arc<RwLock<WalletManager<PlatformWalletInfo>>>` across `manager`, `spv`, `wallet`, `changeset` switched to `Arc<crate::diagnostics::InstrumentedRwLock<...>>`.
- The two structs and two methods that named `RwLockReadGuard` / `RwLockWriteGuard` directly (`WalletStateReadGuard` / `WalletStateWriteGuard` in `wallet/platform_wallet.rs` and `IdentityWallet::wallet_manager_read` / `_write` in `wallet/identity/network/identity_handle.rs`) switched to the wrapper guards.
- Existing call sites keep using `lock.read().await` / `.write().await` — they bucket into `UNTAGGED`. Tagging is opt-in per site.

Worked examples — two sites tagged in `core_bridge::spawn_wallet_event_adapter` (see the sketch after this list):

- `event_adapter::subscribe` — the one-shot read at task start (subscribe to `WalletManager::subscribe_events`).
- `event_adapter::is_chain_locked` — the per-`TransactionInstantLocked` probe that the prior audit flagged as the primary new contention point introduced by the event-bus migration.

These two tags answer the question "how much of wallet_manager contention does the event adapter contribute?" once the feature is enabled. Other sites can be tagged incrementally as questions arise.
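For readers who want to see what a tagged site looks like without opening the diff, a before/after sketch at one of the two example sites (the local variable name is illustrative):

```rust
// Before: the acquisition lands in the UNTAGGED bucket of the snapshot.
let guard = wallet_manager.read().await;

// After: the acquisition is attributed to its own per-site bucket.
let guard = wallet_manager
    .read_at("event_adapter::is_chain_locked")
    .await;
```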
Trade explicitly accepted in the wrapper docs
`raw_arc()` hands out the inner `Arc<TokioRwLock<T>>` to upstream. Acquisitions made through that handed-out Arc — specifically SPV's own `process_block` write inside `DashSpvClient` — bypass the wrapper's stats. Counting those would require an upstream change. The current shape is the right one for "what does platform-wallet contribute to lock pressure?", which is the question the audit was actually trying to answer.

How Has This Been Tested?
- `cargo check --workspace` — clean (default features)
- `cargo check -p platform-wallet --features lock-stats` — clean
- `cargo test -p platform-wallet --features lock-stats --lib diagnostics` — 5/5 pass: `read_write_smoke`, `try_read_contended`, `stats_count_and_attribute_to_tag`, `untagged_calls_go_to_untagged_bucket`, `try_read_failure_records_contention`
- `cargo fmt --all -- --check` — clean

Breaking Changes
None — the wrapper preserves the call-site API surface. Call sites that previously named the tokio guard types directly (two structs and two methods) switched to the wrapper guard types; this is internal to platform-wallet.
Follow-ups
- Revisit `LockStats::snapshot()` once we know what the iOS debug UI wants from the snapshot shape (deferred per the PR scope).
- Tag the call sites in `wallet/identity/network/*` so the feature snapshot can break down identity vs. token vs. address-sync vs. SPV-event-adapter contributions independently.

Checklist:
🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
- Opt-in wallet-manager lock instrumentation behind the `lock-stats` flag with zero runtime overhead when disabled.