# [FLINK-XXX] FLIP-XXX: Expose per downstream target `numRecordsOut` metric (#28014)
Open · Dennis-Mircea wants to merge 1 commit into `apache:master`
## What is the purpose of the change
FLIP link: https://docs.google.com/document/d/1TlyTc6fvYGG1xlO-IFlGBC97CxilUBFdZBpLOBPdKD8/edit?usp=sharing
Task-level `numRecordsOut` is a single scalar today, which hides how much data a task routes to each of its downstream vertices when it has more than one network output (side outputs, multi-sink, broadcast fan-out). Consumers that need per-edge throughput, most notably the Kubernetes autoscaler (which computes per-vertex target parallelism), cannot distinguish traffic per downstream vertex and therefore over- or under-provision the affected vertices.

This PR exposes a per-downstream-target `numRecordsOut` breakdown end-to-end: it is registered as an individual live metric (`numRecordsOut.<targetJobVertexId>`) for reporters, included in the archived `IOMetrics` snapshot, aggregated per job vertex, and surfaced on the REST `/jobs/:jobid`, `/jobs/:jobid/vertices/:vertexid`, and subtask-attempt responses as a new `write-records-per-target` field. The aggregate `numRecordsOut` / `write-records` scalar is unchanged.

## Brief change log
- `NonChainedOutput` now carries the downstream `JobVertexID`, populated by `StreamingJobGraphGenerator#createOrReuseOutput`, so every network output knows which vertex it feeds.
- `TaskIOMetricGroup` gains two overloads: `reuseRecordsOutputCounter(Counter, jobVertexId)` (aggregate-contributing plus individual metric) and `registerNumRecordsOutPerTarget(Counter, jobVertexId)` (target-only, does not contribute to the aggregate). Both emit the `numRecordsOut.<targetJobVertexId>` metric consumable by Prometheus/JMX/OTel/etc.
- `OperatorChain#createOutputCollector` installs a per-target counter on each `RecordWriterOutput`. The single-output path uses the aggregate-contributing overload; the multi-output / broadcast fan-out path uses the target-only overload and lets `BroadcastingOutputCollector` own the aggregate (preventing double-counting on broadcast). Missing target ids are logged once and fall back to the aggregate-only counter; metric wiring never fails a task.
- `Execution#updateAccumulatorsAndMetrics` now preserves the new `numRecordsOutPerTarget` map when rebuilding the lean archived `IOMetrics` (previously it was silently dropped, which would have made the breakdown invisible to REST consumers).
- `IOMetrics` carries the new `Map<String, Long> numRecordsOutPerTarget`. `MutableIOMetrics` folds per-subtask maps into the vertex-level view via a per-key sum. `IOMetricsInfo` adds a serialized `write-records-per-target` field. `JobDetailsHandler`, `JobVertexTaskManagersHandler`, and `SubtaskExecutionAttemptDetailsInfo` populate the new field. `rest_v1_dispatcher.yml` and `rest_v1_dispatcher.html` are updated accordingly.

## Verifying this change
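Before the individual tests, the two registration semantics (aggregate-contributing vs. target-only, with the broadcasting collector owning the aggregate) can be sketched in plain Java. This is a hedged model, not the actual Flink `TaskIOMetricGroup` API; all names below are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Illustrative model: one aggregate counter (numRecordsOut) plus one counter
// per downstream target (numRecordsOut.<targetJobVertexId>).
class PerTargetCounters {
    final LongAdder aggregate = new LongAdder();
    final Map<String, LongAdder> perTarget = new LinkedHashMap<>();

    // Single-output path: the per-target counter also feeds the aggregate.
    void emitAggregateContributing(String targetVertexId) {
        perTarget.computeIfAbsent(targetVertexId, k -> new LongAdder()).increment();
        aggregate.increment();
    }

    // Broadcast fan-out path: per-target counters are target-only; the
    // broadcasting collector owns the single aggregate increment, so a record
    // fanned out to N targets still counts exactly once in the aggregate.
    void emitBroadcast(List<String> targetVertexIds) {
        for (String id : targetVertexIds) {
            perTarget.computeIfAbsent(id, k -> new LongAdder()).increment();
        }
        aggregate.increment();
    }

    public static void main(String[] args) {
        PerTargetCounters m = new PerTargetCounters();
        for (int i = 0; i < 20; i++) {
            m.emitBroadcast(List.of("v1", "v2", "v3"));
        }
        System.out.println(m.aggregate.sum());           // 20: no double-count
        System.out.println(m.perTarget.get("v1").sum()); // 20: each target sees all records
    }
}
```

This mirrors the second ITCase scenario below: after broadcasting 20 records to 3 targets, each per-target entry reads 20 while the aggregate also stays at 20.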
This change added tests and can be verified as follows:
- `TaskIOMetricGroupTest`: covers aggregate-contributing vs. target-only registration semantics, snapshot propagation, and that the per-target map reports live counter values.
- `StreamingJobGraphGeneratorTest`: asserts every `NonChainedOutput` carries a real downstream `JobVertexID` for both normal pipelines and broadcast fan-outs.
- `JobDetailsHandlerTest`: asserts per-subtask per-target maps are merged per key into the vertex-level `IOMetricsInfo`, and verifies backwards compatibility when the map is empty.
- `SubtaskExecutionAttemptDetailsInfoTest`: Jackson round-trip for both empty and populated per-target maps.
- `PerTargetNumRecordsOutITCase` in `flink-tests` runs real jobs on a `MiniClusterExtension` and asserts the full pipeline (`TaskIOMetricGroup` -> `IOMetrics` -> `MutableIOMetrics` -> `IOMetricsInfo` -> JSON) via `RestClusterClient#getJobDetails`:
  - `source -> process (split by i % 3) -> {main-sink, side-sink}`: aggregate `write-records = 20`, the per-target map contains both downstream `JobVertexID`s with the expected split (13 main / 7 side), and the per-target sum equals the aggregate.
  - `source.broadcast() -> 3 x (map -> DiscardingSink)`: the aggregate stays at `NUM_RECORDS = 20` (no double-count), and each of the 3 per-target entries equals `NUM_RECORDS` independently, directly validating the target-only registration semantics.
- `flink-architecture-tests-production` and `flink-architecture-tests-test` pass with no new violations; no `archunit-violations/` store needed changes.

## Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: no

## Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code
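For illustration, a sketch of how the new `write-records-per-target` field might appear next to the existing `write-records` scalar in a vertex entry of the `/jobs/:jobid` response. The vertex IDs are made up, and the counts are chosen to match the 13/7 split from the first ITCase scenario; consult `rest_v1_dispatcher.yml` in this PR for the authoritative shape.

```json
{
  "id": "ea632d67b7d595e5b851708ae9ad79d6",
  "name": "process -> sinks",
  "metrics": {
    "write-records": 20,
    "write-records-complete": true,
    "write-records-per-target": {
      "c27dcf7b54ef6bfd6cff02ca8870b681": 13,
      "0a448493b4782967b150582570326227": 7
    }
  }
}
```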