Skip to content

Commit 1f31b8e

Browse files
authored
Ensure algorithm query param is passed for CSFLE (#1373)
* Add missing algorithm query param * Add test
1 parent bf51d32 commit 1f31b8e

File tree

3 files changed

+79
-4
lines changed

3 files changed

+79
-4
lines changed

schemaregistry/internal/rest_service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ const (
6969
Keks = "/dek-registry/v1/keks"
7070
KekByName = Keks + "/%s?deleted=%t"
7171
Deks = Keks + "/%s/deks"
72-
DeksBySubject = Deks + "/%s?deleted=%t"
73-
DeksByVersion = Deks + "/%s/versions/%v?deleted=%t"
72+
DeksBySubject = Deks + "/%s?algorithm=%s&deleted=%t"
73+
DeksByVersion = Deks + "/%s/versions/%v?algorithm=%s&deleted=%t"
7474

7575
TargetSRClusterKey = "Target-Sr-Cluster"
7676
TargetIdentityPoolIDKey = "Confluent-Identity-Pool-Id"

schemaregistry/rules/encryption/deks/dekregistry_client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ func (c *client) GetDek(kekName string, subject string, algorithm string, delete
349349
// another goroutine could have already put it in cache
350350
cacheValue, ok = c.dekCache.Get(cacheKey)
351351
if !ok {
352-
err = c.restService.HandleRequest(internal.NewRequest("GET", internal.DeksBySubject, nil, url.QueryEscape(kekName), url.QueryEscape(subject), deleted), &dek)
352+
err = c.restService.HandleRequest(internal.NewRequest("GET", internal.DeksBySubject, nil, url.QueryEscape(kekName), url.QueryEscape(subject), algorithm, deleted), &dek)
353353
if err == nil {
354354
c.dekCache.Put(cacheKey, &dek)
355355
}
@@ -423,7 +423,7 @@ func (c *client) GetDekVersion(kekName string, subject string, version int, algo
423423
// another goroutine could have already put it in cache
424424
cacheValue, ok = c.dekCache.Get(cacheKey)
425425
if !ok {
426-
err = c.restService.HandleRequest(internal.NewRequest("GET", internal.DeksByVersion, nil, url.QueryEscape(kekName), url.QueryEscape(subject), version, deleted), &dek)
426+
err = c.restService.HandleRequest(internal.NewRequest("GET", internal.DeksByVersion, nil, url.QueryEscape(kekName), url.QueryEscape(subject), version, algorithm, deleted), &dek)
427427
if err == nil {
428428
c.dekCache.Put(cacheKey, &dek)
429429
}

schemaregistry/serde/avrov2/avro_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1351,6 +1351,81 @@ func TestAvroSerdeEncryption(t *testing.T) {
13511351
serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj))
13521352
}
13531353

1354+
func TestAvroSerdeEncryptionDeterministic(t *testing.T) {
1355+
serde.MaybeFail = serde.InitFailFunc(t)
1356+
var err error
1357+
1358+
conf := schemaregistry.NewConfig("mock://")
1359+
1360+
client, err := schemaregistry.NewClient(conf)
1361+
serde.MaybeFail("Schema Registry configuration", err)
1362+
1363+
serConfig := NewSerializerConfig()
1364+
serConfig.AutoRegisterSchemas = false
1365+
serConfig.UseLatestVersion = true
1366+
serConfig.RuleConfig = map[string]string{
1367+
"secret": "mysecret",
1368+
}
1369+
ser, err := NewSerializer(client, serde.ValueSerde, serConfig)
1370+
serde.MaybeFail("Serializer configuration", err)
1371+
1372+
encRule := schemaregistry.Rule{
1373+
Name: "test-encrypt",
1374+
Kind: "TRANSFORM",
1375+
Mode: "WRITEREAD",
1376+
Type: "ENCRYPT",
1377+
Tags: []string{"PII"},
1378+
Params: map[string]string{
1379+
"encrypt.kek.name": "kek1",
1380+
"encrypt.kms.type": "local-kms",
1381+
"encrypt.kms.key.id": "mykey",
1382+
"encrypt.dek.algorithm": "AES256_SIV",
1383+
},
1384+
OnFailure: "ERROR,NONE",
1385+
}
1386+
ruleSet := schemaregistry.RuleSet{
1387+
DomainRules: []schemaregistry.Rule{encRule},
1388+
}
1389+
1390+
info := schemaregistry.SchemaInfo{
1391+
Schema: demoSchema,
1392+
SchemaType: "AVRO",
1393+
RuleSet: &ruleSet,
1394+
}
1395+
1396+
id, err := client.Register("topic1-value", info, false)
1397+
serde.MaybeFail("Schema registration", err)
1398+
if id <= 0 {
1399+
t.Errorf("Expected valid schema id, found %d", id)
1400+
}
1401+
1402+
obj := DemoSchema{}
1403+
obj.IntField = 123
1404+
obj.DoubleField = 45.67
1405+
obj.StringField = "hi"
1406+
obj.BoolField = true
1407+
obj.BytesField = []byte{1, 2}
1408+
1409+
bytes, err := ser.Serialize("topic1", &obj)
1410+
serde.MaybeFail("serialization", err)
1411+
1412+
// Reset encrypted field
1413+
obj.StringField = "hi"
1414+
obj.BytesField = []byte{1, 2}
1415+
1416+
deserConfig := NewDeserializerConfig()
1417+
deserConfig.RuleConfig = map[string]string{
1418+
"secret": "mysecret",
1419+
}
1420+
deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig)
1421+
serde.MaybeFail("Deserializer configuration", err)
1422+
deser.Client = ser.Client
1423+
deser.MessageFactory = testMessageFactory
1424+
1425+
newobj, err := deser.Deserialize("topic1", bytes)
1426+
serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj))
1427+
}
1428+
13541429
func TestAvroSerdeEncryptionWithSimpleMap(t *testing.T) {
13551430
serde.MaybeFail = serde.InitFailFunc(t)
13561431
var err error

0 commit comments

Comments
 (0)