Skip to content

Commit 37ac451

Browse files
authored
DGS-19711 Support Protobuf oneof fields in Data Contract rules (#1392)
* DGS-19711 Support Protobuf oneof fields in Data Contract rules * Improve err msg
1 parent 82ca9d8 commit 37ac451

File tree

5 files changed

+209
-137
lines changed

5 files changed

+209
-137
lines changed

schemaregistry/rules/cel/cel_executor.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cel
1818

1919
import (
2020
"encoding/json"
21+
"fmt"
2122
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
2223
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
2324
"github.com/google/cel-go/cel"
@@ -132,7 +133,7 @@ func (c *Executor) executeRule(ctx serde.RuleContext, expr string, obj interface
132133
c.cache[string(ruleJSON)] = program
133134
c.cacheLock.Unlock()
134135
}
135-
return c.eval(program, args)
136+
return c.eval(expr, program, args)
136137
}
137138

138139
func toDecls(args map[string]interface{}) []cel.EnvOption {
@@ -224,10 +225,10 @@ func (c *Executor) newProgram(expr string, msg interface{}, decls []cel.EnvOptio
224225
return prg, nil
225226
}
226227

227-
func (c *Executor) eval(program cel.Program, args map[string]interface{}) (interface{}, error) {
228+
func (c *Executor) eval(expr string, program cel.Program, args map[string]interface{}) (interface{}, error) {
228229
out, _, err := program.Eval(args)
229230
if err != nil {
230-
return nil, err
231+
return nil, fmt.Errorf("CEL expr %s failed: %w", expr, err)
231232
}
232233
if out.Type() == types.ErrType {
233234
return nil, out.Value().(error)

schemaregistry/serde/protobuf/protobuf_test.go

Lines changed: 48 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ const (
5252
(confluent.field_meta).tags = "PII"
5353
];
5454
repeated string works = 4;
55+
oneof pii_oneof {
56+
Pizza oneof_message = 5;
57+
string oneof_string = 6 [(.confluent.field_meta).tags = "PII"];
58+
}
5559
}
5660
5761
message Pizza {
@@ -109,9 +113,10 @@ func TestProtobufSerdeWithSimple(t *testing.T) {
109113
serde.MaybeFail("Serializer configuration", err)
110114

111115
obj := test.Author{
112-
Name: "Kafka",
113-
Id: 123,
114-
Works: []string{"The Castle", "The Trial"},
116+
Name: "Kafka",
117+
Id: 123,
118+
Works: []string{"The Castle", "The Trial"},
119+
PiiOneof: &test.Author_OneofString{OneofString: "oneof"},
115120
}
116121
bytes, err := ser.Serialize("topic1", &obj)
117122
serde.MaybeFail("serialization", err)
@@ -128,9 +133,10 @@ func TestProtobufSerdeWithSimple(t *testing.T) {
128133

129134
// serialize second object
130135
obj = test.Author{
131-
Name: "Kierkegaard",
132-
Id: 123,
133-
Works: []string{"Fear And Trembling"},
136+
Name: "Kierkegaard",
137+
Id: 123,
138+
Works: []string{"Fear And Trembling"},
139+
PiiOneof: &test.Author_OneofString{OneofString: "oneof"},
134140
}
135141
bytes, err = ser.Serialize("topic1", &obj)
136142
serde.MaybeFail("serialization", err)
@@ -331,10 +337,11 @@ func TestProtobufSerdeWithCELCondition(t *testing.T) {
331337
}
332338

333339
obj := test.Author{
334-
Name: "Kafka",
335-
Id: 123,
336-
Picture: []byte{1, 2},
337-
Works: []string{"The Castle", "The Trial"},
340+
Name: "Kafka",
341+
Id: 123,
342+
Picture: []byte{1, 2},
343+
Works: []string{"The Castle", "The Trial"},
344+
PiiOneof: &test.Author_OneofString{OneofString: "oneof"},
338345
}
339346

340347
bytes, err := ser.Serialize("topic1", &obj)
@@ -391,10 +398,11 @@ func TestProtobufSerdeWithCELConditionFail(t *testing.T) {
391398
}
392399

393400
obj := test.Author{
394-
Name: "Kafka",
395-
Id: 123,
396-
Picture: []byte{1, 2},
397-
Works: []string{"The Castle", "The Trial"},
401+
Name: "Kafka",
402+
Id: 123,
403+
Picture: []byte{1, 2},
404+
Works: []string{"The Castle", "The Trial"},
405+
PiiOneof: &test.Author_OneofString{OneofString: "oneof"},
398406
}
399407

400408
_, err = ser.Serialize("topic1", &obj)
@@ -423,7 +431,7 @@ func TestProtobufSerdeWithCELFieldTransform(t *testing.T) {
423431
Kind: "TRANSFORM",
424432
Mode: "WRITE",
425433
Type: "CEL_FIELD",
426-
Expr: "name == 'name' ; value + '-suffix'",
434+
Expr: "typeName == 'STRING' ; value + '-suffix'",
427435
}
428436
ruleSet := schemaregistry.RuleSet{
429437
DomainRules: []schemaregistry.Rule{encRule},
@@ -442,10 +450,11 @@ func TestProtobufSerdeWithCELFieldTransform(t *testing.T) {
442450
}
443451

444452
obj := test.Author{
445-
Name: "Kafka",
446-
Id: 123,
447-
Picture: []byte{1, 2},
448-
Works: []string{"The Castle", "The Trial"},
453+
Name: "Kafka",
454+
Id: 123,
455+
Picture: []byte{1, 2},
456+
Works: []string{"The Castle", "The Trial"},
457+
PiiOneof: &test.Author_OneofString{OneofString: "oneof"},
449458
}
450459

451460
bytes, err := ser.Serialize("topic1", &obj)
@@ -460,10 +469,11 @@ func TestProtobufSerdeWithCELFieldTransform(t *testing.T) {
460469
serde.MaybeFail("register message", err)
461470

462471
obj2 := test.Author{
463-
Name: "Kafka-suffix",
464-
Id: 123,
465-
Picture: []byte{1, 2},
466-
Works: []string{"The Castle", "The Trial"},
472+
Name: "Kafka-suffix",
473+
Id: 123,
474+
Picture: []byte{1, 2},
475+
Works: []string{"The Castle-suffix", "The Trial-suffix"},
476+
PiiOneof: &test.Author_OneofString{OneofString: "oneof-suffix"},
467477
}
468478

469479
newobj, err := deser.Deserialize("topic1", bytes)
@@ -509,10 +519,11 @@ func TestProtobufSerdeWithCELFieldCondition(t *testing.T) {
509519
}
510520

511521
obj := test.Author{
512-
Name: "Kafka",
513-
Id: 123,
514-
Picture: []byte{1, 2},
515-
Works: []string{"The Castle", "The Trial"},
522+
Name: "Kafka",
523+
Id: 123,
524+
Picture: []byte{1, 2},
525+
Works: []string{"The Castle", "The Trial"},
526+
PiiOneof: &test.Author_OneofString{OneofString: "oneof"},
516527
}
517528

518529
bytes, err := ser.Serialize("topic1", &obj)
@@ -569,10 +580,11 @@ func TestProtobufSerdeWithCELFieldConditionFail(t *testing.T) {
569580
}
570581

571582
obj := test.Author{
572-
Name: "Kafka",
573-
Id: 123,
574-
Picture: []byte{1, 2},
575-
Works: []string{"The Castle", "The Trial"},
583+
Name: "Kafka",
584+
Id: 123,
585+
Picture: []byte{1, 2},
586+
Works: []string{"The Castle", "The Trial"},
587+
PiiOneof: &test.Author_OneofString{OneofString: "oneof"},
576588
}
577589

578590
_, err = ser.Serialize("topic1", &obj)
@@ -629,10 +641,11 @@ func TestProtobufSerdeEncryption(t *testing.T) {
629641
}
630642

631643
obj := test.Author{
632-
Name: "Kafka",
633-
Id: 123,
634-
Picture: []byte{1, 2},
635-
Works: []string{"The Castle", "The Trial"},
644+
Name: "Kafka",
645+
Id: 123,
646+
Picture: []byte{1, 2},
647+
Works: []string{"The Castle", "The Trial"},
648+
PiiOneof: &test.Author_OneofString{OneofString: "oneof"},
636649
}
637650

638651
bytes, err := ser.Serialize("topic1", &obj)

schemaregistry/serde/protobuf/protobuf_util.go

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,6 @@ func transform(ctx serde.RuleContext, descriptor protoreflect.Descriptor, msg in
2929
if msg == nil || descriptor == nil {
3030
return msg, nil
3131
}
32-
v := reflect.ValueOf(msg)
33-
if v.Kind() == reflect.Slice {
34-
var result []interface{}
35-
for i := 0; i < v.Len(); i++ {
36-
newmsg, err := transform(ctx, descriptor, v.Index(i).Interface(), fieldTransform)
37-
if err != nil {
38-
return nil, err
39-
}
40-
result = append(result, newmsg)
41-
}
42-
return result, nil
43-
}
44-
if v.Kind() == reflect.Map {
45-
return msg, nil
46-
}
4732
m, ok := msg.(proto.Message)
4833
if ok {
4934
desc := descriptor.(protoreflect.MessageDescriptor)
@@ -60,9 +45,30 @@ func transform(ctx serde.RuleContext, descriptor protoreflect.Descriptor, msg in
6045
}
6146
fieldCtx := ctx.CurrentField()
6247
if fieldCtx != nil {
48+
desc := descriptor.(protoreflect.MessageDescriptor)
49+
fd := desc.Fields().ByName(protoreflect.Name(fieldCtx.Name))
50+
val := msg.(protoreflect.Value)
51+
52+
// val.Interface() returns a pointer for list and map
53+
if reflect.ValueOf(val.Interface()).Kind() == reflect.Pointer {
54+
if fd.IsList() {
55+
v := val.List()
56+
var result []interface{}
57+
for i := 0; i < v.Len(); i++ {
58+
newmsg, err := transform(ctx, descriptor, v.Get(i), fieldTransform)
59+
if err != nil {
60+
return nil, err
61+
}
62+
result = append(result, newmsg)
63+
}
64+
return result, nil
65+
} else if fd.IsMap() {
66+
return msg, nil
67+
}
68+
}
69+
6370
ruleTags := ctx.Rule.Tags
6471
if (len(ruleTags) == 0) || !disjoint(ruleTags, fieldCtx.Tags) {
65-
val := msg.(protoreflect.Value)
6672
newVal, err := fieldTransform.Transform(ctx, *fieldCtx, val.Interface())
6773
if err != nil {
6874
return nil, err
@@ -75,9 +81,15 @@ func transform(ctx serde.RuleContext, descriptor protoreflect.Descriptor, msg in
7581

7682
func transformField(ctx serde.RuleContext, fd protoreflect.FieldDescriptor, desc protoreflect.MessageDescriptor,
7783
msg interface{}, clone proto.Message, fieldTransform serde.FieldTransform) error {
78-
schemaFd := desc.Fields().ByName(fd.Name())
84+
name := fd.Name()
85+
fullName := fd.FullName()
86+
schemaFd := desc.Fields().ByName(name)
7987
defer ctx.LeaveField()
80-
ctx.EnterField(msg, string(fd.FullName()), string(fd.Name()), getType(fd), getInlineTags(schemaFd))
88+
ctx.EnterField(msg, string(fullName), string(name), getType(fd), getInlineTags(schemaFd))
89+
if fd.ContainingOneof() != nil && !clone.ProtoReflect().Has(fd) {
90+
// skip oneof fields that are not set
91+
return nil
92+
}
8193
value := clone.ProtoReflect().Get(fd)
8294
d := desc
8395
md, ok := desc.(protoreflect.MessageDescriptor)
@@ -89,17 +101,33 @@ func transformField(ctx serde.RuleContext, fd protoreflect.FieldDescriptor, desc
89101
if err != nil {
90102
return err
91103
}
92-
newProtoValue := newValue.(protoreflect.Value)
93104
if ctx.Rule.Kind == "CONDITION" {
94-
i := newProtoValue.Interface()
95-
newBool, ok := i.(bool)
96-
if ok && !newBool {
97-
return serde.RuleConditionErr{
98-
Rule: ctx.Rule,
105+
newProtoValue, ok := newValue.(protoreflect.Value)
106+
if ok {
107+
i := newProtoValue.Interface()
108+
newBool, ok := i.(bool)
109+
if ok && !newBool {
110+
return serde.RuleConditionErr{
111+
Rule: ctx.Rule,
112+
}
99113
}
100114
}
101115
} else {
102-
clone.ProtoReflect().Set(fd, newProtoValue)
116+
newProtoValue, ok := newValue.(protoreflect.Value)
117+
if ok {
118+
clone.ProtoReflect().Set(fd, newProtoValue)
119+
} else {
120+
if fd.IsList() {
121+
newValues := newValue.([]interface{})
122+
list := clone.ProtoReflect().NewField(fd).List()
123+
for i := 0; i < len(newValues); i++ {
124+
list.Append(newValues[i].(protoreflect.Value))
125+
}
126+
clone.ProtoReflect().Set(fd, protoreflect.ValueOfList(list))
127+
} else {
128+
clone.ProtoReflect().Set(fd, protoreflect.ValueOf(newValue))
129+
}
130+
}
103131
}
104132
return nil
105133
}

0 commit comments

Comments
 (0)