[FLINK-XXX] FLIP-XXX: Expose per downstream target numRecordsOut metric #28014

Open
Dennis-Mircea wants to merge 1 commit into apache:master from Dennis-Mircea:sideoutput-record-count
@Dennis-Mircea

What is the purpose of the change

FLIP link: https://docs.google.com/document/d/1TlyTc6fvYGG1xlO-IFlGBC97CxilUBFdZBpLOBPdKD8/edit?usp=sharing

Task-level numRecordsOut is a single scalar today, which hides how much data a task routes to each of its downstream vertices when it has more than one network output (side outputs, multi-sink, broadcast fan-out). Consumers that need per-edge throughput, most notably the Kubernetes autoscaler, which computes per-vertex target parallelism, cannot distinguish traffic per downstream and therefore over- or under-provision affected vertices.

This PR exposes a per-downstream-target numRecordsOut breakdown end-to-end: it is registered as an individual live metric (numRecordsOut.<targetJobVertexId>) for reporters, included in the archived IOMetrics snapshot, aggregated per job vertex, and surfaced on the REST /jobs/:jobid, /jobs/:jobid/vertices/:vertexid, and subtask-attempt responses as a new write-records-per-target field. The aggregate numRecordsOut/write-records scalar is unchanged.

Brief change log

  • Planner wiring: NonChainedOutput now carries the downstream JobVertexID, populated by StreamingJobGraphGenerator#createOrReuseOutput, so every network output knows which vertex it feeds.
  • Metric registration: TaskIOMetricGroup gains two overloads: reuseRecordsOutputCounter(Counter, jobVertexId) (aggregate-contributing + individual metric) and registerNumRecordsOutPerTarget(Counter, jobVertexId) (target-only, does not contribute to the aggregate). Both emit the numRecordsOut.<targetJobVertexId> metric consumable by Prometheus/JMX/OTel/etc.
  • Operator wiring: OperatorChain#createOutputCollector installs a per-target counter on each RecordWriterOutput. The single-output path uses the aggregate-contributing overload; the multi-output / broadcast fan-out path uses the target-only overload and lets BroadcastingOutputCollector own the aggregate (preventing double-count on broadcast). Missing target ids are logged once and fall back to the aggregate-only counter; metric wiring never fails a task.
  • Archival path fix: Execution#updateAccumulatorsAndMetrics now preserves the new numRecordsOutPerTarget map when rebuilding the lean archived IOMetrics (previously it was silently dropped, which would have made the breakdown invisible to REST consumers).
  • REST surface:
    • IOMetrics carries the new Map<String, Long> numRecordsOutPerTarget.
    • MutableIOMetrics folds per-subtask maps into the vertex-level view via per-key sum.
    • IOMetricsInfo adds a serialized write-records-per-target field.
    • JobDetailsHandler, JobVertexTaskManagersHandler, and SubtaskExecutionAttemptDetailsInfo populate the new field.
    • Regenerated rest_v1_dispatcher.yml and rest_v1_dispatcher.html.
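
The per-key sum described for MutableIOMetrics can be illustrated with a minimal, standalone sketch. It does not use the Flink classes touched by this PR: the class name PerTargetMergeSketch, the method mergePerTarget, and the vertex id strings are hypothetical and exist only for illustration. It assumes each subtask reports a map from target vertex id to record count.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; not the MutableIOMetrics implementation.
public class PerTargetMergeSketch {

    /** Folds per-subtask maps into one vertex-level map by summing values per key. */
    static Map<String, Long> mergePerTarget(List<Map<String, Long>> perSubtask) {
        // TreeMap is used only to get a stable iteration order when printing.
        Map<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> subtask : perSubtask) {
            for (Map.Entry<String, Long> entry : subtask.entrySet()) {
                // Insert the value if the key is new, otherwise add to the running sum.
                merged.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Placeholder target ids; real keys would be downstream JobVertexID strings.
        Map<String, Long> subtask0 = Map.of("vertex-a", 5L, "vertex-b", 2L);
        Map<String, Long> subtask1 = Map.of("vertex-a", 8L);
        // Prints {vertex-a=13, vertex-b=2}
        System.out.println(mergePerTarget(List.of(subtask0, subtask1)));
    }
}
```

A subtask that has not reported a given target simply contributes nothing to that key, and an empty input yields an empty map, which is consistent with the back-compat case where the map is empty.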

Verifying this change

This change added tests and can be verified as follows:

  • Unit:
    • TaskIOMetricGroupTest: covers aggregate-contributing vs target-only registration semantics, snapshot propagation, and that the per-target map reports live counter values.
    • StreamingJobGraphGeneratorTest: asserts every NonChainedOutput carries a real downstream JobVertexID for both normal pipelines and broadcast fan-outs.
    • JobDetailsHandlerTest: asserts per-subtask per-target maps are merged per key into the vertex-level IOMetricsInfo, and verifies back-compat when the map is empty.
    • SubtaskExecutionAttemptDetailsInfoTest: Jackson round-trip for both empty and populated per-target maps.
  • Integration (new): PerTargetNumRecordsOutITCase in flink-tests runs real jobs on a MiniClusterExtension and asserts the full pipeline (TaskIOMetricGroup -> IOMetrics -> MutableIOMetrics -> IOMetricsInfo -> JSON) via RestClusterClient#getJobDetails:
    • Side output topology (source -> process (split by i % 3) -> {main-sink, side-sink}): aggregate write-records = 20, per-target map contains both downstream JobVertexIDs with the expected split (13 main / 7 side), and per-target sum equals the aggregate.
    • Broadcast topology (source.broadcast() -> 3 x (map -> DiscardingSink)): aggregate stays at NUM_RECORDS = 20 (no double-count), each of the 3 per-target entries equals NUM_RECORDS independently, directly validating the target-only registration semantics.
  • ArchUnit: flink-architecture-tests-production and flink-architecture-tests-test pass with no new violations; no archunit-violations/ store needed changes.
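
The expected counts in the two integration scenarios can be reproduced with a small counting sketch. It is not the PerTargetNumRecordsOutITCase code and uses no Flink API: the class and method names are hypothetical, the records are assumed to be the integers 0 to 19, and routing `i % 3 == 0` to the side output is an assumption chosen because it yields the stated 13 / 7 split.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the counting semantics; not the ITCase itself.
public class PerTargetCountSketch {

    static final int NUM_RECORDS = 20;

    /** Side output: each record reaches exactly one target and counts once toward the aggregate. */
    static Map<String, Long> sideOutput() {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < NUM_RECORDS; i++) {
            // Assumed predicate: multiples of 3 go to the side output.
            String target = (i % 3 == 0) ? "side-sink" : "main-sink";
            counts.merge(target, 1L, Long::sum);
            counts.merge("aggregate", 1L, Long::sum);
        }
        return counts;
    }

    /** Broadcast: each record reaches every target, but the aggregate is bumped once per record. */
    static Map<String, Long> broadcast(int numTargets) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < NUM_RECORDS; i++) {
            // The aggregate is owned by the broadcasting step, so it increments once.
            counts.merge("aggregate", 1L, Long::sum);
            for (int t = 0; t < numTargets; t++) {
                // Target-only counters do not feed the aggregate.
                counts.merge("target-" + t, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {aggregate=20, main-sink=13, side-sink=7}
        System.out.println(sideOutput());
        // Prints {aggregate=20, target-0=20, target-1=20, target-2=20}
        System.out.println(broadcast(3));
    }
}
```

In the side output case the per-target values sum to the aggregate. In the broadcast case they sum to three times the aggregate, which is why the target-only counters must not contribute to it.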

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

Was generative AI tooling used to co-author this PR?
  • Yes

Generated-by: Claude Code

@flinkbot
Collaborator

flinkbot commented Apr 23, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build
