[Spanner Change Streams] Parse proto bytes change stream record in ChangeStreamRecordMapper#37427
chenxuesdu wants to merge 1 commit into apache:master
Summary of Changes
Hello @chenxuesdu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request enhances the Spanner Change Streams connector to support parsing change stream records that are delivered as proto bytes, particularly in PostgreSQL environments. By introducing specific methods to identify and deserialize these proto bytes, the system can now uniformly process change stream data, whether it originates as JSONB or proto, leveraging existing proto-based mapping logic. This change improves compatibility and robustness for different Spanner database dialects and output formats.
public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecordFromBytes() {
  recordReadAt = Timestamp.now();
  // Use getBytes(0) for the BYTES column returned by read_proto_bytes_ TVF
  byte[] protoBytes = resultSet.getBytes(0).toByteArray();
Can we pass this directly to the function below to avoid an extra copy of the bytes?
Updated. Thanks.
    && resultSet.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
}

public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecordFromBytes() {
Can we add accurate comments for this function, following the same convention as the other functions? Thanks.
Added comments. Please take a look.
try {
  return com.google.spanner.v1.ChangeStreamRecord.parseFrom(protoBytes);
} catch (InvalidProtocolBufferException e) {
  throw new RuntimeException("Failed to parse ChangeStreamRecord proto", e);
Maybe "Failed to parse the proto bytes to ChangeStreamRecord proto".
Updated. Thanks.
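For readers following the thread above, here is a minimal, self-contained sketch of the wrapping pattern under discussion: a checked parse failure is rethrown as an unchecked exception carrying the suggested message and the original cause. The class, the `InvalidPayloadException` type, and the `parse` method are hypothetical stand-ins so the example runs without the protobuf library; they are not the PR's actual code.

```java
// Hypothetical stand-ins only; this is not the Beam or protobuf code from the PR.
class ParseErrorWrappingSketch {

  /** Stand-in for a checked parse exception such as InvalidProtocolBufferException. */
  static final class InvalidPayloadException extends Exception {
    InvalidPayloadException(String message) {
      super(message);
    }
  }

  /** Toy parser: rejects empty input, otherwise returns the payload length. */
  static int parse(byte[] bytes) throws InvalidPayloadException {
    if (bytes == null || bytes.length == 0) {
      throw new InvalidPayloadException("empty payload");
    }
    return bytes.length;
  }

  /** Wraps the checked failure in a RuntimeException and keeps the cause attached. */
  static int parseOrThrowUnchecked(byte[] bytes) {
    try {
      return parse(bytes);
    } catch (InvalidPayloadException e) {
      throw new RuntimeException(
          "Failed to parse the proto bytes to ChangeStreamRecord proto", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parseOrThrowUnchecked(new byte[] {1, 2, 3}));
  }
}
```

Keeping the original exception as the cause means the stack trace still shows why parsing failed, while the message states which conversion was being attempted.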
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
a3858ab to 5e5c9aa
Run Java_GCP_IO_Direct PreCommit
Assigning reviewers: R: @Abacn for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
 * Returns the only change stream record proto at the current pointer of the result set. It also
 * updates the timestamp at which the record was read. This function enhances the getProtoMessage
 * function but only focus on the ChangeStreamRecord type.
 *
Can you add a similar comment here for GoogleSQL databases, like you did for line 136? Thanks.
if (this.isPostgres()) {
  // In PostgresQL, change stream records are returned as JsonB.
  // For `MUTABLE_KEY_RANGE` option, change stream records are returned as protos.
  if (resultSet.isProtoBytesChangeRecord()) {
Maybe we could have the getBytes(0) call reflected here, mirroring getPgJsonb(0), so that it is very clear that we should only expect one column from the result set.
nielm left a comment
LGTM - pending existing comments
When parsing a change stream record in PostgreSQL, if the output is proto bytes, map it to the proto type so that the existing proto-based logic for parsing change stream records can be reused.
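The routing described here can be sketched roughly as follows. This is an illustration only: `ColumnCode`, `Path`, and `choosePath` are hypothetical names invented for the example, and the real mapper works on Spanner result sets rather than on a bare enum. The point it tries to show is that PostgreSQL proto bytes and GoogleSQL proto values end up on the same mapping path, while PostgreSQL JSONB keeps its own.

```java
import java.util.Objects;

// Hypothetical stand-ins only; these are not the Beam or Spanner client classes.
class ChangeStreamDispatchSketch {

  /** Simplified type codes for the single column of a change stream result row. */
  enum ColumnCode { PG_JSONB, BYTES, PROTO }

  /** Parsing path that a record is routed to. */
  enum Path { JSON_MAPPING, PROTO_MAPPING }

  /** Chooses the parsing path from the dialect and the type of column 0. */
  static Path choosePath(boolean isPostgres, ColumnCode columnZero) {
    Objects.requireNonNull(columnZero, "columnZero");
    if (isPostgres) {
      switch (columnZero) {
        case BYTES:
          // Proto bytes are deserialized, then handled by the shared proto path.
          return Path.PROTO_MAPPING;
        case PG_JSONB:
          return Path.JSON_MAPPING;
        default:
          break;
      }
    } else if (columnZero == ColumnCode.PROTO) {
      return Path.PROTO_MAPPING;
    }
    // Every other combination is outside the scope of this sketch.
    throw new IllegalArgumentException(
        "Combination not modelled in this sketch: postgres=" + isPostgres
            + ", column=" + columnZero);
  }

  public static void main(String[] args) {
    System.out.println(choosePath(true, ColumnCode.PG_JSONB));
    System.out.println(choosePath(true, ColumnCode.BYTES));
    System.out.println(choosePath(false, ColumnCode.PROTO));
  }
}
```

Deciding on the type of column 0 alone matches the reviewers' expectation that the result set carries exactly one column per record.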