Migrate BookkeeperProtocol from protobuf-java to LightProto#4780
Migrate BookkeeperProtocol from protobuf-java to LightProto#4780
Conversation
The batchReadUnconfirmedAsync method added in #4739 calls LOG.error(...), but LedgerHandle was migrated to slog and only has a lowercase `log` field. Master fails to compile. Convert the call to the slog builder style used elsewhere in the file.
… to LightProto Replace Google's protobuf-java with StreamNative LightProto for the storage and metadata formats in `bookkeeper-proto`. The wire protocol (`BookkeeperProtocol`) remains on protobuf-java for now. LightProto generates mutable, reusable, ByteBuf-aware classes with built-in proto2 TextFormat (de)serialization (via `generateTextFormat=true`), so the existing TextFormat-based znode payloads (cookies, auditor votes, lock data, underreplication entries, layout) round-trip byte-identically. Notable behavior changes: - `BookKeeper.DigestType.toProtoDigestType` now returns the LightProto-generated enum (same constants, different package). - v3 ledger metadata uses a hand-rolled length-prefixed delimited writer/reader matching protobuf's `writeDelimitedTo`/`mergeDelimitedFrom`.
The existing exclude `~org.apache.bookkeeper.proto.DataFormats.*` matched protobuf's nested `DataFormats$LedgerMetadataFormat` etc. LightProto generates the same messages as flat top-level classes (`LedgerMetadataFormat` directly in `org.apache.bookkeeper.proto`), so those weren't excluded and triggered 12 bugs in the generated code (bit-twiddling, exposed internal byte arrays, etc.) that aren't actionable. Replace the obsolete `DataFormats.*` exclude with explicit per-message patterns covering both packages and `LightProtoCodec` (also generated per-package).
Migrates the BookkeeperProtocol.proto wire protocol to use LightProto for serialization. Combined with the prior migrations of DataFormats and DbLedgerStorageDataFormats, this drops the protobuf-java runtime dependency from bookkeeper-proto entirely. LightProto produces wire-compatible output with protoc for the same .proto, so on-the-wire bookie/client compatibility is preserved. Notes on lifecycle handling: - LightProto messages parsed from a ByteBuf hold lazy references into that buffer for field access. The decoders now call materialize() on parsed Request/Response/AuthMessage instances so they survive after the source buffer is released. - Server response paths that put entry payloads into ReadLacResponse or ReadResponse now copy the bytes via ByteBufUtil.getBytes(...), matching the previous ByteString.copyFrom semantics. Drive-by fix: processWriteLacRequestV3/processReadLacRequestV3 were ordering work on r.getAddRequest().getLedgerId() instead of the matching WriteLac/ReadLac request. With protobuf this returned a default 0 for the unset field; with LightProto it throws IllegalStateException.
15303d5 to
2222dc6
Compare
protobuf-java is no longer pulled in transitively by bookkeeper-server, so the shaded jars no longer contain (shaded) protobuf classes, and the flat lightproto-generated classes have replaced the BookkeeperProtocol outer class. - BookKeeperServerShadedJarTest / DistributedLogCoreShadedJarTest: drop the now-irrelevant testProtobufShadedPath checks and switch the BookkeeperProtocol presence check to a real lightproto class (AddRequest). - Drop the dead com.google.protobuf:protobuf-java <include> from the three shade plugin configs (bookkeeper-server-shaded, bookkeeper-server-tests-shaded, distributedlog-core-shaded).
lhotari
left a comment
There was a problem hiding this comment.
Some comments regarding ByteBufList handling and passing a ByteBuf instance directly (instead of calling ByteBufUtil.getBytes(bytebuf))
| Runnable cleanupActionFailedBeforeWrite = toSend::release; | ||
| Runnable cleanupActionAfterWrite = cleanupActionFailedBeforeWrite; | ||
| WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder() | ||
| ByteBuf body = ByteBufList.coalesce(toSend); |
There was a problem hiding this comment.
Use the suggested ByteBufList.toByteBuf (introduced in the addEntry method comment) instead of the existing ByteBufList.coalesce method which isn't optimized (the method is annotated with @VisibleForTesting ) and doesn't handle ByteBufList reference counting.
| ByteString body = ByteStringUtil.byteBufListToByteString(bufToSend); | ||
| bufToSend.retain(); | ||
| cleanupActionFailedBeforeWrite = bufToSend::release; | ||
| ByteBuf body = ByteBufList.coalesce(bufToSend); |
There was a problem hiding this comment.
This causes a performance regression since ByteBufList.coalesce is implemented using an unpooled heap buffer. The method is marked with @VisibleForTesting which signals that it's only meant for testing.
It could be useful to implement a new instance method in ByteBufList:
/**
* Returns a coalesced {@link ByteBuf} containing all buffers in this list, and
* decrements this {@code ByteBufList}'s reference count.
*
* <p>Ownership semantics: the returned {@link ByteBuf} owns a reference to the
* underlying buffer data. The caller is responsible for releasing the returned
* {@link ByteBuf} when it is no longer needed. This {@code ByteBufList} is
* released as part of this call.
*
* <p>If this list is empty, {@link Unpooled#EMPTY_BUFFER} is returned.
* If it contains exactly one buffer, that buffer is returned directly (with its
* reference count transferred to the caller) to avoid the overhead of wrapping
* it in a {@link CompositeByteBuf}.
*
* @param allocator the {@link ByteBufAllocator} used to allocate the
* {@link CompositeByteBuf} when more than one buffer is present
* @return a {@link ByteBuf} containing the coalesced contents of this list
*/
public ByteBuf toByteBuf(ByteBufAllocator allocator) {
final int size = buffers.size();
if (size == 0) {
release();
return Unpooled.EMPTY_BUFFER;
}
if (size == 1) {
// Fast path: avoid wrapping a single buffer in a CompositeByteBuf.
// Retain so the buffer survives our release() below; ownership is
// transferred to the caller.
ByteBuf single = buffers.get(0).retain();
release();
return single;
}
CompositeByteBuf composite = allocator.compositeBuffer(size);
for (int i = 0; i < size; i++) {
// Buffers need to be retained because ownership is handed over to
// the CompositeByteBuf, while this ByteBufList continues to hold
// its own references until release() below.
composite.addComponent(true, buffers.get(i).retain());
}
release();
return composite;
}| lac = requestProcessor.bookie.getExplicitLac(ledgerId); | ||
| if (lac != null) { | ||
| readLacResponse.setLacBody(ByteString.copyFrom(lac.nioBuffer())); | ||
| readLacResponse.setLacBody(ByteBufUtil.getBytes(lac)); |
There was a problem hiding this comment.
Pass ByteBuf directly? Also ensure whether .retain() also needs to be called.
| LedgerData newLedgerData = LedgerData.newBuilder(ledgerData) | ||
| .setExplicitLac(ByteString.copyFrom(lac.nioBuffer())).build(); | ||
| LedgerData newLedgerData = new LedgerData().copyFrom(ledgerData) | ||
| .setExplicitLac(ByteBufUtil.getBytes(lac)); |
There was a problem hiding this comment.
Pass ByteBuf directly?
Special attention is needed for the ref count in this case since ByteString.copyFrom is used in the previous code. Does .retain() need to be called?
| } else { | ||
| try { | ||
| readResponseBuilder.setBody(ByteString.copyFrom(entryBody.nioBuffer())); | ||
| readResponseBuilder.setBody(ByteBufUtil.getBytes(entryBody)); |
There was a problem hiding this comment.
Pass ByteBuf directly?
Special attention is needed for the ref count in this case since ByteString.copyFrom is used in the previous code. Does .retain() need to be called?
| } else { | ||
| status = StatusCode.EOK; | ||
| readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer())); | ||
| readResponse.setBody(ByteBufUtil.getBytes(entryBody)); |
There was a problem hiding this comment.
Pass ByteBuf directly? Also ensure whether .retain() also needs to be called.
| lastEntry = requestProcessor.bookie.readEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED); | ||
| if (lastEntry != null) { | ||
| readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry.nioBuffer())); | ||
| readLacResponse.setLastEntryBody(ByteBufUtil.getBytes(lastEntry)); |
There was a problem hiding this comment.
Pass ByteBuf directly? Also ensure whether .retain() also needs to be called.
eolivelli
left a comment
There was a problem hiding this comment.
(I clicked on Approve by mistake)
I am happy with this improvement, but we have to solve the conflicts and fix CI
Summary
Migrates the BookkeeperProtocol.proto wire protocol to use LightProto for serialization. Combined with the prior migration of DataFormats / DbLedgerStorageDataFormats (#4779), this drops the protobuf-java runtime dependency from
bookkeeper-protoentirely.LightProto produces wire-compatible output with protoc for the same
.proto, so on-the-wire bookie/client compatibility is preserved.Lifecycle notes
ByteBufhold lazy references into that buffer for field access. The decoders now callmaterialize()on parsedRequest/Response/AuthMessageinstances so they survive after the source buffer is released.ReadLacResponseorReadResponsenow copy the bytes viaByteBufUtil.getBytes(...), matching the previousByteString.copyFromsemantics.Drive-by fix
processWriteLacRequestV3/processReadLacRequestV3inBookieRequestProcessorwere ordering work onr.getAddRequest().getLedgerId()instead of the matching WriteLac/ReadLac request. With protobuf this returned a default0for the unset field; with LightProto it throwsIllegalStateException. Now uses the correct accessor.Test plan
BookieProtoEncodingTest,TestBookieRequestProcessor,TestPerChannelBookieClient,WriteEntryProcessorV3Test,LongPollReadEntryProcessorV3Test,GetBookieInfoProcessorV3Test,ForceLedgerProcessorV3TestTestBackwardCompatCMS42(V2 wire protocol withAuthMessage)BookKeeperTest,BookieClientTest,BookieInitializationTest,DbLedgerStorageTest,CookieTest,TestLedgerMetadataSerDe,TestLedgerUnderreplicationManager,TestGetBookieInfoTimeout