Automatically update BigQuery table schema when unknown fields are seen. #38058
reuvenlax wants to merge 17 commits into apache:master from
Conversation
/gemini review

Warning: Gemini encountered an error creating the summary. You can try again by commenting

/gemini summary
Summary of Changes
This pull request introduces a significant enhancement to the BigQuery Storage Write API connector, allowing it to automatically adapt to evolving data schemas. When data with new fields or relaxed field constraints arrives, the system will now detect these changes, buffer the affected records, and dynamically update the BigQuery table's schema. This capability greatly simplifies data ingestion pipelines by removing the need for manual schema management in response to minor data evolution, ensuring data flow continues uninterrupted.
Activity
/gemini review
Code Review
This pull request implements automatic schema evolution for the BigQuery Storage Write API. It introduces a mechanism to detect schema mismatches using hashing and handles missing field errors by automatically patching the BigQuery table schema when permitted. Key changes include the UpgradeTableSchema utility for schema merging and a stateful buffering mechanism in StorageApiConvertMessages to hold records during schema updates. The review feedback recommends replacing System.err calls with proper logging, optimizing object allocations in loops, removing redundant semicolons, and using a fixed charset for hashing to ensure consistency across environments.
ElementT value = element.getValue();
boolean needsNewTimer = false;
if (value != null) {
  System.err.println("BUFFERING ELEMENT");

boolean schemaOutOfDate = false;
do {
  try {
    System.err.println("TRYING TO PATCH TO " + updatedSchema);

  }
  return;
} catch (IOException e) {
  ApiErrorExtractor errorExtractor = new ApiErrorExtractor();

collectors.clear();
;

this.isStruct = isStruct;
;

prefix.isEmpty()
    ? tableFieldSchema.getName()
    : String.join(".", prefix, tableFieldSchema.getName());
hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), Charset.defaultCharset()));
Using Charset.defaultCharset() can lead to inconsistent results across different environments. It is safer to use a fixed charset like StandardCharsets.UTF_8 for hashing.

- hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), Charset.defaultCharset()));
+ hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), StandardCharsets.UTF_8));
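For illustration, a small self-contained example of why the default charset is environment-dependent: the same string encodes to different byte sequences under different charsets, so hashes computed from those bytes would differ between workers whose JVMs have different defaults.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Demonstrates that the byte encoding of a non-ASCII field name depends on the
// charset, so hashing with Charset.defaultCharset() is not stable across JVMs.
public class CharsetDemo {
  public static void main(String[] args) {
    String name = "café";
    byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);        // 'é' is 2 bytes
    byte[] latin1 = name.getBytes(StandardCharsets.ISO_8859_1); // 'é' is 1 byte
    System.out.println(utf8.length + " " + latin1.length + " "
        + Arrays.equals(utf8, latin1));
  }
}
```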
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment

Force-pushed from 402535f to 863352d

/gemini review
Code Review
This pull request introduces automatic schema updates for the BigQuery Storage Write API, enabling field additions and relaxations during streaming. It implements a detection mechanism for schema mismatches, asynchronous table patching, and a stateful buffering stage to hold records until updates are propagated. The changes also include an error collection system for proto conversion and optimized schema caching. A redundant license header was noted in ConvertMessagesDoFn.java.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/gemini review
Code Review
This pull request introduces support for automatic BigQuery table schema updates during Storage Write API operations. It adds new components like ConvertMessagesDoFn, PatchTableSchemaDoFn, and SchemaUpdateHoldingFn to buffer and retry elements when schema mismatches occur. My review identified several critical issues: the schema patching loop can become infinite on invalid updates, the ConvertMessagesDoFn is incorrectly shared across transforms, the windowing logic for schema notifications is flawed, and there are several instances of dead code, debug logs, and overly aggressive error handling that could crash the pipeline.
while (true) {
  TableSchema baseSchema = messageConverter.getTableSchema();
  TableSchema updatedSchema = UpgradeTableSchema.mergeSchemas(baseSchema, tableSchemaDiff);
  // Check first to see if the schema still needs updating.
  if (baseSchema.equals(updatedSchema)) {
    return;
  }
  BackOff backoff =
      new ExponentialBackOff.Builder()
          .setMaxElapsedTimeMillis((int) TimeUnit.MINUTES.toMillis(1))
          .build();
  boolean schemaOutOfDate = false;
  Exception lastException = null;
  do {
    try {
      getDatasetService(pipelineOptions)
          .patchTableSchema(
              dynamicDestinations.getTable(destination).getTableReference(),
              TableRowToStorageApiProto.protoSchemaToTableSchema(updatedSchema));
      // Indicate that we've patched this schema.
      o.output(KV.of(destination, null));
      return;
    } catch (IOException e) {
      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
      if (errorExtractor.preconditionNotMet(e) || errorExtractor.badRequest(e)) {
        schemaOutOfDate = true;
        break;
      } else {
        lastException = e;
      }
    }
  } while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, backoff));
  if (schemaOutOfDate) {
    // This could be due to an out-of-date schema.
    messageConverter.updateSchemaFromTable();
  } else {
    // We ran out of retries.
    throw new RuntimeException("Failed to patch table schema.", lastException);
  }
}
The while (true) loop, combined with the logic that sets schemaOutOfDate = true on any badRequest or preconditionNotMet error, can lead to an infinite loop. If a schema update is inherently invalid (e.g., attempting an incompatible type change), BigQuery will return a 400 Bad Request. The current implementation will interpret this as the local schema being out of date, refresh it, and retry the same invalid update indefinitely. A maximum retry count or a more specific check on the error reason should be implemented.
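One way to bound this, sketched below with plain-Java stand-ins (PatchCall and MAX_SCHEMA_REFRESHES are hypothetical names, not Beam or BigQuery APIs), is to cap the number of refresh-and-retry cycles so an inherently invalid update surfaces as a failure instead of looping forever.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of bounding the refresh-and-retry cycle: after
// MAX_SCHEMA_REFRESHES consecutive "bad request"-style failures we give up
// rather than refreshing the local schema and retrying indefinitely.
public class BoundedPatchLoop {
  interface PatchCall { void patch() throws IOException; }

  static final int MAX_SCHEMA_REFRESHES = 3;

  public static boolean patchWithBoundedRefresh(PatchCall call) {
    for (int refreshes = 0; refreshes < MAX_SCHEMA_REFRESHES; refreshes++) {
      try {
        call.patch();
        return true; // patched successfully
      } catch (IOException e) {
        // Treat as "schema possibly out of date": refresh (not shown) and loop.
      }
    }
    return false; // update is likely inherently invalid; surface the error
  }

  public static void main(String[] args) {
    AtomicInteger attempts = new AtomicInteger();
    boolean ok = patchWithBoundedRefresh(() -> {
      attempts.incrementAndGet();
      throw new IOException("400 Bad Request"); // simulates an invalid update
    });
    System.out.println(ok + " after " + attempts.get() + " attempts");
  }
}
```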
ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn =
    new ConvertMessagesDoFn<>(
        dynamicDestinations,
        bqServices,
        operationName,
        failedWritesTag,
        successfulWritesTag,
        patchTableSchemaTag,
        elementsWaitingForSchemaTag,
        rowMutationFn,
        badRecordRouter,
        input.getCoder());
PCollectionTuple result =
    input.apply(
        "Convert to message",
        ParDo.of(convertMessagesDoFn)
            .withOutputTags(
                successfulWritesTag,
                TupleTagList.of(
                    ImmutableList.of(
                        failedWritesTag,
                        BAD_RECORD_TAG,
                        patchTableSchemaTag,
                        elementsWaitingForSchemaTag)))
            .withSideInputs(dynamicDestinations.getSideInputs()));
result.get(successfulWritesTag).setCoder(successCoder);
result.get(failedWritesTag).setCoder(errorCoder);
result.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
result
    .get(patchTableSchemaTag)
    .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)));
result.get(elementsWaitingForSchemaTag).setCoder(KvCoder.of(destinationCoder, elementCoder));
final int numShards =
    input
        .getPipeline()
        .getOptions()
        .as(BigQueryOptions.class)
        .getSchemaUpgradeBufferingShards();
// Throttle the stream to the patch-table function so that only a single update per table per
// second gets processed. The combiner merges incremental schemas, so we won't miss any updates.
PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
    result
        .get(patchTableSchemaTag)
        .apply(
            "rewindow",
            Window.<KV<DestinationT, TableSchema>>configure()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(1))))
                .discardingFiredPanes())
        .apply("merge schemas", Combine.perKey(new MergeSchemaCombineFn()))
        .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)))
        .apply(
            "Patch table schema",
            ParDo.of(
                new PatchTableSchemaDoFn<>(operationName, bqServices, dynamicDestinations)))
        .setCoder(KvCoder.of(destinationCoder, NullableCoder.of(elementCoder)))
        // We need to make sure that all shards of the buffering transform are notified.
        .apply(
            "fanout to all shards",
            FlatMapElements.via(
                new SimpleFunction<
                    KV<DestinationT, ElementT>,
                    Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
                  @Override
                  public Iterable<KV<ShardedKey<DestinationT>, ElementT>> apply(
                      KV<DestinationT, ElementT> elem) {
                    return IntStream.range(0, numShards)
                        .mapToObj(
                            i ->
                                KV.of(
                                    StorageApiConvertMessages.AssignShardFn.getShardedKey(
                                        elem.getKey(), i, numShards),
                                    elem.getValue()))
                        .collect(Collectors.toList());
                  }
                }))
        .setCoder(
            KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)))
        .apply(
            Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
                .triggering(DefaultTrigger.of()));
// Any elements that are waiting for a schema update are sent to this stateful DoFn to be
// buffered.
// Note: we currently do not provide the DynamicDestinations object access to the side input in
// this path. This is because side inputs are not currently available from timer callbacks.
// Since side inputs are generally used for getSchema and in this case we read the schema from
// the table, this is unlikely to be a problem.
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements =
    result
        .get(elementsWaitingForSchemaTag)
        // TODO: Consider using GroupIntoBatches.withShardedKey to get auto sharding here
        // instead of fixed sharding.
        .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
        .setCoder(
            KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)));
PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> waitingElementsList =
    PCollectionList.of(shardedWaitingElements).and(tablesPatched);
PCollectionTuple retryResult =
    waitingElementsList
        .apply("Buffered flatten", Flatten.pCollections())
        .apply(
            "bufferElements",
            ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, convertMessagesDoFn))
The convertMessagesDoFn instance is being used both as a standalone DoFn in a ParDo (line 125) and as an embedded helper inside SchemaUpdateHoldingFn (line 220). In Apache Beam, DoFn instances have specific lifecycles (setup, startBundle, finishBundle, teardown) managed by the runner. Sharing a DoFn instance across different transforms or embedding it within another DoFn is highly discouraged and can lead to unexpected behavior or serialization issues. The shared logic should be refactored into a separate helper class that is not a DoFn.
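A rough sketch of the suggested refactor, with the Beam types stubbed out as plain classes (MessageConverterHelper is a hypothetical name): the shared conversion logic moves into a Serializable helper, and each DoFn-like class constructs its own instance instead of sharing one DoFn object.

```java
import java.io.Serializable;

// Hypothetical sketch: shared conversion logic lives in a plain Serializable
// helper, so no DoFn instance is reused across transforms or embedded inside
// another DoFn. The two Fn classes here are stand-ins for the real DoFns.
public class SharedLogicSketch {
  static class MessageConverterHelper implements Serializable {
    String convert(String element) {
      return "proto:" + element; // stand-in for the real proto conversion
    }
  }

  static class ConvertMessagesFn {
    private final MessageConverterHelper helper = new MessageConverterHelper();
    String process(String element) { return helper.convert(element); }
  }

  static class SchemaUpdateHoldingFn {
    private final MessageConverterHelper helper = new MessageConverterHelper();
    String flushBuffered(String element) { return helper.convert(element); }
  }

  public static void main(String[] args) {
    // Each Fn owns an independent helper; no lifecycle is shared between them.
    System.out.println(new ConvertMessagesFn().process("row1"));
    System.out.println(new SchemaUpdateHoldingFn().flushBuffered("row2"));
  }
}
```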
The schema update notification (tablesPatched) is emitted in the GlobalWindow via ConvertMessagesDoFn.finishBundle. However, the elements waiting for schema updates in SchemaUpdateHoldingFn may belong to various windows. Since SchemaUpdateHoldingFn is a stateful DoFn, its state and timers are window-scoped. A notification in the GlobalWindow will not trigger the processing of state held in other windows. While the pollTimer fallback ensures eventual processing, the 'fast-path' notification mechanism is currently ineffective for windowed data.
new RetryManager<>(
    Duration.standardSeconds(1),
    Duration.standardSeconds(20),
    maxRetries,
    BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));

AppendClientInfo getAppendClientInfo(
    boolean lookupCache, final @Nullable TableSchema updatedSchema) {
  lookupCache = false;
// TODO: WHY ARE WE HITTING THIS FAILURE!!!!!!! We successfully reopened the
// connection with a new TableSchema, yet Vortex is failing the individual rows.
// APPEARS THAT STREAMWRITER isn't getting updated?
LOG.error(
    "UNEXPECTED. DUMPING ERROR {}, CONVERTER SCHEMA {}, ACI SCHEMA {}",
    error.getRowIndexToErrorMessage().get(failedIndex),
    messageConverter.getTableSchema(),
    aci.getTableSchema());

    return;
  }
} while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, backoff));
throw new RuntimeException("Failed to flush elements on window expiration!");
Throwing a RuntimeException during onWindowExpiration is quite aggressive for a streaming pipeline. If a schema update is delayed or fails permanently, this will cause the entire pipeline to crash and potentially enter a crash loop. Consider routing these elements to a dead-letter queue or failing them through the standard WriteResult mechanism instead of crashing the worker.
Assigning reviewers: R: @chamikaramj for label java. Note: If you would like to opt out of this review, comment

Available commands:

The PR bot will only process comments in the main thread (not review comments).
R: @ahmedabu98 |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
ahmedabu98 left a comment
Leaving some comments, but didn't look at any test classes yet
return tableSchemaHash("", tableSchema.getFieldsList()).asBytes();
}

public static HashCode tableSchemaHash(String prefix, List<TableFieldSchema> fields) {
Can we also hash the field type? The current logic will see {name: a, type: STRING} and {name: a, type: INTEGER} as equal.

And can we add some unit testing?
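A minimal sketch of folding the type into the hash, using only the JDK's MessageDigest instead of the Guava hasher used in the PR (fieldHash and its signature are hypothetical, for illustration only):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical sketch: include both the qualified field name and its type in
// the hash, so {name: a, type: STRING} and {name: a, type: INTEGER} differ.
public class FieldHashSketch {
  public static byte[] fieldHash(String prefix, String name, String type) {
    String qualified = prefix.isEmpty() ? name : prefix + "." + name;
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      digest.update(qualified.toLowerCase().getBytes(StandardCharsets.UTF_8));
      digest.update((byte) 0); // separator so "ab"+"c" never collides with "a"+"bc"
      digest.update(type.getBytes(StandardCharsets.UTF_8));
      return digest.digest();
    } catch (NoSuchAlgorithmException e) {
      throw new AssertionError(e); // SHA-256 is guaranteed on every JVM
    }
  }

  public static void main(String[] args) {
    byte[] asString = fieldHash("", "a", "STRING");
    byte[] asInteger = fieldHash("", "a", "INTEGER");
    // With the type folded in, the two schemas no longer hash equal.
    System.out.println(Arrays.equals(asString, asInteger));
  }
}
```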
// Returns true if the exception was collected.
void collect(SchemaConversionException exception) throws SchemaConversionException {
// Returns true
outdated comment?
checkArgument(!useSchemaUpdate);
nit: duplicate check, already covered above
TableRow toFailsafeTableRow(T element);

void updateSchemaFromTable() throws IOException, InterruptedException;
Can we have subclasses that don't implement this throw an UnsupportedOperationException?
Let me think about this a bit more. In the past such logic has caused problems with pipeline update.
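For reference, a default interface method keeps existing implementations source-compatible while failing loudly at runtime; a minimal sketch (the names are illustrative, not the actual Beam interface):

```java
import java.io.IOException;

// Hypothetical sketch: updateSchemaFromTable becomes a default method that
// throws, so only converters supporting table-driven refresh must override it.
public class DefaultMethodSketch {
  interface MessageConverter<T> {
    default void updateSchemaFromTable() throws IOException, InterruptedException {
      throw new UnsupportedOperationException(
          getClass().getName() + " does not support schema updates from the table");
    }
  }

  // A converter that never overrides the method inherits the throwing default.
  static class LegacyConverter implements MessageConverter<String> {}

  public static void main(String[] args) throws Exception {
    try {
      new LegacyConverter().updateSchemaFromTable();
    } catch (UnsupportedOperationException e) {
      System.out.println("unsupported");
    }
  }
}
```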
streamDoesNotExist = streamDoesNotExist && !schemaMismatchError;

if (offsetMismatch || streamDoesNotExist) {
  appendOffsetFailures.inc();
Do we need to reset and create a new stream for schema mismatch errors?
we want to reset and create a new stream "connection" (reset the RPC). We specifically do not want to delete the stream and recreate it.
for (int i = 0; i < 5; ++i) {
  testStream = testStream.advanceProcessingTime(Duration.standardSeconds(2));
}
We're trying to force some processing-time timer expiration so we aren't just relying on OnWindowExpiration. It's not great, because TestStream isn't well suited to testing processing-time timers. Ideally we'd be able to advance the clock after the job has started, and then wait for a signal that the timer has expired. TestStream wants these advancements to be deterministically put in the input stream though. This approach does add some test coverage, but it's not very deterministic
Function<Integer, TableSchema> getUpdatedSchema =
    currentStride -> {
      TableSchema tableSchemaUpdated = new TableSchema();
I'm noticing that this function only gets checked for Integer.MAX_VALUE to validate the final schema.
Is there a way to view the table update history and validate that incremental updates occurred?
Maybe TableContainer can maintain an ordered list of Table schemas
Oh yes. This is a remnant of an earlier test that got rewritten. We might be able to test this, but I'm not sure the sequence will be deterministic - if DirectRunner shuffles inputs (which in general it does) the intermediate set of table schemas might vary from test to test, with only the final one being deterministic.
It might be possible to do this with a stateful DoFn like we do in the IT test, but I'd have to look at it a bit more.
private void verifyTableSchemaUpdated(String tableSpec, TableSchema evolvedSchema)
    throws IOException, InterruptedException {
  Table table =
      BQ_CLIENT.getTableResource(
          PROJECT,
          BIG_QUERY_DATASET_ID,
          Iterables.getLast(
              org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter.on('.')
                  .split(tableSpec)));
  assertEquals(
      TableRowToStorageApiProto.schemaToProtoTableSchema(evolvedSchema),
      TableRowToStorageApiProto.schemaToProtoTableSchema(table.getSchema()));
}
Same question from above: is there a way to validate that incremental schemas occurred? Maybe harder to do in an integration test, but worth asking the question
Rough idea: BigQueryIO supports successful-write propagation. We can add a ParDo listening to these successful writes, and validate the schemas there. Again, though, order is not guaranteed.
I'm also noticing that the implementation of successful-write propagation appears broken for the EO sink. We push to this PCollection as soon as we write, but before we flush the stream.
IntFunction<TableRow> getRow =
    (IntFunction<TableRow> & Serializable)
        (int i) -> {
          TableRow row = new TableRow().set("name", "name" + i).set("number", Long.toString(i));
          if (i < stride) {
            row = row.set("req", "foo");
          } else {
            row = row.set("new1", "blah" + i);
            row = row.set("new2", "baz" + i);
Can we also include one last stride that tries writing invalid rows? Should check different cases:
- extra required column
- same column name but different type
- same column name but invalid mode change
Then check that they're correctly output to the DLQ
Good idea. A new required column isn't possible to express (any new fields in the JSON are assumed to be nullable). What I can test is changing the mode to repeated, or changing the type from string to struct. (Once we support user-determined types, we can test more.)
One wrinkle: pretty much any type in the object will successfully validate as a String type (the logic is to call object.toString()), so this only really works with more restricted types such as INT64. Basically anything can match a string field.
TableRow getRow(int i) {
  TableRow row = new TableRow().set("name", "name" + i).set("number", Long.toString(i));
  if (i < stride) {
    row = row.set("req", "foo");
  } else {
    row = row.set("new1", "blah" + i);
    row = row.set("new2", "baz" + i);
Same ask here about adding invalid rows and checking they correctly get output to the DLQ
No description provided.