feat(go): bulk ingest via Volume + COPY INTO instead of per-row INSERT#411
Draft
jatorre wants to merge 24 commits into adbc-drivers:main
Conversation
Expose geospatialAsArrow support (SPARK-54232) as an opt-in ADBC connection option. When set to "true", geometry/geography columns arrive as Struct<srid: Int32, wkb: Binary> instead of EWKT strings. This depends on databricks/databricks-sql-go#328, which adds the WithArrowNativeGeospatial() ConnOption to the underlying Go SQL driver.

Usage via adbc_connect (e.g. from DuckDB adbc_scanner):
adbc_connect({
    'driver': 'libadbc_driver_databricks.dylib',
    'databricks.server_hostname': '...',
    'databricks.arrow.native_geospatial': 'true'
})
When databricks.arrow.native_geospatial is enabled, the driver now converts Struct<srid: Int32, wkb: Binary> columns to flat Binary columns with ARROW:extension:name=geoarrow.wkb metadata. This enables downstream consumers (e.g. DuckDB adbc_scanner) to automatically map geometry columns to native GEOMETRY types without any explicit ST_GeomFromWKB conversion.

Pipeline: Databricks -> Struct<srid,wkb> -> geoarrow.wkb -> native GEOMETRY

Benchmarks vs baseline (ST_AsBinary + ST_GeomFromWKB):
- 100k points: 2.05x faster (31k rows/sec vs 15k rows/sec)
- 10k polygons: 1.31x faster (4.5k rows/sec vs 3.4k rows/sec)
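The flat-Binary rewrite hinges on attaching GeoArrow extension metadata to the replacement field. A minimal sketch of what that metadata looks like — the helper name and exact JSON wrapping are illustrative, not the driver's actual API:

```go
package main

import "fmt"

// geoArrowFieldMetadata returns the Arrow field metadata that marks a flat
// Binary column as GeoArrow WKB. crsJSON may be empty when the SRID is not
// yet known (it is filled in lazily from the first batch).
// Hypothetical helper for illustration only.
func geoArrowFieldMetadata(crsJSON string) map[string]string {
	md := map[string]string{"ARROW:extension:name": "geoarrow.wkb"}
	if crsJSON != "" {
		md["ARROW:extension:metadata"] = fmt.Sprintf(`{"crs":%s}`, crsJSON)
	}
	return md
}

func main() {
	fmt.Println(geoArrowFieldMetadata("")["ARROW:extension:name"])
}
```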
Defer schema transformation to the first Next() call so the SRID can be read from the first non-null row of each geometry column. The SRID is encoded as a PROJJSON CRS in ARROW:extension:metadata, e.g. EPSG:4326 or EPSG:3857. This ensures CRS information propagates correctly to downstream consumers (DuckDB, pandas, polars, GDAL).

Split transformSchemaForGeoArrow into:
- detectGeometryColumns: finds geometry struct column indices (called in constructor)
- buildGeoArrowSchema: builds the geoarrow schema with CRS from the first batch (called lazily)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The schema must be available before the first Next() call, since consumers like adbc_scanner read it upfront to create table columns. Build the geoarrow.wkb schema eagerly with empty CRS metadata in the constructor, then enrich it with the actual SRID from the first record batch during the first Next() call.

Verified: DuckDB now correctly recognizes geometry columns as native GEOMETRY type via the geoarrow.wkb extension metadata.

Benchmark results (Databricks → DuckDB):
- 100k points: 7x faster than ST_AsBinary baseline
- 10k polygons: 3.6x faster than ST_AsBinary baseline

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
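The PROJJSON CRS written during enrichment can be as small as an identifier-only object; a hedged sketch (the driver's real output may carry more PROJJSON fields such as name, datum, or axes):

```go
package main

import "fmt"

// projJSONForSRID builds a minimal identifier-only PROJJSON object for an
// EPSG code, as used when enriching the eagerly built schema from the first
// record batch. Illustrative sketch only.
func projJSONForSRID(srid int32) string {
	return fmt.Sprintf(`{"id":{"authority":"EPSG","code":%d}}`, srid)
}

func main() {
	fmt.Println(projJSONForSRID(4326))
}
```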
Co-authored-by: Dewey Dunnington <dewey@dunnington.ca>
Remove azure-prod environment requirement and test-packages job so the release workflow can run on forks without Databricks credentials. The pipeline is now: build → package → release.
Add missing `then` after `if` condition in version check.
The cdylib/staticlib targets need the ffi feature to export AdbcDriverInit. Make metadata-ffi (which includes ffi) the default so release builds work correctly with DuckDB and other ADBC consumers.
The adbc-make check rejects the extra metadata symbols exported by metadata-ffi. Use just ffi to export only AdbcDriverInit.
native-tls links against system OpenSSL, causing ABI mismatches (e.g. libssl.so.10 vs OpenSSL 3) on different distros. rustls is a pure Rust TLS implementation with no system dependency.
The webpki-roots crate (rustls dependency) uses CDLA-Permissive-2.0. Add it to the accepted licenses list for cargo-about.
The tokio Runtime was owned by Connection and could be dropped while an FFI_ArrowArrayStream reader was still alive (e.g. if DuckDB releases the Connection before finishing the scan). The reader held only a Handle, which becomes invalid after the Runtime is dropped.

Fix: wrap Runtime in Arc<Runtime> and propagate it to:
- Connection (owns Arc<Runtime>)
- Statement (holds Arc<Runtime>)
- ResultReaderFactory (holds _runtime_keepalive Arc)
- CloudFetchResultReader (holds _runtime_keepalive Arc)

This ensures the runtime stays alive as long as any reader exists, preventing "double free or corruption" crashes at the FFI boundary.
The export_driver! macro from adbc_ffi v0.22 hard-rejects ADBC 1.0.0 callers and always writes the full 1.1.0 AdbcDriver struct (58 fields, 464 bytes). DuckDB's adbc_scanner (v1.5.0) allocates a 1.0.0-sized buffer (29 fields, 232 bytes), so writing the 1.1.0 struct causes a heap buffer overflow → "double free or corruption" crash at connect.

Replace the macro with custom AdbcDriverInit/AdbcDatabricksInit that:
- Accept ADBC_VERSION_1_0_0: write only the first 29 fields
- Accept ADBC_VERSION_1_1_0: write the full struct
- Reject unknown versions with a clear error
The upstream adbc_ffi's release_ffi_error frees error.message via CString::from_raw but doesn't null the pointer or clear the release callback. When DuckDB's ADBC driver manager calls release and the error struct is then cleaned up again (C++ destructor or a second release call), the stale pointer causes a "free(): double free detected in tcache 2" crash.

Fix: patch adbc_ffi locally via [patch.crates-io] to null out error.message, error.private_data, and error.release after freeing. This makes the release function idempotent. Also revert the custom AdbcDriverInit (V100 compat was not needed — the driver manager allocates a full V110 struct and passes V110).
DuckDB's adbc_scanner calls StatementPrepare before StatementExecuteQuery. Without it, adbc_scan fails. Databricks doesn't have server-side prepared statements, so accept prepare as a no-op that validates a query has been set.
When set to "true", passes arrow_native_geospatial=true as a session
configuration parameter to the SEA API. This causes geometry/geography
columns to arrive as Arrow native structs (Struct<srid: Int32, wkb: Binary>)
instead of EWKT strings, enabling GeoArrow passthrough.
Usage via adbc_connect (e.g. from DuckDB adbc_scanner):
adbc_connect({
'driver': 'libadbc_driver_databricks.so',
'uri': 'https://...',
'databricks.arrow.native_geospatial': 'true',
...
})
When `databricks.arrow.native_geospatial=true`, the driver:
1. Sends the arrow_native_geospatial session config to the SEA API
2. Detects Databricks geometry Struct<srid: Int32, wkb: Binary> columns
3. Rewrites the schema: Struct → Binary with ARROW:extension:name=geoarrow.wkb
4. Per batch: zero-copy extracts the wkb child array from the struct
5. Captures the SRID from the first batch, encoding it as a PROJJSON CRS

Result: DuckDB adbc_scanner returns native GEOMETRY — no ST_GeomFromWKB needed.
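The detection step boils down to a shape check on the struct's children; sketched here over a lightweight stand-in field type rather than arrow.Field:

```go
package main

import "fmt"

// field is a stand-in for arrow.Field, carrying just what the check needs.
type field struct {
	name string
	typ  string // lowercase Arrow type name
}

// isGeometryStruct reports whether a struct column's children match the
// Databricks native-geospatial shape Struct<srid: Int32, wkb: Binary>.
func isGeometryStruct(children []field) bool {
	return len(children) == 2 &&
		children[0] == field{"srid", "int32"} &&
		children[1] == field{"wkb", "binary"}
}

func main() {
	geo := []field{{"srid", "int32"}, {"wkb", "binary"}}
	fmt.Println(isGeometryStruct(geo))
}
```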
- Simplify Go release workflow for fork (skip integration tests)
- Merge arrow-native-geospatial branch (GeoArrow conversion pipeline)
- Add replace directive for jatorre/databricks-sql-go fork with WithArrowNativeGeospatial() Thrift flag support (SPARK-54232)
Merge 28 upstream commits while preserving fork changes (geoarrow.wkb export, rustls-tls, FFI fixes, StatementPrepare no-op).

Conflict resolutions:
- Cargo.toml: keep rustls-tls (portable binaries)
- database.rs: add upstream auth_config/retry_config alongside use_arrow_native_geospatial
- reader/mod.rs: merge ResultReaderAdapter::new() to accept both manifest and use_geoarrow
- statement.rs, ffi/metadata.rs: update call sites for the 3-arg new()
- http.rs: add explicit PEM validation for rustls compatibility
The Go ADBC driver was never passing WithCloudFetch(true) to databricks-sql-go, so all data transfers went through inline Thrift IPC (single-threaded). Enabling CloudFetch uses parallel downloads from cloud storage via presigned URLs.

Benchmark (Slovenia OSM, warm warehouse):
- Points (51K): 2m11s → 44s (3x faster)
- Polygons (934K): 4m30s → 2m16s (2x faster)
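Enabling it is a one-line connector option; a configuration fragment, assuming databricks-sql-go's functional-option API (option names as I recall them — verify against the driver's docs):

```go
// Connector configuration fragment (not a complete program).
connector, err := dbsql.NewConnector(
	dbsql.WithServerHostname(host),
	dbsql.WithHTTPPath(httpPath),
	dbsql.WithAccessToken(token),
	dbsql.WithCloudFetch(true), // parallel presigned-URL downloads instead of inline Thrift IPC
)
```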
The current executeIngest loops over each row and issues a separate
parameterized INSERT against the SQL warehouse — measured at ~1.5 s/row
on a serverless Databricks SQL warehouse, which makes adbc_insert
impractical above a few hundred rows. When the new
databricks.bulk.volume_path option is set, executeIngest instead
streams Arrow batches as Parquet to a Volume and issues one COPY INTO
per batch. RSS stays bounded to one batch (~37 MB at 100 K rows in a
side-by-side bench), throughput jumps from ~0.7 rows/s to ~6 K rows/s
at 100 K rows.
For geometry columns annotated as geoarrow.wkb on the Arrow schema:
- The Databricks DDL emitter (new databricksTypeForField) maps the
field to GEOMETRY(<srid>) using the EPSG code from the extension's
CRS metadata, so the destination column is a typed geometry rather
than the BINARY default. Bare GEOMETRY without an SRID modifier is
rejected on most Databricks SQL runtimes.
- The staged Parquet schema strips the extension metadata so the file
carries plain BINARY columns; the COPY INTO transform projects each
geometry column through ST_GEOMFROMWKB(<col>, <srid_literal>) so the
source binary is rebuilt as typed GEOMETRY(srid) on insert. The
SRID has to be a SQL literal — passing it as a column from the
Parquet file produces an untyped GEOMETRY whose schema can't be
merged with the destination column.
Plumbing: the connection now carries serverHostname / accessToken /
bulkVolumePath copied from databaseImpl, used by the Files API
helpers (uploadToVolume / deleteFromVolume) to PUT/DELETE staged
Parquet files via /api/2.0/fs/files{path}.
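The Files API side is a plain authenticated PUT/DELETE; a sketch of the upload half, assuming bearer-token auth (function names are illustrative):

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
)

// filesAPIURL builds the Files API endpoint for a Volume path, matching the
// /api/2.0/fs/files{path} shape used by the helpers.
func filesAPIURL(host, volumePath string) string {
	return "https://" + host + "/api/2.0/fs/files" + volumePath
}

// uploadToVolume PUTs one staged Parquet file. Sketch only — the driver's
// helper also deletes the file after COPY INTO succeeds.
func uploadToVolume(ctx context.Context, host, token, volumePath string, data []byte) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodPut,
		filesAPIURL(host, volumePath), bytes.NewReader(data))
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/octet-stream")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("files api PUT %s: %s", volumePath, resp.Status)
	}
	return nil
}

func main() {
	fmt.Println(filesAPIURL("example.cloud.databricks.com", "/Volumes/cat/sch/vol/f.parquet"))
}
```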
The legacy per-row INSERT path is kept as the fallback when
databricks.bulk.volume_path is unset, so this change is purely
additive and doesn't break callers without Volume access.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Replaces the per-row `INSERT` loop in `go/bulk_ingest.go`'s `executeIngest` with a streaming Parquet + COPY INTO path when a new `databricks.bulk.volume_path` option is set. Falls back to the per-row path when the option is unset, so this is purely additive and non-breaking.
Measured ~7,000× speedup at 100 K rows on a serverless Databricks SQL warehouse, with RSS bounded to ~11 MB regardless of source size.
Why
The current implementation issues one `INSERT INTO t VALUES (?, ?, ?)` per row through `db/sql.ExecContext`. Each call is a round-trip to the warehouse; in practice that caps out at ~0.7 rows/s, making `adbc_insert` impractical above a few hundred rows. The workaround I commonly see applied outside the driver is "write Parquet to a Volume, then COPY INTO" — this PR moves that into the driver so callers don't have to.
How
When `databricks.bulk.volume_path` is set, `executeIngest` switches to streaming Arrow batches as Parquet to the Volume and issuing one `COPY INTO` per batch, with staged files uploaded and cleaned up via the Files API.
Geometry handling:
- Walk the schema for `geoarrow.wkb` extension fields. CRS metadata is parsed in two shapes: the simple `{"crs":"EPSG:N"}` string AND DuckDB's `{"crs_type":"projjson","crs":{... "id":{"authority":"EPSG","code":N}}}` PROJJSON form.
- DDL emits `GEOMETRY(<srid>)` for those columns (bare `GEOMETRY` without an SRID modifier is rejected on the SQL warehouses I tested).
- The COPY INTO transform projects each geometry column through `ST_GEOMFROMWKB(<col>, <srid_literal>)` so the staged BINARY rebuilds as a typed geometry on insert. The SRID has to be a SQL literal — passing it as a parquet column produces an untyped `GEOMETRY` whose schema can't be merged with the destination column (`DELTA_FAILED_TO_MERGE_FIELDS`).
- The existing `createTable` / `arrowTypeToDatabricksType` mapping has no geometry case (it falls back to BINARY), so the new path uses a sister `createGeoAwareTable` that knows about the detected geo columns. I left the original codepath untouched.
Measurements
End-to-end through DuckDB's `adbc_scanner` calling `adbc_insert(handle, table, (SELECT ...))`. Source = synthetic POINT geometries. Warehouse warmed up before each timed run. Numbers from `tests/databricks-write-bench` and the in-driver harness in duckdb-warehouse-transfer (a CARTO-internal test workspace; happy to inline the harness here if useful).
So the patched driver is ~7,000× faster than the current main at 100 K rows, lands ~81 % of the raw-SQL ceiling, and uses ~30 % of its RAM. RSS is independent of source size because only one Parquet writer + one inflight HTTP body live at a time.
What's not changed
Things I'd like reviewer guidance on
Test plan