
feat(go): bulk ingest via Volume + COPY INTO instead of per-row INSERT#411

Draft
jatorre wants to merge 24 commits into adbc-drivers:main from jatorre:feat/bulk-volume-copy-on-main

Conversation


@jatorre jatorre commented Apr 26, 2026

Summary

Replaces the per-row `INSERT` loop in `go/bulk_ingest.go`'s `executeIngest` with a streaming Parquet + COPY INTO path when a new `databricks.bulk.volume_path` option is set. Falls back to the per-row path when the option is unset, so this is purely additive and non-breaking.

Measured ~7,000× speedup at 100 K rows on a serverless Databricks SQL warehouse, with RSS bounded to ~11 MB regardless of source size.

Why

The current implementation issues one `INSERT INTO t VALUES (?, ?, ?)` per row through `database/sql`'s `ExecContext`. Each call is a round-trip to the warehouse; in practice that tops out at ~0.7 rows/s, making `adbc_insert` impractical above a few hundred rows. The workaround I commonly see applied outside the driver is "write Parquet to a Volume, then COPY INTO" — this PR moves that into the driver so callers don't have to.

How

When `databricks.bulk.volume_path` is set, `executeIngest` switches to:

  1. Open a Parquet writer over a local temp file (`pqarrow.NewFileWriter`).
  2. Accumulate Arrow record batches from `s.boundStream` into the same file until the row count reaches `databricks.bulk.batch_rows` (default 20 000).
  3. Close the writer; `PUT` the file via the Files API (`/api/2.0/fs/files{path}?overwrite=true`); issue `COPY INTO <table> FROM (SELECT ... FROM '/Volumes/.../file.parquet') FILEFORMAT = PARQUET`; `DELETE` the staged file.
  4. Repeat until the stream ends; the final batch flushes any remainder (a sketch of this loop follows the list).
  5. Geometry handling:

    • Walk the schema for `geoarrow.wkb` extension fields. CRS metadata is parsed in two shapes: the simple `{"crs":"EPSG:N"}` string and DuckDB's `{"crs_type":"projjson","crs":{... "id":{"authority":"EPSG","code":N}}}` PROJJSON form.
    • DDL emits `GEOMETRY(<srid>)` for those columns (bare `GEOMETRY` is rejected on the SQL warehouses I tested).
    • The COPY INTO transform projects each geometry column through `ST_GEOMFROMWKB(<col>, <srid_literal>)` so the staged BINARY rebuilds as a typed geometry on insert. The SRID has to be a SQL literal — passing it as a Parquet column produces an untyped `GEOMETRY` whose schema can't be merged with the destination column (`DELTA_FAILED_TO_MERGE_FIELDS`).
  6. A `databricks.bulk.geometry_columns` option (`"col:srid,col:srid"`) hints the driver for sources that emit BINARY without `geoarrow.wkb` metadata.
  7. The existing `createTable` / `arrowTypeToDatabricksType` mapping has no geometry case (it falls back to BINARY), so the new path uses a sister `createGeoAwareTable` that knows about the detected geo columns. I left the original codepath untouched.
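
For concreteness, here is a minimal sketch of the accumulate-and-flush loop from steps 1–4, assuming `s.boundStream` is an `array.RecordReader` and reusing the `uploadToVolume` / `deleteFromVolume` helper names from the commit messages below; the struct fields, the `flush` closure, and the error handling are illustrative rather than the exact code in `go/bulk_ingest.go`:

```go
import (
	"context"
	"fmt"
	"os"
	"path"
	"path/filepath"

	// module path depends on the arrow-go version the driver vendors
	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

// Sketch only: stream bound Arrow batches into one Parquet temp file per
// accumulator window, stage it to the Volume, COPY INTO, then delete it.
// Field names (boundStream, bulkVolumePath, targetTable, conn) are illustrative.
func (s *statement) bulkIngestViaVolume(ctx context.Context, batchRows int64) error {
	var (
		writer   *pqarrow.FileWriter
		tmp      *os.File
		buffered int64
	)

	flush := func() error {
		if writer == nil {
			return nil // nothing buffered yet
		}
		if err := writer.Close(); err != nil { // finalize the Parquet footer
			return err
		}
		_ = tmp.Close()
		volPath := path.Join(s.bulkVolumePath, filepath.Base(tmp.Name()))
		if err := s.conn.uploadToVolume(ctx, tmp.Name(), volPath); err != nil { // PUT via Files API
			return err
		}
		// The geo-aware path replaces SELECT * with a projection that wraps
		// geometry columns in ST_GEOMFROMWKB(<col>, <srid_literal>).
		copySQL := fmt.Sprintf(
			"COPY INTO %s FROM (SELECT * FROM '%s') FILEFORMAT = PARQUET",
			s.targetTable, volPath)
		if err := s.conn.execContext(ctx, copySQL); err != nil { // stands in for the driver's SQL path
			return err
		}
		err := s.conn.deleteFromVolume(ctx, volPath) // clean up the staged file
		_ = os.Remove(tmp.Name())
		writer, tmp, buffered = nil, nil, 0
		return err
	}

	for s.boundStream.Next() {
		rec := s.boundStream.Record()
		if writer == nil { // lazily open a new temp file + Parquet writer per window
			var err error
			if tmp, err = os.CreateTemp("", "adbc-bulk-*.parquet"); err != nil {
				return err
			}
			if writer, err = pqarrow.NewFileWriter(rec.Schema(), tmp,
				parquet.NewWriterProperties(), pqarrow.DefaultWriterProps()); err != nil {
				return err
			}
		}
		if err := writer.Write(rec); err != nil {
			return err
		}
		if buffered += rec.NumRows(); buffered >= batchRows {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	return flush() // final partial window, if any
}
```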

Measurements

End-to-end through DuckDB's `adbc_scanner` calling `adbc_insert(handle, table, (SELECT ...))`. Source = synthetic POINT geometries. Warehouse warmed up before each timed run. Numbers from `tests/databricks-write-bench` and the in-driver harness in duckdb-warehouse-transfer (a CARTO-internal test workspace; happy to inline the harness here if useful).

| Rows | Path | Throughput | RSS Δ |
| --- | --- | --- | --- |
| 100 K | per-row INSERT (this PR's fallback / current main) | ~0.7 rows/s (extrapolated; 1.5 s/row × 100 K ≈ 42 h) | ~10 MB |
| 100 K | Volume + COPY INTO, no accumulator (one Parquet per Arrow batch) | 748 rows/s | 11 MB |
| 100 K | Volume + COPY INTO, 20 K-row accumulator | 5,110 rows/s | 11 MB |
| 100 K | Raw-SQL bench (large-batch ceiling, no DDL overhead) | 6,324 rows/s | 37 MB |

So the patched driver is ~7,000× faster than current main at 100 K rows, lands at ~81 % of the raw-SQL ceiling, and uses ~30 % of its RAM. RSS is independent of source size because only one Parquet writer and one in-flight HTTP body live at a time.

What's not changed

• Per-row INSERT path — left in place verbatim; it runs when `databricks.bulk.volume_path` is unset.
• Read side / IPC reader / native geospatial read — orthogonal; this PR only touches the write side.
• Schema-merge edge cases for non-geometry types — the new path uses the identical Arrow → Databricks DDL mapping for non-geo columns.

Things I'd like reviewer guidance on

1. Volume location. This PR requires the caller to provide a Volume path; the driver doesn't try to auto-derive one. An auto-default of e.g. `/Volumes/<catalog>/<schema>/__adbc_staging` would be ergonomic but assumes that volume exists with the right ACLs. I'd rather fail loudly than silently fall back to per-row.
2. Accumulator policy. Currently row-count-based (`databricks.bulk.batch_rows`). A byte-size-based or hybrid policy would be more robust to varying row sizes; happy to add if preferred (a possible shape is sketched after this list).
3. Error semantics on partial failure. If batch N fails, batches 0..N-1 are already committed — the same partial-commit behavior as upstream's per-row path. Worth documenting more loudly?
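
On item 2, a hybrid policy could be as small as a flush predicate that trips on whichever threshold is hit first; this is a sketch of the idea, not something this PR implements, and the byte-size option name is invented:

```go
// Hypothetical hybrid accumulator policy: flush when either the row-count or
// the byte-size threshold is reached, whichever comes first. Not part of this
// PR; a "databricks.bulk.batch_bytes" option would be new.
func shouldFlush(bufferedRows, bufferedBytes, maxRows, maxBytes int64) bool {
	return bufferedRows >= maxRows || (maxBytes > 0 && bufferedBytes >= maxBytes)
}
```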

Test plan

• `go build ./...` passes
• End-to-end smoke at 1 K, 10 K, 100 K rows lands the expected count, the geom column ends up as `GEOMETRY(4326)`, and `ST_SRID(geom) = 4326` round-trips correctly
• Validation suite via `pixi run validate` — haven't run locally; will do once I see whether reviewers want this on a feature branch or merged with the existing options
• A unit test for `parseEPSGFromExtensionMetadata` (both shapes) and the COPY-INTO transform builder
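
For reference, the two `ARROW:extension:metadata` shapes that unit test has to cover could be handled roughly like this; a sketch that assumes the PROJJSON `code` arrives as a JSON number, not necessarily the exact helper in this PR:

```go
package geo // illustrative package name

import (
	"encoding/json"
	"strconv"
	"strings"
)

// parseEPSGFromExtensionMetadata extracts the EPSG code from geoarrow.wkb
// extension metadata. It accepts the simple {"crs":"EPSG:N"} form and DuckDB's
// {"crs_type":"projjson","crs":{...,"id":{"authority":"EPSG","code":N}}} form.
// Sketch only; the real parser in this PR may differ.
func parseEPSGFromExtensionMetadata(meta string) (srid int, ok bool) {
	var doc struct {
		CRS json.RawMessage `json:"crs"`
	}
	if err := json.Unmarshal([]byte(meta), &doc); err != nil || len(doc.CRS) == 0 {
		return 0, false
	}

	// Shape 1: "crs" is a plain "EPSG:<code>" string.
	var s string
	if json.Unmarshal(doc.CRS, &s) == nil {
		if code, err := strconv.Atoi(strings.TrimPrefix(s, "EPSG:")); err == nil {
			return code, true
		}
		return 0, false
	}

	// Shape 2: "crs" is a PROJJSON object carrying an {"authority","code"} id.
	var projjson struct {
		ID struct {
			Authority string `json:"authority"`
			Code      int    `json:"code"`
		} `json:"id"`
	}
	if json.Unmarshal(doc.CRS, &projjson) == nil && projjson.ID.Authority == "EPSG" {
		return projjson.ID.Code, true
	}
	return 0, false
}
```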

jatorre and others added 24 commits March 14, 2026 09:53
Expose geospatialAsArrow support (SPARK-54232) as an opt-in ADBC
connection option. When set to "true", geometry/geography columns
arrive as Struct<srid: Int32, wkb: Binary> instead of EWKT strings.

This depends on databricks/databricks-sql-go#328 which adds the
WithArrowNativeGeospatial() ConnOption to the underlying Go SQL driver.

Usage via adbc_connect (e.g. from DuckDB adbc_scanner):

  adbc_connect({
    'driver': 'libadbc_driver_databricks.dylib',
    'databricks.server_hostname': '...',
    'databricks.arrow.native_geospatial': 'true'
  })
When databricks.arrow.native_geospatial is enabled, the driver now
converts Struct<srid: Int32, wkb: Binary> columns to flat Binary
columns with ARROW:extension:name=geoarrow.wkb metadata.

This enables downstream consumers (e.g. DuckDB adbc_scanner) to
automatically map geometry columns to native GEOMETRY types without
any explicit ST_GeomFromWKB conversion.

Pipeline: Databricks -> Struct<srid,wkb> -> geoarrow.wkb -> native GEOMETRY

Benchmarks vs baseline (ST_AsBinary + ST_GeomFromWKB):
  100k points:  2.05x faster (31k rows/sec vs 15k rows/sec)
  10k polygons: 1.31x faster (4.5k rows/sec vs 3.4k rows/sec)
Defer schema transformation to the first Next() call so the SRID can be
read from the first non-null row of each geometry column. The SRID is
encoded as PROJJSON CRS in ARROW:extension:metadata, e.g. EPSG:4326 or
EPSG:3857. This ensures CRS information propagates correctly to
downstream consumers (DuckDB, pandas, polars, GDAL).

Split transformSchemaForGeoArrow into:
- detectGeometryColumns: finds geometry struct column indices (called in constructor)
- buildGeoArrowSchema: builds geoarrow schema with CRS from first batch (called lazily)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The schema must be available before the first Next() call since
consumers like adbc_scanner read it upfront to create table columns.
Build the geoarrow.wkb schema eagerly with empty CRS metadata in the
constructor, then enrich it with the actual SRID from the first record
batch during the first Next() call.

Verified: DuckDB now correctly recognizes geometry columns as native
GEOMETRY type via the geoarrow.wkb extension metadata.

Benchmark results (Databricks → DuckDB):
- 100k points: 7x faster than ST_AsBinary baseline
- 10k polygons: 3.6x faster than ST_AsBinary baseline

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Dewey Dunnington <dewey@dunnington.ca>
Remove azure-prod environment requirement and test-packages job so the
release workflow can run on forks without Databricks credentials.
The pipeline is now: build → package → release.
Add missing `then` after `if` condition in version check.
The cdylib/staticlib targets need the ffi feature to export
AdbcDriverInit. Make metadata-ffi (which includes ffi) the default
so release builds work correctly with DuckDB and other ADBC consumers.
The adbc-make check rejects the extra metadata symbols exported by
metadata-ffi. Use just ffi to export only AdbcDriverInit.
native-tls links against system OpenSSL, causing ABI mismatches
(e.g. libssl.so.10 vs OpenSSL 3) on different distros. rustls is
a pure Rust TLS implementation with no system dependency.
The webpki-roots crate (rustls dependency) uses CDLA-Permissive-2.0.
Add it to the accepted licenses list for cargo-about.
The tokio Runtime was owned by Connection and could be dropped while
an FFI_ArrowArrayStream reader was still alive (e.g. if DuckDB releases
the Connection before finishing the scan). The reader held only a
Handle, which becomes invalid after Runtime is dropped.

Fix: wrap Runtime in Arc<Runtime> and propagate it to:
- Connection (owns Arc<Runtime>)
- Statement (holds Arc<Runtime>)
- ResultReaderFactory (holds _runtime_keepalive Arc)
- CloudFetchResultReader (holds _runtime_keepalive Arc)

This ensures the runtime stays alive as long as any reader exists,
preventing "double free or corruption" crashes at the FFI boundary.
The export_driver! macro from adbc_ffi v0.22 hard-rejects ADBC 1.0.0
callers and always writes the full 1.1.0 AdbcDriver struct (58 fields,
464 bytes). DuckDB's adbc_scanner (v1.5.0) allocates a 1.0.0-sized
buffer (29 fields, 232 bytes), so writing the 1.1.0 struct causes a
heap buffer overflow → "double free or corruption" crash at connect.

Replace the macro with custom AdbcDriverInit/AdbcDatabricksInit that:
- Accept ADBC_VERSION_1_0_0: write only the first 29 fields
- Accept ADBC_VERSION_1_1_0: write the full struct
- Reject unknown versions with a clear error
The upstream adbc_ffi's release_ffi_error frees error.message via
CString::from_raw but doesn't null the pointer or clear the release
callback. When DuckDB's ADBC driver manager calls release and then
the error struct is cleaned up again (C++ destructor or second release
call), the stale pointer causes "free(): double free detected in
tcache 2" crash.

Fix: patch adbc_ffi locally via [patch.crates-io] to null out
error.message, error.private_data, and error.release after freeing.
This makes the release function idempotent.

Also revert the custom AdbcDriverInit (V100 compat was not needed —
the driver manager allocates a full V110 struct and passes V110).
DuckDB's adbc_scanner calls StatementPrepare before
StatementExecuteQuery. Without it, adbc_scan fails. Databricks
doesn't have server-side prepared statements, so accept prepare
as a no-op that validates a query has been set.
When set to "true", passes arrow_native_geospatial=true as a session
configuration parameter to the SEA API. This causes geometry/geography
columns to arrive as Arrow native structs (Struct<srid: Int32, wkb: Binary>)
instead of EWKT strings, enabling GeoArrow passthrough.

Usage via adbc_connect (e.g. from DuckDB adbc_scanner):

  adbc_connect({
    'driver': 'libadbc_driver_databricks.so',
    'uri': 'https://...',
    'databricks.arrow.native_geospatial': 'true',
    ...
  })
When `databricks.arrow.native_geospatial=true`, the driver:
1. Sends arrow_native_geospatial session config to SEA API
2. Detects Databricks geometry Struct<srid: Int32, wkb: Binary> columns
3. Rewrites schema: Struct → Binary with ARROW:extension:name=geoarrow.wkb
4. Per batch: zero-copy extracts wkb child array from struct
5. Captures SRID from first batch, encodes as PROJJSON CRS

Result: DuckDB adbc_scanner returns native GEOMETRY — no ST_GeomFromWKB needed.
- Simplify Go release workflow for fork (skip integration tests)
- Merge arrow-native-geospatial branch (GeoArrow conversion pipeline)
- Add replace directive for jatorre/databricks-sql-go fork with
  WithArrowNativeGeospatial() Thrift flag support (SPARK-54232)
Merge 28 upstream commits while preserving fork changes (geoarrow.wkb
export, rustls-tls, FFI fixes, StatementPrepare no-op).

Conflict resolutions:
- Cargo.toml: keep rustls-tls (portable binaries)
- database.rs: add upstream auth_config/retry_config alongside use_arrow_native_geospatial
- reader/mod.rs: merge ResultReaderAdapter::new() to accept both manifest and use_geoarrow
- statement.rs, ffi/metadata.rs: update call sites for 3-arg new()
- http.rs: add explicit PEM validation for rustls compatibility
The Go ADBC driver was never passing WithCloudFetch(true) to
databricks-sql-go, so all data transfers went through inline Thrift
IPC (single-threaded). Enabling CloudFetch uses parallel downloads
from cloud storage via presigned URLs.

Benchmark (Slovenia OSM, warm warehouse):
- Points (51K): 2m11s → 44s (3x faster)
- Polygons (934K): 4m30s → 2m16s (2x faster)
The current executeIngest loops over each row and issues a separate
parameterized INSERT against the SQL warehouse — measured at ~1.5 s/row
on a serverless Databricks SQL warehouse, which makes adbc_insert
impractical above ~few hundred rows.  When the new
databricks.bulk.volume_path option is set, executeIngest instead
streams Arrow batches as Parquet to a Volume and issues one COPY INTO
per batch.  RSS stays bounded to one batch (~37 MB at 100 K rows in a
side-by-side bench), throughput jumps from ~0.7 rows/s to ~6 K rows/s
at 100 K rows.

For geometry columns annotated as geoarrow.wkb on the Arrow schema:

- The Databricks DDL emitter (new databricksTypeForField) maps the
  field to GEOMETRY(<srid>) using the EPSG code from the extension's
  CRS metadata, so the destination column is a typed geometry rather
  than the BINARY default.  Bare GEOMETRY without an SRID modifier is
  rejected on most Databricks SQL runtimes.
- The staged Parquet schema strips the extension metadata so the file
  carries plain BINARY columns; the COPY INTO transform projects each
  geometry column through ST_GEOMFROMWKB(<col>, <srid_literal>) so the
  source binary is rebuilt as typed GEOMETRY(srid) on insert.  The
  SRID has to be a SQL literal — passing it as a column from the
  Parquet file produces an untyped GEOMETRY whose schema can't be
  merged with the destination column.

Plumbing: the connection now carries serverHostname / accessToken /
bulkVolumePath copied from databaseImpl, used by the Files API
helpers (uploadToVolume / deleteFromVolume) to PUT/DELETE staged
Parquet files via /api/2.0/fs/files{path}.

The legacy per-row INSERT path is kept as the fallback when
databricks.bulk.volume_path is unset, so this change is purely
additive and doesn't break callers without Volume access.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>