[core] Support reading sequence_number in AuditLogTable and BinlogTable #6858
yunfengzhou-hub left a comment
Thanks for the PR! Left some comments as below.
// Join system fields first, then physical fields
// Natural order: [system fields...] + [physical fields...]
joinedRow.replace(systemFieldsRow, value);
I'm a little concerned about the correctness risk due to object reuse. In this project, JoinedRow#replace is mainly used in two ways:
- new JoinedRow().replace: no reuse for this class.
- joinedRow.replace soon followed by a serializer or writer, which means the row is not modified again after being reused.
But here, the joined row is emitted by the source to downstream operators, which might modify the row in place, so KeyValueSystemFieldsRecordReader and those operators could affect each other.
You can refer to online blogs and Stack Overflow questions about the correctness risks of Flink's object reuse mechanism, and check whether it is acceptable to enable object reuse here.
Great catch! You're right - downstream operator behavior is unpredictable and could mutate the row in-place. I've changed it to new JoinedRow(). The minor overhead of object allocation is acceptable to ensure correctness and avoid potential interference between KeyValueSystemFieldsRecordReader and downstream operators.
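The aliasing hazard discussed in this thread can be illustrated with a minimal sketch. PairRow below is a hypothetical stand-in for a mutable joined row, not Paimon's JoinedRow, and the buffering list stands in for any downstream consumer that holds on to emitted rows.

```java
import java.util.ArrayList;
import java.util.List;

public class RowReuseSketch {

    /** Hypothetical mutable row; an assumption for illustration, not Paimon's JoinedRow. */
    static final class PairRow {
        long sequenceNumber;
        int id;

        PairRow replace(long sequenceNumber, int id) {
            this.sequenceNumber = sequenceNumber;
            this.id = id;
            return this;
        }
    }

    /** Reuses one row instance: every buffered reference aliases the same object. */
    static List<PairRow> readWithReuse(long[] seqs, int[] ids) {
        PairRow reused = new PairRow();
        List<PairRow> buffered = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            buffered.add(reused.replace(seqs[i], ids[i]));
        }
        return buffered;
    }

    /** Allocates a fresh row per record: buffered rows stay independent. */
    static List<PairRow> readWithAllocation(long[] seqs, int[] ids) {
        List<PairRow> buffered = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            buffered.add(new PairRow().replace(seqs[i], ids[i]));
        }
        return buffered;
    }

    public static void main(String[] args) {
        long[] seqs = {1L, 2L};
        int[] ids = {10, 20};
        // With reuse, the first buffered row has been overwritten by the second record.
        System.out.println(readWithReuse(seqs, ids).get(0).id);      // prints 20
        // With per-record allocation, the first buffered row keeps its own values.
        System.out.println(readWithAllocation(seqs, ids).get(0).id); // prints 10
    }
}
```

The per-record allocation costs one small object per row, which is the trade-off accepted in the reply above.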
        @Nullable int[] projection) {
    if (systemFieldExtractors.isEmpty()) {
        // No system fields, use the default unwrap logic
        return KeyValueTableRead.unwrap(reader);
It might be better to merge this method into KeyValueTableRead#unwrap. As a decorator/wrapper to this method, extending the original method might be more beneficial to extensibility.
Done. I've moved the logic into KeyValueTableRead.java. The method structure is now:
- unwrap(RecordReader<KeyValue> reader, List<SpecialFieldExtractor> specialFieldExtractors, @Nullable int[] projection) as a public method for extensibility
- unwrap(RecordReader<KeyValue> reader) as a private method for internal use
 *
 * <p>Extracts the sequence number from KeyValue metadata.
 */
SystemFieldExtractor SEQUENCE_NUMBER = kv -> kv.sequenceNumber();
Suggested change:
- SystemFieldExtractor SEQUENCE_NUMBER = kv -> kv.sequenceNumber();
+ SystemFieldExtractor SEQUENCE_NUMBER = KeyValue::sequenceNumber;
Done.
 * future implementation where level information would need to be tracked through the read
 * path.
 */
SystemFieldExtractor LEVEL = kv -> null; // TODO: Level information needs to be propagated
If this system field cannot be extracted for now, I would prefer to throw an exception directly in such cases, instead of giving users the false impression that it works when they only get null values.
It now throws a new UnsupportedOperationException.
 * scenarios.
 */
SystemFieldExtractor ROW_ID =
        kv -> null; // ROW_ID is computed from file metadata, not available in KeyValue
Same as above
It now throws a new UnsupportedOperationException.
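A minimal sketch of the fail-fast behaviour agreed on in the two threads above. KeyValueStub and FieldExtractor are hypothetical stand-ins defined here for illustration; they are not the PR's actual KeyValue or extractor types, and the exception message is an assumption.

```java
public class ExtractorSketch {

    /** Hypothetical stand-in for KeyValue, carrying only a sequence number. */
    static final class KeyValueStub {
        private final long sequenceNumber;

        KeyValueStub(long sequenceNumber) {
            this.sequenceNumber = sequenceNumber;
        }

        long sequenceNumber() {
            return sequenceNumber;
        }
    }

    /** Hypothetical extractor interface: maps a record to one field value. */
    @FunctionalInterface
    interface FieldExtractor {
        Object extract(KeyValueStub kv);
    }

    /** Supported field: a method reference, as suggested in the review. */
    static final FieldExtractor SEQUENCE_NUMBER = KeyValueStub::sequenceNumber;

    /** Unsupported field: fail loudly instead of silently returning null. */
    static final FieldExtractor LEVEL =
            kv -> {
                throw new UnsupportedOperationException(
                        "LEVEL cannot be extracted from a key-value record yet");
            };

    public static void main(String[] args) {
        KeyValueStub kv = new KeyValueStub(100002L);
        System.out.println(SEQUENCE_NUMBER.extract(kv)); // prints 100002
        try {
            LEVEL.extract(kv);
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported: " + e.getMessage());
        }
    }
}
```

Throwing at extraction time surfaces the limitation on the first read, whereas a null-returning extractor would produce a column of nulls that looks like valid data.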
  checkAnswer(
    sql("SELECT * FROM paimon_incremental_query('`t$audit_log`', 'tag1', 'tag2') ORDER BY id"),
-   Seq(Row("-D", 999)))
+   Seq(Row("-D", 100002L, 999)))
100002L looks like the value of the id column rather than the sequence number. I'm not sure whether this is correct.
999 is the value of id, 100002L is the sequence number
}

/** Creates a table with changelog producer enabled. */
private FileStoreTable createChangelogTable(String tableName) throws Exception {
This method is only used in one place. So compared with introducing this method, it might be better to preserve the original code structure and commit history.
While it's currently used in one place, I prefer keeping this helper method for single responsibility and better extensibility. It encapsulates the changelog table creation logic cleanly, making future test additions easier without duplicating configuration code.
import java.util.Map;

/**
 * A decorator for {@link RecordReader} that injects system fields into the output rows for
In the decorator pattern, the wrapped type and the output type should be the same, but here the wrapped type is RecordReader<KeyValue> and the output type is RecordReader<InternalRow>. It would be more accurate to call this class a wrapper rather than a decorator.
Changed to wrapper.
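The distinction drawn in this thread can be sketched with plain iterators. The names below are hypothetical and use java.util.Iterator rather than Paimon's RecordReader: counting keeps the element type unchanged (decorator-style), while converting changes it (wrapper-style), which is the situation described for RecordReader<KeyValue> and RecordReader<InternalRow>.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

public class WrapperVsDecoratorSketch {

    /** Decorator-style: input and output are both Iterator<T>; only behaviour is added. */
    static <T> Iterator<T> counting(Iterator<T> inner, int[] counter) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return inner.hasNext();
            }

            @Override
            public T next() {
                counter[0]++; // side behaviour added around the delegate call
                return inner.next();
            }
        };
    }

    /** Wrapper-style: consumes Iterator<A> but exposes Iterator<B>. */
    static <A, B> Iterator<B> converting(Iterator<A> inner, Function<A, B> convert) {
        return new Iterator<B>() {
            @Override
            public boolean hasNext() {
                return inner.hasNext();
            }

            @Override
            public B next() {
                return convert.apply(inner.next());
            }
        };
    }

    public static void main(String[] args) {
        int[] counter = {0};
        Iterator<Integer> counted = counting(Arrays.asList(1, 2).iterator(), counter);
        while (counted.hasNext()) {
            counted.next();
        }
        System.out.println(counter[0]); // prints 2

        Iterator<String> rows = converting(Arrays.asList(1, 2).iterator(), i -> "row-" + i);
        System.out.println(rows.next()); // prints row-1
    }
}
```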
 */
@Nullable
public static SystemFieldExtractor getExtractor(String fieldName) {
    return EXTRACTOR_REGISTRY.get(fieldName);
This method only checks field names. It might be better to also check field data types.
For special fields (like _ROWKIND, _SEQUENCE_NUMBER), field name matching is sufficient.
 * <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy concatenation of
 * system fields and physical fields, then {@link ProjectedRow} for zero-copy field reordering.
 */
public class KeyValueSystemFieldsRecordReader implements RecordReader<InternalRow> {
To be consistent with the existing SpecialFields class, it might be better to name them KeyValueSpecialFieldsRecordReader and SpecialFieldExtractor.
Good suggestion. I have changed all terms: system -> special.
Purpose
This PR adds support for accessing system fields (specifically sequence_number) in AuditLogTable and BinlogTable.
Currently, downstream consumers often need the sequence_number to ensure strict ordering or to implement exactly-once processing when reading from audit logs or binlogs. However, these system tables previously did not expose this field.
This change implements KeyValueSystemFieldsRecordReader to enable the projection of system fields for KeyValue records, allowing users to query sequence_number directly from audit log and binlog tables.
Tests
- IncrementalReadSystemFieldsTest to verify functionality.
- AuditLogTableTest and BinlogTableTest to cover sequence_number read cases.
API and Format
Documentation
docs/content/concepts/system-tables.md needs to change