From ae689b578471b1bc68ff39fd6752b44a90261d1f Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Thu, 15 Jan 2026 14:56:23 +0530 Subject: [PATCH 1/8] Flat collections sub-doc updates SET --- .../FlatCollectionWriteTest.java | 264 ++++++++++++++++++ .../query/pg_flat_collection_insert.json | 20 +- .../postgres/FlatPostgresCollection.java | 179 +++++++++++- .../postgres/update/FlatUpdateContext.java | 43 +++ .../update/parser/FlatSetOperatorParser.java | 70 +++++ .../parser/FlatUpdateOperatorParser.java | 26 ++ 6 files changed, 590 insertions(+), 12 deletions(-) create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/FlatUpdateContext.java create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatUpdateOperatorParser.java diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 1595799c..43d56ee0 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -1,7 +1,10 @@ package org.hypertrace.core.documentstore; import static org.hypertrace.core.documentstore.utils.Utils.readFileFromResource; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -10,11 +13,22 @@ import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import org.hypertrace.core.documentstore.expression.impl.ConstantExpression; +import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression; +import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; +import org.hypertrace.core.documentstore.expression.operators.RelationalOperator; +import org.hypertrace.core.documentstore.model.options.ReturnDocumentType; +import org.hypertrace.core.documentstore.model.options.UpdateOptions; +import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; +import org.hypertrace.core.documentstore.model.subdoc.UpdateOperator; import org.hypertrace.core.documentstore.postgres.PostgresDatastore; +import org.hypertrace.core.documentstore.query.Query; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -322,6 +336,256 @@ void testUpdateWithCondition() { } } + @Nested + @DisplayName("SubDocument Update Operations") + class SubDocUpdateTests { + + @Nested + @DisplayName("SET Operator Tests") + class SetOperatorTests { + + @Test + @DisplayName("Should update top-level column with SET operator") + void testUpdateTopLevelColumn() throws Exception { + // Update the price of item with id = 1 + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 999)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Optional result = flatCollection.update(query, updates, options); + + assertTrue(result.isPresent()); + JsonNode resultJson = OBJECT_MAPPER.readTree(result.get().toJson()); + assertEquals(999, resultJson.get("price").asInt()); + + // Verify in database + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"price\" FROM \"%s\" WHERE \"id\" = '1'", FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(999, rs.getInt("price")); + } + } + + @Test + @DisplayName("Should update multiple top-level columns in single update") + void testUpdateMultipleColumns() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("2"))) + .build(); + + List updates = + List.of(SubDocumentUpdate.of("price", 555), SubDocumentUpdate.of("quantity", 100)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Optional result = flatCollection.update(query, updates, options); + + assertTrue(result.isPresent()); + JsonNode resultJson = OBJECT_MAPPER.readTree(result.get().toJson()); + assertEquals(555, resultJson.get("price").asInt()); + assertEquals(100, resultJson.get("quantity").asInt()); + } + + @Test + @DisplayName("Should update nested path in JSONB column") + void testUpdateNestedJsonbPath() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("3"))) + .build(); + + // Update props.brand nested path + List updates = + List.of(SubDocumentUpdate.of("props.brand", "UpdatedBrand")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Optional result = flatCollection.update(query, updates, options); + + assertTrue(result.isPresent()); + JsonNode resultJson = OBJECT_MAPPER.readTree(result.get().toJson()); + assertNotNull(resultJson.get("props")); + assertEquals("UpdatedBrand", resultJson.get("props").get("brand").asText()); + } + + @Test + @DisplayName("Should return BEFORE_UPDATE document") + void testUpdateReturnsBeforeDocument() throws Exception { + // First get the current price + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("4"))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 777)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.BEFORE_UPDATE).build(); + + Optional result = flatCollection.update(query, updates, options); + + assertTrue(result.isPresent()); + JsonNode resultJson = OBJECT_MAPPER.readTree(result.get().toJson()); + // Should return the old price (5 from initial data), not the new one (777) + assertEquals(5, resultJson.get("price").asInt()); + + // But database should have the new value + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"price\" FROM \"%s\" WHERE \"id\" = '4'", FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(777, rs.getInt("price")); + } + } + } + + @Test + @DisplayName("Should return empty when no document matches query") + void testUpdateNoMatch() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("9999"))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 100)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Optional result = flatCollection.update(query, updates, options); + + assertTrue(result.isEmpty()); + } + + @Test + @DisplayName("Should throw IOException when column does not exist") + void testUpdateNonExistentColumn() { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("_id"), + RelationalOperator.EQ, + ConstantExpression.of(1))) + .build(); + + List updates = + List.of(SubDocumentUpdate.of("nonexistent_column", "value")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows(IOException.class, () -> flatCollection.update(query, updates, options)); + } + + @Test + @DisplayName("Should throw IOException when nested path on non-JSONB column") + void testUpdateNestedPathOnNonJsonbColumn() { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("_id"), + RelationalOperator.EQ, + ConstantExpression.of(1))) + .build(); + + // "item" is TEXT, not JSONB - nested path should fail + List updates = List.of(SubDocumentUpdate.of("item.nested", "value")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows(IOException.class, () -> flatCollection.update(query, updates, options)); + } + + @Test + @DisplayName("Should throw IOException for unsupported operator") + void testUpdateUnsupportedOperator() { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("_id"), + RelationalOperator.EQ, + ConstantExpression.of(1))) + .build(); + + // UNSET is not supported yet + List updates = + List.of( + SubDocumentUpdate.builder() + .subDocument("price") + .operator(UpdateOperator.UNSET) + .build()); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows(IOException.class, () -> flatCollection.update(query, updates, options)); + } + + @Test + @DisplayName("Should throw UnsupportedOperationException for bulkUpdate") + void testBulkUpdate() { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("price"), + RelationalOperator.GT, + ConstantExpression.of(5))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 100)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows( + UnsupportedOperationException.class, + () -> flatCollection.bulkUpdate(query, updates, options)); + } + } + @Nested @DisplayName("Drop Operations") class DropTests { diff --git a/document-store/src/integrationTest/resources/query/pg_flat_collection_insert.json b/document-store/src/integrationTest/resources/query/pg_flat_collection_insert.json index 2ca68949..895a951c 100644 --- a/document-store/src/integrationTest/resources/query/pg_flat_collection_insert.json +++ b/document-store/src/integrationTest/resources/query/pg_flat_collection_insert.json @@ -1,14 +1,14 @@ { "statements": [ - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n1, 'Soap', 10, 2, '2014-03-01T08:00:00Z', true,\n'{\"hygiene\", \"personal-care\", \"premium\"}',\n'{\"Hygiene\", \"PersonalCare\"}',\n'{\"colors\": [\"Blue\", \"Green\"], \"brand\": \"Dettol\", \"size\": \"M\", \"product-code\": \"SOAP-DET-001\", \"source-loc\": [\"warehouse-A\", \"store-1\"], \"seller\": {\"name\": \"Metro Chemicals Pvt. Ltd.\", \"address\": {\"city\": \"Mumbai\", \"pincode\": 400004}}}',\nNULL,\n'{1, 2, 3}',\n'{4.5, 9.2}',\n'{true, false}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n2, 'Mirror', 20, 1, '2014-03-01T09:00:00Z', true,\n'{\"home-decor\", \"reflective\", \"glass\"}',\n'{\"HomeDecor\"}',\nNULL,\nNULL,\n'{10, 20}',\n'{1.5, 2.5, 3.5}',\n'{false, false}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n3, 'Shampoo', 5, 10, '2014-03-15T09:00:00Z', true,\n'{\"hair-care\", \"personal-care\", \"premium\", \"herbal\"}',\n'{\"HairCare\", \"PersonalCare\"}',\n'{\"colors\": [\"Black\"], \"brand\": \"Sunsilk\", \"size\": \"L\", \"product-code\": \"SHAMP-SUN-003\", \"source-loc\": [\"warehouse-B\", \"store-2\", \"online\"], \"seller\": {\"name\": \"Metro Chemicals Pvt. Ltd.\", \"address\": {\"city\": \"Mumbai\", \"pincode\": 400004}}}',\nNULL,\n'{5, 10, 15}',\n'{3.14, 2.71}',\n'{true, false, true}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n4, 'Shampoo', 5, 20, '2014-04-04T11:21:39.736Z', false,\n'{\"hair-care\", \"budget\", \"bulk\"}',\n'{\"HairCare\"}',\nNULL,\nNULL,\n'{1, 2}',\n'{5.0, 10.0}',\n'{true, true}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n5, 'Soap', 20, 5, '2014-04-04T21:23:13.331Z', true,\n'{\"hygiene\", \"antibacterial\", \"family-pack\"}',\n'{\"Hygiene\"}',\n'{\"colors\": [\"Orange\", \"Blue\"], \"brand\": \"Lifebuoy\", \"size\": \"S\", \"product-code\": \"SOAP-LIF-005\", \"source-loc\": [\"warehouse-C\"], \"seller\": {\"name\": \"Hans and Co.\", \"address\": {\"city\": \"Kolkata\", \"pincode\": 700007}}}',\nNULL,\n'{3, 6, 9}',\n'{7.5}',\n'{false}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n6, 'Comb', 7.5, 5, '2015-06-04T05:08:13Z', true,\n'{\"grooming\", \"plastic\", \"essential\"}',\n'{\"Grooming\"}',\nNULL,\nNULL,\n'{20, 30}',\n'{6.0, 8.0}',\n'{true, false}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n7, 'Comb', 7.5, 10, '2015-09-10T08:43:00Z', false,\n'{\"grooming\", \"bulk\", \"wholesale\"}',\n'{\"Grooming\"}',\n'{\"colors\": [], \"product-code\": null, \"source-loc\": null, \"seller\": {\"name\": \"Go Go Plastics\", \"address\": {\"city\": \"Kolkata\", \"pincode\": 700007}}}',\nNULL,\n'{10}',\n'{3.0}',\n'{false, false, false}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n8, 'Soap', 10, 5, '2016-02-06T20:20:13Z', true,\n'{\"hygiene\", \"budget\", \"basic\"}',\n'{\"Hygiene\"}',\nNULL,\nNULL,\n'{1, 10, 20}',\n'{2.5, 5.0}',\n'{true}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n9, 'Bottle', 15, 3, '2016-03-01T10:00:00Z', false,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n10, 'Cup', 8, 2, '2016-04-01T10:00:00Z', true,\n'{}',\n'{}',\nNULL,\nNULL,\nNULL,\nNULL,\nNULL\n)" + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'1', 'Soap', 10, 2, '2014-03-01T08:00:00Z', true,\n'{\"hygiene\", \"personal-care\", \"premium\"}',\n'{\"Hygiene\", \"PersonalCare\"}',\n'{\"colors\": [\"Blue\", \"Green\"], \"brand\": \"Dettol\", \"size\": \"M\", \"product-code\": \"SOAP-DET-001\", \"source-loc\": [\"warehouse-A\", \"store-1\"], \"seller\": {\"name\": \"Metro Chemicals Pvt. Ltd.\", \"address\": {\"city\": \"Mumbai\", \"pincode\": 400004}}}',\nNULL,\n'{1, 2, 3}',\n'{4.5, 9.2}',\n'{true, false}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'2', 'Mirror', 20, 1, '2014-03-01T09:00:00Z', true,\n'{\"home-decor\", \"reflective\", \"glass\"}',\n'{\"HomeDecor\"}',\nNULL,\nNULL,\n'{10, 20}',\n'{1.5, 2.5, 3.5}',\n'{false, false}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'3', 'Shampoo', 5, 10, '2014-03-15T09:00:00Z', true,\n'{\"hair-care\", \"personal-care\", \"premium\", \"herbal\"}',\n'{\"HairCare\", \"PersonalCare\"}',\n'{\"colors\": [\"Black\"], \"brand\": \"Sunsilk\", \"size\": \"L\", \"product-code\": \"SHAMP-SUN-003\", \"source-loc\": [\"warehouse-B\", \"store-2\", \"online\"], \"seller\": {\"name\": \"Metro Chemicals Pvt. Ltd.\", \"address\": {\"city\": \"Mumbai\", \"pincode\": 400004}}}',\nNULL,\n'{5, 10, 15}',\n'{3.14, 2.71}',\n'{true, false, true}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'4', 'Shampoo', 5, 20, '2014-04-04T11:21:39.736Z', false,\n'{\"hair-care\", \"budget\", \"bulk\"}',\n'{\"HairCare\"}',\nNULL,\nNULL,\n'{1, 2}',\n'{5.0, 10.0}',\n'{true, true}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'5', 'Soap', 20, 5, '2014-04-04T21:23:13.331Z', true,\n'{\"hygiene\", \"antibacterial\", \"family-pack\"}',\n'{\"Hygiene\"}',\n'{\"colors\": [\"Orange\", \"Blue\"], \"brand\": \"Lifebuoy\", \"size\": \"S\", \"product-code\": \"SOAP-LIF-005\", \"source-loc\": [\"warehouse-C\"], \"seller\": {\"name\": \"Hans and Co.\", \"address\": {\"city\": \"Kolkata\", \"pincode\": 700007}}}',\nNULL,\n'{3, 6, 9}',\n'{7.5}',\n'{false}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'6', 'Comb', 7.5, 5, '2015-06-04T05:08:13Z', true,\n'{\"grooming\", \"plastic\", \"essential\"}',\n'{\"Grooming\"}',\nNULL,\nNULL,\n'{20, 30}',\n'{6.0, 8.0}',\n'{true, false}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'7', 'Comb', 7.5, 10, '2015-09-10T08:43:00Z', false,\n'{\"grooming\", \"bulk\", \"wholesale\"}',\n'{\"Grooming\"}',\n'{\"colors\": [], \"product-code\": null, \"source-loc\": null, \"seller\": {\"name\": \"Go Go Plastics\", \"address\": {\"city\": \"Kolkata\", \"pincode\": 700007}}}',\nNULL,\n'{10}',\n'{3.0}',\n'{false, false, false}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'8', 'Soap', 10, 5, '2016-02-06T20:20:13Z', true,\n'{\"hygiene\", \"budget\", \"basic\"}',\n'{\"Hygiene\"}',\nNULL,\nNULL,\n'{1, 10, 20}',\n'{2.5, 5.0}',\n'{true}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'9', 'Bottle', 15, 3, '2016-03-01T10:00:00Z', false,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'10', 'Cup', 8, 2, '2016-04-01T10:00:00Z', true,\n'{}',\n'{}',\nNULL,\nNULL,\nNULL,\nNULL,\nNULL\n)" ] } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 747b63bd..e4ead453 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -1,6 +1,17 @@ package org.hypertrace.core.documentstore.postgres; +import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.AFTER_UPDATE; +import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.BEFORE_UPDATE; +import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.NONE; +import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.SET; + import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -12,14 +23,22 @@ import org.hypertrace.core.documentstore.CloseableIterator; import org.hypertrace.core.documentstore.CreateResult; import org.hypertrace.core.documentstore.Document; +import org.hypertrace.core.documentstore.DocumentType; import org.hypertrace.core.documentstore.Filter; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.UpdateResult; import org.hypertrace.core.documentstore.model.options.QueryOptions; +import org.hypertrace.core.documentstore.model.options.ReturnDocumentType; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; +import org.hypertrace.core.documentstore.model.subdoc.UpdateOperator; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer; +import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext; +import org.hypertrace.core.documentstore.postgres.update.parser.FlatSetOperatorParser; +import org.hypertrace.core.documentstore.postgres.update.parser.FlatUpdateOperatorParser; import org.hypertrace.core.documentstore.query.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +56,9 @@ public class FlatPostgresCollection extends PostgresCollection { private static final String WRITE_NOT_SUPPORTED = "Write operations are not supported for flat collections yet!"; + private static final Map OPERATOR_PARSERS = + Map.of(SET, new FlatSetOperatorParser()); + private final PostgresLazyilyLoadedSchemaRegistry schemaRegistry; FlatPostgresCollection( @@ -174,10 +196,163 @@ public UpdateResult update(Key key, Document document, Filter condition) throws @Override public Optional update( org.hypertrace.core.documentstore.query.Query query, - java.util.Collection updates, + Collection updates, UpdateOptions updateOptions) throws IOException { - throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); + + if (updates == null || updates.isEmpty()) { + throw new IOException("Updates collection cannot be null or empty"); + } + + String tableName = tableIdentifier.getTableName(); + + try (Connection connection = client.getTransactionalConnection()) { + try { + // 1. Validate all columns exist and operators are supported + validateUpdates(updates, tableName); + + // 2. Get before-document if needed + Optional beforeDoc = Optional.empty(); + ReturnDocumentType returnType = updateOptions.getReturnDocumentType(); + if (returnType == BEFORE_UPDATE || returnType == AFTER_UPDATE) { + beforeDoc = selectFirstDocument(connection, query); + } + + if (beforeDoc.isEmpty() && returnType != NONE) { + connection.commit(); + return Optional.empty(); + } + + // 3. Build and execute UPDATE + executeUpdate(connection, query, updates, tableName); + + // 4. Resolve return document based on options + Document returnDoc = null; + if (returnType == BEFORE_UPDATE) { + returnDoc = beforeDoc.orElse(null); + } else if (returnType == AFTER_UPDATE) { + returnDoc = selectFirstDocument(connection, query).orElse(null); + } + + connection.commit(); + return Optional.ofNullable(returnDoc); + + } catch (Exception e) { + connection.rollback(); + throw e; + } + } catch (SQLException e) { + LOGGER.error("SQLException during update operation", e); + throw new IOException(e); + } + } + + private void validateUpdates(Collection updates, String tableName) + throws IOException { + for (SubDocumentUpdate update : updates) { + UpdateOperator operator = update.getOperator(); + + // Check operator is supported + if (!OPERATOR_PARSERS.containsKey(operator)) { + throw new IOException("Unsupported update operator: " + operator); + } + + // Check column exists + String path = update.getSubDocument().getPath(); + String rootColumn = path.contains(".") ? path.split("\\.")[0] : path; + + Optional colMeta = + schemaRegistry.getColumnOrRefresh(tableName, rootColumn); + + if (colMeta.isEmpty()) { + throw new IOException("Column not found in schema: " + rootColumn); + } + + // For nested paths, root column must be JSONB + if (path.contains(".") && colMeta.get().getPostgresType() != PostgresDataType.JSONB) { + throw new IOException( + "Nested path updates require JSONB column, but column '" + + rootColumn + + "' is of type: " + + colMeta.get().getPostgresType()); + } + } + } + + private Optional selectFirstDocument(Connection connection, Query query) + throws SQLException, IOException { + PostgresQueryParser parser = createParser(query); + String selectQuery = parser.buildSelectQueryForUpdate(); + + try (PreparedStatement ps = + queryExecutor.buildPreparedStatement( + selectQuery, parser.getParamsBuilder().build(), connection)) { + return getFirstDocumentForFlat(ps.executeQuery()); + } + } + + private Optional getFirstDocumentForFlat(ResultSet resultSet) throws IOException { + CloseableIterator iterator = + new PostgresResultIteratorWithBasicTypes(resultSet, DocumentType.FLAT); + return getFirstDocument(iterator); + } + + private void executeUpdate( + Connection connection, Query query, Collection updates, String tableName) + throws SQLException { + + // Build WHERE clause + PostgresQueryParser filterParser = createParser(query); + String filterClause = filterParser.buildFilterClause(); + Params filterParams = filterParser.getParamsBuilder().build(); + + // Build SET clause fragments + List setFragments = new ArrayList<>(); + List params = new ArrayList<>(); + + for (SubDocumentUpdate update : updates) { + String path = update.getSubDocument().getPath(); + String rootColumn = path.contains(".") ? path.split("\\.")[0] : path; + String[] nestedPath = + path.contains(".") ? path.substring(path.indexOf(".") + 1).split("\\.") : new String[0]; + + PostgresColumnMetadata colMeta = + schemaRegistry.getColumnOrRefresh(tableName, rootColumn).orElseThrow(); + + FlatUpdateContext context = + FlatUpdateContext.builder() + .columnName(rootColumn) + .nestedPath(nestedPath) + .columnType(colMeta.getPostgresType()) + .value(update.getSubDocumentValue()) + .params(params) + .build(); + + FlatUpdateOperatorParser operatorParser = OPERATOR_PARSERS.get(update.getOperator()); + String fragment = operatorParser.parse(context); + setFragments.add(fragment); + } + + // Build final UPDATE SQL + String sql = + String.format( + "UPDATE %s SET %s %s", tableIdentifier, String.join(", ", setFragments), filterClause); + + LOGGER.debug("Executing update SQL: {}", sql); + + try (PreparedStatement ps = connection.prepareStatement(sql)) { + int idx = 1; + // Add SET clause params + for (Object param : params) { + ps.setObject(idx++, param); + } + // Add WHERE clause params + for (Object param : filterParams.getObjectParams().values()) { + ps.setObject(idx++, param); + } + int rowsUpdated = ps.executeUpdate(); + LOGGER.debug("Rows updated: {}", rowsUpdated); + } } @Override diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/FlatUpdateContext.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/FlatUpdateContext.java new file mode 100644 index 00000000..5537c974 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/FlatUpdateContext.java @@ -0,0 +1,43 @@ +package org.hypertrace.core.documentstore.postgres.update; + +import java.util.List; +import lombok.Builder; +import lombok.Value; +import org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; + +/** + * Context object containing all information needed to generate SQL for a single field update in + * flat collections. + */ +@Value +@Builder +public class FlatUpdateContext { + /** The column name in the database (e.g., "price", "props") */ + String columnName; + + /** + * The nested path within a JSONB column, empty array for top-level columns. For example, for + * "props.seller.name", columnName would be "props" and nestedPath would be ["seller", "name"]. + */ + String[] nestedPath; + + /** The PostgreSQL data type of the column */ + PostgresDataType columnType; + + /** The value to set/update */ + SubDocumentValue value; + + /** Accumulator for prepared statement parameters (mutable) */ + List params; + + /** Returns true if this is a top-level column update (no nested path) */ + public boolean isTopLevel() { + return nestedPath == null || nestedPath.length == 0; + } + + /** Returns true if the column is a JSONB type */ + public boolean isJsonbColumn() { + return columnType == PostgresDataType.JSONB; + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java new file mode 100644 index 00000000..09e06d53 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java @@ -0,0 +1,70 @@ +package org.hypertrace.core.documentstore.postgres.update.parser; + +import org.hypertrace.core.documentstore.model.subdoc.MultiValuedPrimitiveSubDocumentValue; +import org.hypertrace.core.documentstore.model.subdoc.NestedSubDocumentValue; +import org.hypertrace.core.documentstore.model.subdoc.PrimitiveSubDocumentValue; +import org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue; +import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext; + +/** + * Parser for the SET operator in flat collections. + * + *

Handles two cases: + * + *

    + *
  • Top-level columns: {@code SET "column" = ?} + *
  • Nested JSONB paths: {@code SET "column" = jsonb_set(COALESCE("column", '{}'), '{path}', + * to_jsonb(?))} + *
+ */ +public class FlatSetOperatorParser implements FlatUpdateOperatorParser { + + @Override + public String parse(FlatUpdateContext context) { + if (context.isTopLevel()) { + return parseTopLevel(context); + } else { + return parseNestedJsonb(context); + } + } + + private String parseTopLevel(FlatUpdateContext context) { + context.getParams().add(extractValue(context.getValue())); + return String.format("\"%s\" = ?", context.getColumnName()); + } + + private String parseNestedJsonb(FlatUpdateContext context) { + String jsonPath = buildJsonPath(context.getNestedPath()); + Object value = extractValue(context.getValue()); + + context.getParams().add(jsonPath); + context.getParams().add(value); + + // Use jsonb_set with COALESCE to handle null columns + // to_jsonb(?) converts the value to proper JSONB format + return String.format( + "\"%s\" = jsonb_set(COALESCE(\"%s\", '{}'), ?::text[], to_jsonb(?))", + context.getColumnName(), context.getColumnName()); + } + + /** + * Builds a PostgreSQL text array path from nested path components. For example, ["seller", + * "name"] becomes "{seller,name}" + */ + private String buildJsonPath(String[] nestedPath) { + return "{" + String.join(",", nestedPath) + "}"; + } + + /** Extracts the raw value from SubDocumentValue for use in prepared statements. */ + private Object extractValue(SubDocumentValue value) { + if (value instanceof PrimitiveSubDocumentValue) { + return ((PrimitiveSubDocumentValue) value).getValue(); + } else if (value instanceof MultiValuedPrimitiveSubDocumentValue) { + return ((MultiValuedPrimitiveSubDocumentValue) value).getValues(); + } else if (value instanceof NestedSubDocumentValue) { + return ((NestedSubDocumentValue) value).getJsonValue(); + } + throw new UnsupportedOperationException( + "Unsupported SubDocumentValue type: " + value.getClass().getSimpleName()); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatUpdateOperatorParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatUpdateOperatorParser.java new file mode 100644 index 00000000..be4d3e24 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatUpdateOperatorParser.java @@ -0,0 +1,26 @@ +package org.hypertrace.core.documentstore.postgres.update.parser; + +import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext; + +/** + * Parser interface for converting SubDocumentUpdate operations to SQL fragments for flat + * collections. + * + *

Each implementation handles a specific {@link + * org.hypertrace.core.documentstore.model.subdoc.UpdateOperator} and generates the appropriate SQL + * SET clause fragment. + */ +public interface FlatUpdateOperatorParser { + + /** + * Generates SQL SET clause fragment for this operator. + * + *

For top-level columns, this typically produces: {@code "column" = ?} + * + *

For nested JSONB paths, this produces: {@code "column" = jsonb_set(...)} + * + * @param context The update context containing column info, value, and parameter accumulator + * @return SQL fragment to be used in SET clause + */ + String parse(FlatUpdateContext context); +} From a32b3814393a4fd3d7205ea2562d83bdd40bbcab Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Fri, 16 Jan 2026 12:26:49 +0530 Subject: [PATCH 2/8] WIP --- .../update/parser/FlatSetOperatorParser.java | 50 +++++++++++++------ 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java index 09e06d53..3558ca2d 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java @@ -1,9 +1,11 @@ package org.hypertrace.core.documentstore.postgres.update.parser; +import org.hypertrace.core.documentstore.model.subdoc.MultiValuedNestedSubDocumentValue; import org.hypertrace.core.documentstore.model.subdoc.MultiValuedPrimitiveSubDocumentValue; import org.hypertrace.core.documentstore.model.subdoc.NestedSubDocumentValue; +import org.hypertrace.core.documentstore.model.subdoc.NullSubDocumentValue; import org.hypertrace.core.documentstore.model.subdoc.PrimitiveSubDocumentValue; -import org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue; +import org.hypertrace.core.documentstore.model.subdoc.visitor.SubDocumentValueVisitor; import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext; /** @@ -19,6 +21,35 @@ */ public class FlatSetOperatorParser implements FlatUpdateOperatorParser { + /** Visitor to extract raw values from SubDocumentValue for use in prepared statements. */ + private static final SubDocumentValueVisitor VALUE_EXTRACTOR = + new SubDocumentValueVisitor<>() { + @Override + public Object visit(PrimitiveSubDocumentValue value) { + return value.getValue(); + } + + @Override + public Object visit(MultiValuedPrimitiveSubDocumentValue value) { + return value.getValues(); + } + + @Override + public Object visit(NestedSubDocumentValue value) { + return value.getJsonValue(); + } + + @Override + public Object visit(MultiValuedNestedSubDocumentValue value) { + return value.getJsonValues(); + } + + @Override + public Object visit(NullSubDocumentValue value) { + return null; + } + }; + @Override public String parse(FlatUpdateContext context) { if (context.isTopLevel()) { @@ -29,13 +60,13 @@ public String parse(FlatUpdateContext context) { } private String parseTopLevel(FlatUpdateContext context) { - context.getParams().add(extractValue(context.getValue())); + context.getParams().add(context.getValue().accept(VALUE_EXTRACTOR)); return String.format("\"%s\" = ?", context.getColumnName()); } private String parseNestedJsonb(FlatUpdateContext context) { String jsonPath = buildJsonPath(context.getNestedPath()); - Object value = extractValue(context.getValue()); + Object value = context.getValue().accept(VALUE_EXTRACTOR); context.getParams().add(jsonPath); context.getParams().add(value); @@ -54,17 +85,4 @@ private String parseNestedJsonb(FlatUpdateContext context) { private String buildJsonPath(String[] nestedPath) { return "{" + String.join(",", nestedPath) + "}"; } - - /** Extracts the raw value from SubDocumentValue for use in prepared statements. */ - private Object extractValue(SubDocumentValue value) { - if (value instanceof PrimitiveSubDocumentValue) { - return ((PrimitiveSubDocumentValue) value).getValue(); - } else if (value instanceof MultiValuedPrimitiveSubDocumentValue) { - return ((MultiValuedPrimitiveSubDocumentValue) value).getValues(); - } else if (value instanceof NestedSubDocumentValue) { - return ((NestedSubDocumentValue) value).getJsonValue(); - } - throw new UnsupportedOperationException( - "Unsupported SubDocumentValue type: " + value.getClass().getSimpleName()); - } } From 561eb6e06e36e0cd5bfd232c0e0a9ed6481c5b04 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 19 Jan 2026 12:30:30 +0530 Subject: [PATCH 3/8] WIP --- .../postgres/FlatPostgresCollection.java | 24 +++++++++---------- ...latCollectionSubDocSetOperatorParser.java} | 3 ++- ...CollectionSubDocUpdateOperatorParser.java} | 2 +- 3 files changed, 14 insertions(+), 15 deletions(-) rename document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/{FlatSetOperatorParser.java => FlatCollectionSubDocSetOperatorParser.java} (96%) rename document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/{FlatUpdateOperatorParser.java => FlatCollectionSubDocUpdateOperatorParser.java} (93%) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index e4ead453..aaf2c9ed 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -37,8 +37,8 @@ import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer; import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext; -import org.hypertrace.core.documentstore.postgres.update.parser.FlatSetOperatorParser; -import org.hypertrace.core.documentstore.postgres.update.parser.FlatUpdateOperatorParser; +import org.hypertrace.core.documentstore.postgres.update.parser.FlatCollectionSubDocSetOperatorParser; +import org.hypertrace.core.documentstore.postgres.update.parser.FlatCollectionSubDocUpdateOperatorParser; import org.hypertrace.core.documentstore.query.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,8 +56,8 @@ public class FlatPostgresCollection extends PostgresCollection { private static final String WRITE_NOT_SUPPORTED = "Write operations are not supported for flat collections yet!"; - private static final Map OPERATOR_PARSERS = - Map.of(SET, new FlatSetOperatorParser()); + private static final Map OPERATOR_PARSERS = + Map.of(SET, new FlatCollectionSubDocSetOperatorParser()); private final PostgresLazyilyLoadedSchemaRegistry schemaRegistry; @@ -211,16 +211,15 @@ public Optional update( // 1. Validate all columns exist and operators are supported validateUpdates(updates, tableName); - // 2. Get before-document if needed + // 2. Get before-document if needed (only for BEFORE_UPDATE) Optional beforeDoc = Optional.empty(); ReturnDocumentType returnType = updateOptions.getReturnDocumentType(); - if (returnType == BEFORE_UPDATE || returnType == AFTER_UPDATE) { + if (returnType == BEFORE_UPDATE) { beforeDoc = selectFirstDocument(connection, query); - } - - if (beforeDoc.isEmpty() && returnType != NONE) { - connection.commit(); - return Optional.empty(); + if (beforeDoc.isEmpty()) { + connection.commit(); + return Optional.empty(); + } } // 3. Build and execute UPDATE @@ -252,7 +251,6 @@ private void validateUpdates(Collection updates, String table for (SubDocumentUpdate update : updates) { UpdateOperator operator = update.getOperator(); - // Check operator is supported if (!OPERATOR_PARSERS.containsKey(operator)) { throw new IOException("Unsupported update operator: " + operator); } @@ -328,7 +326,7 @@ private void executeUpdate( .params(params) .build(); - FlatUpdateOperatorParser operatorParser = OPERATOR_PARSERS.get(update.getOperator()); + FlatCollectionSubDocUpdateOperatorParser operatorParser = OPERATOR_PARSERS.get(update.getOperator()); String fragment = operatorParser.parse(context); setFragments.add(fragment); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java similarity index 96% rename from document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java rename to document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java index 3558ca2d..08d9eb7d 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java @@ -19,7 +19,8 @@ * to_jsonb(?))} * */ -public class FlatSetOperatorParser implements FlatUpdateOperatorParser { +public class FlatCollectionSubDocSetOperatorParser implements + FlatCollectionSubDocUpdateOperatorParser { /** Visitor to extract raw values from SubDocumentValue for use in prepared statements. */ private static final SubDocumentValueVisitor VALUE_EXTRACTOR = diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatUpdateOperatorParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocUpdateOperatorParser.java similarity index 93% rename from document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatUpdateOperatorParser.java rename to document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocUpdateOperatorParser.java index be4d3e24..38ef44ef 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatUpdateOperatorParser.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocUpdateOperatorParser.java @@ -10,7 +10,7 @@ * org.hypertrace.core.documentstore.model.subdoc.UpdateOperator} and generates the appropriate SQL * SET clause fragment. */ -public interface FlatUpdateOperatorParser { +public interface FlatCollectionSubDocUpdateOperatorParser { /** * Generates SQL SET clause fragment for this operator. From 829960d83e50aad2e3647865646989c0e1ffbaa6 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Tue, 20 Jan 2026 12:19:49 +0530 Subject: [PATCH 4/8] WIP --- .../FlatCollectionWriteTest.java | 223 ++++++++++++++++++ .../postgres/FlatPostgresCollection.java | 142 ++++++++--- ...FlatCollectionSubDocSetOperatorParser.java | 7 +- 3 files changed, 335 insertions(+), 37 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 66110449..96c49cd8 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -1360,4 +1360,227 @@ void testIgnoreDocumentReturnsAllSkippedFields() throws Exception { result.getSkippedFields().containsAll(List.of("unknown_field_1", "unknown_field_2"))); } } + + @Nested + @DisplayName("Update SET Operator Tests") + class UpdateSetOperatorTests { + + @Test + @DisplayName("Case 1: SET on field not in schema should skip (default SKIP strategy)") + void testSetFieldNotInSchema() throws Exception { + // Update a field that doesn't exist in the schema + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + SubDocumentUpdate update = + SubDocumentUpdate.builder() + .subDocument("nonexistent_column.some_key") + .operator(UpdateOperator.SET) + .subDocumentValue( + org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue.of("new_value")) + .build(); + + // With default SKIP strategy, this should not throw but skip the update + Optional result = + flatCollection.update( + query, + List.of(update), + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build()); + + // Document should still be returned (unchanged since update was skipped) + assertTrue(result.isPresent()); + + // Verify the document wasn't modified (item should still be "Soap") + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"item\" FROM \"%s\" WHERE \"id\" = '1'", FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("Soap", rs.getString("item")); + } + } + + @Test + @DisplayName("Case 2: SET on JSONB column that is NULL should create the structure") + void testSetJsonbColumnIsNull() throws Exception { + // Row 2 has props = NULL + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("2"))) + .build(); + + SubDocumentUpdate update = + SubDocumentUpdate.builder() + .subDocument("props.newKey") + .operator(UpdateOperator.SET) + .subDocumentValue( + org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue.of("newValue")) + .build(); + + Optional result = + flatCollection.update( + query, + List.of(update), + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build()); + + assertTrue(result.isPresent()); + + // Verify props now has the new key + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"props\"->>'newKey' as newKey FROM \"%s\" WHERE \"id\" = '2'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("newValue", rs.getString("newKey")); + } + } + + @Test + @DisplayName("Case 3: SET on JSONB path that exists should update the value") + void testSetJsonbPathExists() throws Exception { + // Row 1 has props.brand = "Dettol" + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + SubDocumentUpdate update = + SubDocumentUpdate.builder() + .subDocument("props.brand") + .operator(UpdateOperator.SET) + .subDocumentValue( + org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue.of( + "UpdatedBrand")) + .build(); + + Optional result = + flatCollection.update( + query, + List.of(update), + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build()); + + assertTrue(result.isPresent()); + + // Verify props.brand was updated + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"props\"->>'brand' as brand FROM \"%s\" WHERE \"id\" = '1'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("UpdatedBrand", rs.getString("brand")); + } + } + + @Test + @DisplayName("Case 4: SET on JSONB path that doesn't exist should create the key") + void testSetJsonbPathDoesNotExist() throws Exception { + // Row 1 has props but no "newAttribute" key + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + SubDocumentUpdate update = + SubDocumentUpdate.builder() + .subDocument("props.newAttribute") + .operator(UpdateOperator.SET) + .subDocumentValue( + org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue.of( + "brandNewValue")) + .build(); + + Optional result = + flatCollection.update( + query, + List.of(update), + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build()); + + assertTrue(result.isPresent()); + + // Verify props.newAttribute was created + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"props\"->>'newAttribute' as newAttr, \"props\"->>'brand' as brand FROM \"%s\" WHERE \"id\" = '1'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("brandNewValue", rs.getString("newAttr")); + // Verify existing data wasn't lost + assertEquals("Dettol", rs.getString("brand")); + } + } + + @Test + @DisplayName("SET on top-level column should update the value directly") + void testSetTopLevelColumn() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + SubDocumentUpdate update = + SubDocumentUpdate.builder() + .subDocument("item") + .operator(UpdateOperator.SET) + .subDocumentValue( + org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue.of("UpdatedSoap")) + .build(); + + Optional result = + flatCollection.update( + query, + List.of(update), + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build()); + + assertTrue(result.isPresent()); + + // Verify item was updated + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"item\" FROM \"%s\" WHERE \"id\" = '1'", FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("UpdatedSoap", rs.getString("item")); + } + } + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 1e0c245c..648f6e09 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import java.io.IOException; import java.sql.Connection; import java.sql.Date; @@ -72,8 +73,8 @@ public class FlatPostgresCollection extends PostgresCollection { "Write operations are not supported for flat collections yet!"; private static final String MISSING_COLUMN_STRATEGY_CONFIG = "missingColumnStrategy"; - private static final Map OPERATOR_PARSERS = - Map.of(SET, new FlatCollectionSubDocSetOperatorParser()); + private static final Map + SUB_DOC_UPDATE_PARSERS = Map.of(SET, new FlatCollectionSubDocSetOperatorParser()); private final PostgresLazyilyLoadedSchemaRegistry schemaRegistry; @@ -239,16 +240,16 @@ public Optional update( UpdateOptions updateOptions) throws IOException { - if (updates == null || updates.isEmpty()) { - throw new IOException("Updates collection cannot be null or empty"); - } + Preconditions.checkArgument( + updateOptions != null && !updates.isEmpty(), "Updates collection cannot be NULL or empty"); String tableName = tableIdentifier.getTableName(); + // Acquire a transactional connection that can be managed manually try (Connection connection = client.getTransactionalConnection()) { try { - // 1. Validate all columns exist and operators are supported - validateUpdates(updates, tableName); + // 1. Validate all columns exist and operators are supported. + Map resolvedColumns = resolvePathsToColumns(updates, tableName); // 2. Get before-document if needed (only for BEFORE_UPDATE) Optional beforeDoc = Optional.empty(); @@ -262,7 +263,7 @@ public Optional update( } // 3. Build and execute UPDATE - executeUpdate(connection, query, updates, tableName); + executeUpdate(connection, query, updates, tableName, resolvedColumns); // 4. Resolve return document based on options Document returnDoc = null; @@ -285,35 +286,91 @@ public Optional update( } } - private void validateUpdates(Collection updates, String tableName) - throws IOException { + /** + * Validates all updates and resolves column names. + * + * @return Map of path -> columnName for all resolved columns. For example: customAttributes.props + * -> customAttributes (since customAttributes is the top-level JSONB col) + */ + private Map resolvePathsToColumns( + Collection updates, String tableName) { + Map resolvedColumns = new HashMap<>(); + for (SubDocumentUpdate update : updates) { UpdateOperator operator = update.getOperator(); - if (!OPERATOR_PARSERS.containsKey(operator)) { - throw new IOException("Unsupported update operator: " + operator); - } + Preconditions.checkArgument( + SUB_DOC_UPDATE_PARSERS.containsKey(operator), "Unsupported UPDATE operator: " + operator); - // Check column exists String path = update.getSubDocument().getPath(); - String rootColumn = path.contains(".") ? path.split("\\.")[0] : path; + Optional columnName = resolveColumnName(path, tableName); + + // If the column is not found and missing column strategy is configured to throw, throw an + // exception. + Preconditions.checkArgument( + columnName.isPresent() || missingColumnStrategy != MissingColumnStrategy.THROW, + "Column not found in schema for path: " + + path + + " and missing column strategy is configured to: " + + missingColumnStrategy.toString()); + + columnName.ifPresent(col -> resolvedColumns.put(path, col)); + } - Optional colMeta = - schemaRegistry.getColumnOrRefresh(tableName, rootColumn); + return resolvedColumns; + } + + /** + * Resolves a path to its column name, handling both dotted column names and JSONB paths. + * + *

Resolution order: + * + *

    + *
  1. Check if full path exists as a column name (handles "customProps.something") + *
  2. If not, progressively try shorter prefixes to find a JSONB column + *
+ * + * @return Optional containing the column name, or empty if no valid column found + */ + private Optional resolveColumnName(String path, String tableName) { + // First, check if the full path is a column name. If yes, then it's a top-level field. Return + // it. + if (schemaRegistry.getColumnOrRefresh(tableName, path).isPresent()) { + return Optional.of(path); + } + + // Not a direct column - try to find a JSONB column prefix + if (!path.contains(".")) { + return Optional.empty(); + } - if (colMeta.isEmpty()) { - throw new IOException("Column not found in schema: " + rootColumn); + String[] parts = path.split("\\."); + StringBuilder columnBuilder = new StringBuilder(parts[0]); + + for (int i = 0; i < parts.length - 1; i++) { + if (i > 0) { + columnBuilder.append(".").append(parts[i]); } + String candidateColumn = columnBuilder.toString(); + Optional colMeta = + schemaRegistry.getColumnOrRefresh(tableName, candidateColumn); - // For nested paths, root column must be JSONB - if (path.contains(".") && colMeta.get().getPostgresType() != PostgresDataType.JSONB) { - throw new IOException( - "Nested path updates require JSONB column, but column '" - + rootColumn - + "' is of type: " - + colMeta.get().getPostgresType()); + if (colMeta.isPresent() && colMeta.get().getPostgresType() == PostgresDataType.JSONB) { + return Optional.of(candidateColumn); } } + + return Optional.empty(); + } + + /** Extracts the nested JSONB path from a full path given the resolved column name. */ + private String[] getNestedPath(String fullPath, String columnName) { + if (fullPath.equals(columnName)) { + return new String[0]; + } + // Remove column name prefix and split the rest + String nested = fullPath.substring(columnName.length() + 1); // +1 for the dot + return nested.split("\\."); } private Optional selectFirstDocument(Connection connection, Query query) @@ -335,7 +392,11 @@ private Optional getFirstDocumentForFlat(ResultSet resultSet) throws I } private void executeUpdate( - Connection connection, Query query, Collection updates, String tableName) + Connection connection, + Query query, + Collection updates, + String tableName, + Map resolvedColumns) throws SQLException { // Build WHERE clause @@ -349,27 +410,39 @@ private void executeUpdate( for (SubDocumentUpdate update : updates) { String path = update.getSubDocument().getPath(); - String rootColumn = path.contains(".") ? path.split("\\.")[0] : path; - String[] nestedPath = - path.contains(".") ? path.substring(path.indexOf(".") + 1).split("\\.") : new String[0]; + String columnName = resolvedColumns.get(path); + + if (columnName == null) { + LOGGER.warn("Skipping update for unresolved path: {}", path); + continue; + } PostgresColumnMetadata colMeta = - schemaRegistry.getColumnOrRefresh(tableName, rootColumn).orElseThrow(); + schemaRegistry.getColumnOrRefresh(tableName, columnName).orElseThrow(); FlatUpdateContext context = FlatUpdateContext.builder() - .columnName(rootColumn) - .nestedPath(nestedPath) + .columnName(columnName) + // get the nested path. So for example, if colName is `customAttr` and full path is + // `customAttr.props`, then the nested path is `props`. + .nestedPath(getNestedPath(path, columnName)) .columnType(colMeta.getPostgresType()) .value(update.getSubDocumentValue()) .params(params) .build(); - FlatCollectionSubDocUpdateOperatorParser operatorParser = OPERATOR_PARSERS.get(update.getOperator()); + FlatCollectionSubDocUpdateOperatorParser operatorParser = + SUB_DOC_UPDATE_PARSERS.get(update.getOperator()); String fragment = operatorParser.parse(context); setFragments.add(fragment); } + // If all updates were skipped, nothing to do + if (setFragments.isEmpty()) { + LOGGER.warn("All update paths were skipped - no valid columns to update"); + return; + } + // Build final UPDATE SQL String sql = String.format( @@ -542,6 +615,7 @@ private boolean shouldRefreshSchemaAndRetry(String sqlState) { * by column name. LinkedHashMap preserves insertion order for consistent parameter binding. */ private static class TypedDocument { + private final Map values = new HashMap<>(); private final Map types = new HashMap<>(); private final Map arrays = new HashMap<>(); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java index 08d9eb7d..9c4c0ddb 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java @@ -19,8 +19,8 @@ * to_jsonb(?))} * */ -public class FlatCollectionSubDocSetOperatorParser implements - FlatCollectionSubDocUpdateOperatorParser { +public class FlatCollectionSubDocSetOperatorParser + implements FlatCollectionSubDocUpdateOperatorParser { /** Visitor to extract raw values from SubDocumentValue for use in prepared statements. */ private static final SubDocumentValueVisitor VALUE_EXTRACTOR = @@ -74,8 +74,9 @@ private String parseNestedJsonb(FlatUpdateContext context) { // Use jsonb_set with COALESCE to handle null columns // to_jsonb(?) converts the value to proper JSONB format + // 4th param (true) creates the key if it doesn't exist return String.format( - "\"%s\" = jsonb_set(COALESCE(\"%s\", '{}'), ?::text[], to_jsonb(?))", + "\"%s\" = jsonb_set(COALESCE(\"%s\", '{}'), ?::text[], to_jsonb(?), true)", context.getColumnName(), context.getColumnName()); } From f1012ce9d0985cdfca039929b526be80e02d9e6f Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Tue, 20 Jan 2026 12:23:31 +0530 Subject: [PATCH 5/8] Fix failing test case --- .../hypertrace/core/documentstore/FlatCollectionWriteTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 96c49cd8..969878a5 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -1009,7 +1009,8 @@ void testUpdateUnsupportedOperator() { UpdateOptions options = UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); - assertThrows(IOException.class, () -> flatCollection.update(query, updates, options)); + assertThrows( + IllegalArgumentException.class, () -> flatCollection.update(query, updates, options)); } @Test From 8f7d201b22c18a17cab64bb8f34b76beba652d91 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Tue, 20 Jan 2026 12:39:37 +0530 Subject: [PATCH 6/8] Fix failing test case --- .../documentstore/DocStoreQueryV1Test.java | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java index a0c5a1e0..1f2831da 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java @@ -201,7 +201,7 @@ private static void createFlatCollectionSchema( String createTableSQL = String.format( "CREATE TABLE \"%s\" (" - + "\"_id\" INTEGER PRIMARY KEY," + + "\"id\" TEXT PRIMARY KEY," + "\"item\" TEXT," + "\"price\" INTEGER," + "\"quantity\" INTEGER," @@ -3540,7 +3540,7 @@ void testFlatPostgresCollectionUnnestWithOnlyMainFilter(String dataStoreName) void testFlatPostgresCollectionArrayRelationalFilter(String dataStoreName) throws IOException { Collection flatCollection = getFlatCollection(dataStoreName); - // Filter: ANY tag in tags equals "hygiene" AND _id <= 8 + // Filter: ANY tag in tags equals "hygiene" AND id IN (1-8) // Exclude docs 9-10 (NULL/empty arrays) to avoid ARRAY[] type error Query arrayRelationalQuery = Query.builder() @@ -3560,7 +3560,10 @@ void testFlatPostgresCollectionArrayRelationalFilter(String dataStoreName) throw .build()) .operand( RelationalExpression.of( - IdentifierExpression.of("_id"), LTE, ConstantExpression.of(8))) + IdentifierExpression.of("id"), + IN, + ConstantExpression.ofStrings( + List.of("1", "2", "3", "4", "5", "6", "7", "8")))) .build()) .build(); @@ -3617,7 +3620,10 @@ void testFlatVsNestedCollectionNestedFieldSelections(String dataStoreName) throw .addSort(IdentifierExpression.of("item"), ASC) .setFilter( RelationalExpression.of( - IdentifierExpression.of("_id"), LTE, ConstantExpression.of(8))) + IdentifierExpression.of("id"), + IN, + ConstantExpression.ofStrings( + List.of("1", "2", "3", "4", "5", "6", "7", "8")))) .build(); // Assert both match the expected response @@ -3643,7 +3649,10 @@ void testFlatVsNestedCollectionNestedFieldSelections(String dataStoreName) throw .addSort(IdentifierExpression.of("item"), ASC) .setFilter( RelationalExpression.of( - IdentifierExpression.of("_id"), LTE, ConstantExpression.of(8))) + IdentifierExpression.of("id"), + IN, + ConstantExpression.ofStrings( + List.of("1", "2", "3", "4", "5", "6", "7", "8")))) .build(); // Assert both match the expected response @@ -3669,7 +3678,10 @@ void testFlatVsNestedCollectionNestedFieldSelections(String dataStoreName) throw .addSort(IdentifierExpression.of("item"), ASC) .setFilter( RelationalExpression.of( - IdentifierExpression.of("_id"), LTE, ConstantExpression.of(8))) + IdentifierExpression.of("id"), + IN, + ConstantExpression.ofStrings( + List.of("1", "2", "3", "4", "5", "6", "7", "8")))) .build(); // Assert both match the expected response with nested structure @@ -5242,7 +5254,7 @@ void testRelOpArrayContains(String dataStoreName) { // @> ('["Green"]')::jsonb) p(countWithParser) assertEquals(1, containsCount); - // Test 2: NOT_CONTAINS - props.colors NOT_CONTAINS "Green" AND _id <= 8 + // Test 2: NOT_CONTAINS - props.colors NOT_CONTAINS "Green" AND id IN (1-8) // Expected: 7 documents (all except id=1 which has Green, limited to first 8) Query notContainsQuery = Query.builder() @@ -5257,13 +5269,16 @@ void testRelOpArrayContains(String dataStoreName) { ConstantExpression.of("Green"))) .operand( RelationalExpression.of( - IdentifierExpression.of("_id"), LTE, ConstantExpression.of(8))) + IdentifierExpression.of("id"), + IN, + ConstantExpression.ofStrings( + List.of("1", "2", "3", "4", "5", "6", "7", "8")))) .build()) .build(); long notContainsCount = flatCollection.count(notContainsQuery); // Generated query: SELECT COUNT(*) FROM (SELECT * FROM "myTestFlat" WHERE ("props"->'colors' - // IS NULL OR NOT "props"->'colors' @> ('["Green"]')::jsonb) AND ("_id" <= ('8'::int4))) + // IS NULL OR NOT "props"->'colors' @> ('["Green"]')::jsonb) AND ("id" IN ('1', '2', ...))) // p(countWithParser) assertEquals(7, notContainsCount); } @@ -5287,7 +5302,7 @@ void testRelOpArrayIN(String dataStoreName) { long inCount = flatCollection.count(inQuery); assertEquals(2, inCount); - // Test 2: NOT_IN - props.brand NOT_IN ["Dettol"] AND _id <= 8 + // Test 2: NOT_IN - props.brand NOT_IN ["Dettol"] AND id IN (1-8) // Expected: 7 documents (all except id=1 which is Dettol, limited to first 8) Query notInQuery = Query.builder() @@ -5301,7 +5316,10 @@ void testRelOpArrayIN(String dataStoreName) { ConstantExpression.ofStrings(List.of("Dettol")))) .operand( RelationalExpression.of( - IdentifierExpression.of("_id"), LTE, ConstantExpression.of(8))) + IdentifierExpression.of("id"), + IN, + ConstantExpression.ofStrings( + List.of("1", "2", "3", "4", "5", "6", "7", "8")))) .build()) .build(); @@ -5607,7 +5625,7 @@ public void testUnnestNestedArrayWithNullValue(String dataStoreName) throws IOEx UnnestExpression.of(JsonIdentifierExpression.of("props", "source-loc"), true)) .setFilter( RelationalExpression.of( - IdentifierExpression.of("_id"), EQ, ConstantExpression.of(1))) + IdentifierExpression.of("id"), EQ, ConstantExpression.of("1"))) .build(); Iterator iterator = collection.aggregate(query); From e7225a5ad4bdf60523c1e64d61af515557ffb7dc Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Tue, 20 Jan 2026 13:17:25 +0530 Subject: [PATCH 7/8] WIP --- .../FlatCollectionWriteTest.java | 94 +++++++++++++++++++ ...FlatCollectionSubDocSetOperatorParser.java | 39 +++++++- 2 files changed, 130 insertions(+), 3 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 969878a5..8f8d2f1e 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -15,6 +15,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -1583,5 +1584,98 @@ void testSetTopLevelColumn() throws Exception { assertEquals("UpdatedSoap", rs.getString("item")); } } + + @Test + @DisplayName("SET with empty object value") + void testSetWithEmptyObjectValue() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + // SET a JSON object containing an empty object + SubDocumentUpdate update = + SubDocumentUpdate.builder() + .subDocument("props.newProperty") + .operator(UpdateOperator.SET) + .subDocumentValue( + org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue.of( + new JSONDocument( + Map.of("hello", "world", "emptyObject", Collections.emptyMap())))) + .build(); + + Optional result = + flatCollection.update( + query, + List.of(update), + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build()); + + assertTrue(result.isPresent()); + + // Verify the JSON object was set correctly + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"props\"->'newProperty' as newProp FROM \"%s\" WHERE \"id\" = '1'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + String jsonStr = rs.getString("newProp"); + assertNotNull(jsonStr); + assertTrue(jsonStr.contains("hello")); + assertTrue(jsonStr.contains("emptyObject")); + } + } + + @Test + @DisplayName("SET with JSON document as value") + void testSetWithJsonDocumentValue() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + // SET a JSON document as value + SubDocumentUpdate update = + SubDocumentUpdate.builder() + .subDocument("props.nested") + .operator(UpdateOperator.SET) + .subDocumentValue( + org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue.of( + new JSONDocument(Map.of("key1", "value1", "key2", 123)))) + .build(); + + Optional result = + flatCollection.update( + query, + List.of(update), + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build()); + + assertTrue(result.isPresent()); + + // Verify the JSON document was set correctly + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"props\"->'nested'->>'key1' as key1, \"props\"->'nested'->>'key2' as key2 FROM \"%s\" WHERE \"id\" = '1'", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals("value1", rs.getString("key1")); + assertEquals("123", rs.getString("key2")); + } + } } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java index 9c4c0ddb..40cc11f6 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatCollectionSubDocSetOperatorParser.java @@ -51,6 +51,39 @@ public Object visit(NullSubDocumentValue value) { } }; + /** + * Visitor that returns the appropriate SQL value expression for jsonb_set. JSON document values + * use ?::jsonb to parse the JSON string directly. Primitive values use to_jsonb(?) to convert to + * proper JSONB format. + */ + private static final SubDocumentValueVisitor VALUE_EXPR_VISITOR = + new SubDocumentValueVisitor<>() { + @Override + public String visit(PrimitiveSubDocumentValue value) { + return "to_jsonb(?)"; + } + + @Override + public String visit(MultiValuedPrimitiveSubDocumentValue value) { + return "to_jsonb(?)"; + } + + @Override + public String visit(NestedSubDocumentValue value) { + return "?::jsonb"; + } + + @Override + public String visit(MultiValuedNestedSubDocumentValue value) { + return "?::jsonb"; + } + + @Override + public String visit(NullSubDocumentValue value) { + return "to_jsonb(?)"; + } + }; + @Override public String parse(FlatUpdateContext context) { if (context.isTopLevel()) { @@ -73,11 +106,11 @@ private String parseNestedJsonb(FlatUpdateContext context) { context.getParams().add(value); // Use jsonb_set with COALESCE to handle null columns - // to_jsonb(?) converts the value to proper JSONB format // 4th param (true) creates the key if it doesn't exist + String valueExpr = context.getValue().accept(VALUE_EXPR_VISITOR); return String.format( - "\"%s\" = jsonb_set(COALESCE(\"%s\", '{}'), ?::text[], to_jsonb(?), true)", - context.getColumnName(), context.getColumnName()); + "\"%s\" = jsonb_set(COALESCE(\"%s\", '{}'), ?::text[], %s, true)", + context.getColumnName(), context.getColumnName(), valueExpr); } /** From d0bc0abb676b4f907e455b7606b8626af337c9c6 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Tue, 20 Jan 2026 13:34:42 +0530 Subject: [PATCH 8/8] Catch and log SQLException --- .../core/documentstore/postgres/FlatPostgresCollection.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 648f6e09..c1173563 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -462,6 +462,9 @@ private void executeUpdate( } int rowsUpdated = ps.executeUpdate(); LOGGER.debug("Rows updated: {}", rowsUpdated); + } catch (SQLException e) { + LOGGER.error("Failed to execute update. SQL: {}, SQLState: {}", sql, e.getSQLState(), e); + throw e; } }