diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 8f8d2f1e..b65d00c7 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.sql.Connection; @@ -21,12 +22,14 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Stream; import org.hypertrace.core.documentstore.expression.impl.ConstantExpression; import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression; import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; import org.hypertrace.core.documentstore.expression.operators.RelationalOperator; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; import org.hypertrace.core.documentstore.model.exception.SchemaMismatchException; +import org.hypertrace.core.documentstore.model.options.MissingColumnStrategy; import org.hypertrace.core.documentstore.model.options.ReturnDocumentType; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; @@ -40,6 +43,11 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; @@ -61,8 +69,7 @@ public class FlatCollectionWriteTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String FLAT_COLLECTION_NAME = "myTestFlat"; private static final String INSERT_STATEMENTS_FILE = "query/pg_flat_collection_insert.json"; - // Initial data has 10 rows (IDs 1-10) - private static final int INITIAL_ROW_COUNT = 10; + private static final String DEFAULT_TENANT = "default"; private static Datastore postgresDatastore; private static Collection flatCollection; @@ -147,6 +154,9 @@ private static void executeInsertStatements() { if (!statement.isEmpty()) { try (PreparedStatement preparedStatement = connection.prepareStatement(statement)) { preparedStatement.executeUpdate(); + } catch (Exception e) { + LOGGER.error("Failed to execute INSERT statement: {}", e.getMessage(), e); + throw e; } } } @@ -222,75 +232,124 @@ void testUpsertAndReturn() { class CreateTests { @Test - @DisplayName("Should create a new document with all field types") - void testCreateNewDocument() throws Exception { + @DisplayName("Should create document with all supported data types") + void testCreateWithAllDataTypes() throws Exception { ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "new-doc-100"); - objectNode.put("item", "Brand New Item"); + String docId = getRandomDocId(4); + + objectNode.put("id", docId); + objectNode.put("item", "Comprehensive Test Item"); objectNode.put("price", 999); - objectNode.put("quantity", 50); + objectNode.put("quantity", "50"); + objectNode.put("big_number", 9223372036854775807L); + objectNode.put("rating", 4.5f); + objectNode.put("weight", 123.456789); objectNode.put("in_stock", true); - objectNode.set("tags", OBJECT_MAPPER.createArrayNode().add("electronics").add("sale")); + objectNode.put("date", 1705315800000L); + objectNode.put("created_date", "2024-01-15"); + objectNode.putArray("tags").add("electronics").add("sale").add("featured"); + objectNode.put("categoryTags", "single-category"); + objectNode.putArray("numbers").add(10).add(20).add(30); + objectNode.putArray("scores").add(1.5).add(2.5).add(3.5); + objectNode.putArray("flags").add(true).add(false).add(true); - // Add JSONB field ObjectNode propsNode = OBJECT_MAPPER.createObjectNode(); propsNode.put("color", "blue"); + propsNode.put("size", "large"); propsNode.put("weight", 2.5); propsNode.put("warranty", true); + propsNode.putObject("nested").put("key", "value"); objectNode.set("props", propsNode); + ObjectNode salesNode = OBJECT_MAPPER.createObjectNode(); + salesNode.put("total", 1000); + salesNode.put("region", "US"); + objectNode.set("sales", salesNode); + Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "new-doc-100"); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); CreateResult result = flatCollection.create(key, document); assertTrue(result.isSucceed()); + assertFalse(result.isPartial()); + assertTrue(result.getSkippedFields().isEmpty()); - // Verify the data was inserted - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM \"%s\" WHERE \"id\" = 'new-doc-100'", FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Brand New Item", rs.getString("item")); - assertEquals(999, rs.getInt("price")); - assertEquals(50, rs.getInt("quantity")); - assertTrue(rs.getBoolean("in_stock")); - - // Verify tags array - java.sql.Array tagsArray = rs.getArray("tags"); - assertNotNull(tagsArray); - String[] tags = (String[]) tagsArray.getArray(); - assertEquals(2, tags.length); - assertEquals("electronics", tags[0]); - assertEquals("sale", tags[1]); - - // Verify JSONB props - String propsJson = rs.getString("props"); - assertNotNull(propsJson); - JsonNode propsResult = OBJECT_MAPPER.readTree(propsJson); - assertEquals("blue", propsResult.get("color").asText()); - assertEquals(2.5, propsResult.get("weight").asDouble(), 0.01); - assertTrue(propsResult.get("warranty").asBoolean()); - } + // Verify all data types were inserted correctly + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + + assertEquals("Comprehensive Test Item", rs.getString("item")); + assertEquals(999, rs.getInt("price")); + assertEquals(50, rs.getInt("quantity")); + assertEquals(9223372036854775807L, rs.getLong("big_number")); + assertEquals(4.5f, rs.getFloat("rating"), 0.01f); + assertEquals(123.456789, rs.getDouble("weight"), 0.0001); + assertTrue(rs.getBoolean("in_stock")); + assertEquals(1705315800000L, rs.getTimestamp("date").getTime()); // epoch millis + assertNotNull(rs.getDate("created_date")); + + String[] tags = (String[]) rs.getArray("tags").getArray(); + assertEquals(3, tags.length); + assertEquals("electronics", tags[0]); + assertEquals("sale", tags[1]); + assertEquals("featured", tags[2]); + + // Single value auto-converted to array + String[] categoryTags = (String[]) rs.getArray("categoryTags").getArray(); + assertEquals(1, categoryTags.length); + assertEquals("single-category", categoryTags[0]); + + Integer[] numbers = (Integer[]) rs.getArray("numbers").getArray(); + assertEquals(3, numbers.length); + assertEquals(10, numbers[0]); + assertEquals(20, numbers[1]); + assertEquals(30, numbers[2]); + + Double[] scores = (Double[]) rs.getArray("scores").getArray(); + assertEquals(3, scores.length); + assertEquals(1.5, scores[0], 0.01); + + Boolean[] flags = (Boolean[]) rs.getArray("flags").getArray(); + assertEquals(3, flags.length); + assertTrue(flags[0]); + assertFalse(flags[1]); + + String propsJson = rs.getString("props"); + assertNotNull(propsJson); + JsonNode propsResult = OBJECT_MAPPER.readTree(propsJson); + assertEquals("blue", propsResult.get("color").asText()); + assertEquals("large", propsResult.get("size").asText()); + assertEquals(2.5, propsResult.get("weight").asDouble(), 0.01); + assertTrue(propsResult.get("warranty").asBoolean()); + assertEquals("value", propsResult.get("nested").get("key").asText()); + + String salesJson = rs.getString("sales"); + assertNotNull(salesJson); + JsonNode salesResult = OBJECT_MAPPER.readTree(salesJson); + assertEquals(1000, salesResult.get("total").asInt()); + assertEquals("US", salesResult.get("region").asText()); + }); } @Test @DisplayName("Should throw DuplicateDocumentException when creating with existing key") void testCreateDuplicateDocument() throws Exception { - // First create succeeds + + String docId = getRandomDocId(4); ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); objectNode.put("id", "dup-doc-200"); objectNode.put("item", "First Item"); Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "dup-doc-200"); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); - flatCollection.create(key, document); + CreateResult createResult = flatCollection.create(key, document); + Preconditions.checkArgument( + createResult.isSucceed(), + "Preconditions failure: Could not create doc with id: " + docId); - // Second create with same key should fail ObjectNode objectNode2 = OBJECT_MAPPER.createObjectNode(); objectNode2.put("id", "dup-doc-200"); objectNode2.put("item", "Second Item"); @@ -299,125 +358,57 @@ void testCreateDuplicateDocument() throws Exception { assertThrows(DuplicateDocumentException.class, () -> flatCollection.create(key, document2)); } - @Test - @DisplayName("Should create document with JSONB field") - void testCreateWithJsonbField() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "jsonb-doc-300"); - objectNode.put("item", "Item with Props"); - ObjectNode propsNode = OBJECT_MAPPER.createObjectNode(); - propsNode.put("color", "blue"); - propsNode.put("size", "large"); - objectNode.set("props", propsNode); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "jsonb-doc-300"); - - CreateResult result = flatCollection.create(key, document); - - assertTrue(result.isSucceed()); + @ParameterizedTest + @DisplayName( + "When MissingColumnStrategy is Throw, should throw an exception for unknown fields. Unknown fields are those fields that are not found in the schema but are present in the doc") + @ArgumentsSource(MissingColumnStrategyProvider.class) + void testUnknownFieldsAsPerMissingColumnStrategy(MissingColumnStrategy missingColumnStrategy) + throws Exception { - // Verify JSONB data - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT props->>'color' as color FROM \"%s\" WHERE \"id\" = 'jsonb-doc-300'", - FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("blue", rs.getString("color")); - } - } + String docId = getRandomDocId(4); - @Test - @DisplayName("Should skip unknown fields and insert known fields") - void testCreateSkipsUnknownFields() throws Exception { ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "unknown-field-doc-400"); + objectNode.put("id", docId); objectNode.put("item", "Item"); - objectNode.put("unknown_column", "should be skipped"); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "unknown-field-doc-400"); - - CreateResult result = flatCollection.create(key, document); - - assertTrue(result.isSucceed()); - - // Verify only known columns were inserted - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM \"%s\" WHERE \"id\" = 'unknown-field-doc-400'", - FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Item", rs.getString("item")); - } - } - - @Test - @DisplayName("Should return skipped fields in CreateResult when columns are missing") - void testCreateReturnsSkippedFieldsInResult() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "skipped-fields-doc-500"); - objectNode.put("item", "Valid Item"); - objectNode.put("price", 100); - objectNode.put("nonexistent_field1", "value1"); - objectNode.put("nonexistent_field2", "value2"); + objectNode.put("unknown_column", "should throw"); Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "skipped-fields-doc-500"); - - CreateResult result = flatCollection.create(key, document); - - assertTrue(result.isSucceed()); - assertTrue(result.isPartial()); - assertNotNull(result.getSkippedFields()); - assertEquals(2, result.getSkippedFields().size()); - assertTrue( - result - .getSkippedFields() - .containsAll(List.of("nonexistent_field1", "nonexistent_field2"))); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); - // Verify the valid fields were inserted - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM \"%s\" WHERE \"id\" = 'skipped-fields-doc-500'", - FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Valid Item", rs.getString("item")); - assertEquals(100, rs.getInt("price")); + if (missingColumnStrategy == MissingColumnStrategy.THROW) { + Collection collection = getFlatCollectionWithStrategy(MissingColumnStrategy.THROW); + assertThrows(SchemaMismatchException.class, () -> collection.create(key, document)); + // Verify no document was inserted + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT COUNT(*) FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1), "Document should not exist in DB after exception"); + } + } else { + CreateResult result = flatCollection.create(key, document); + // for SKIP + assertTrue(result.isSucceed()); + // this is a partial write because unknown_column was not written to + assertTrue(result.isPartial()); + assertTrue(result.getSkippedFields().contains("unknown_column")); + + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Item", rs.getString("item")); + }); } } - @Test - @DisplayName("Should return empty skipped fields when all columns exist") - void testCreateReturnsEmptySkippedFieldsWhenAllColumnsExist() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "all-valid-doc-600"); - objectNode.put("item", "Valid Item"); - objectNode.put("price", 200); - objectNode.put("quantity", 10); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "all-valid-doc-600"); - - CreateResult result = flatCollection.create(key, document); - - assertTrue(result.isSucceed()); - assertFalse(result.isPartial()); - assertTrue(result.getSkippedFields().isEmpty()); - } - @Test @DisplayName("Should return failure when all fields are unknown (parsed.isEmpty)") void testCreateFailsWhenAllFieldsAreUnknown() throws Exception { - // Document with only unknown fields - no valid columns will be found ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); objectNode.put("completely_unknown_field1", "value1"); objectNode.put("completely_unknown_field2", "value2"); @@ -427,8 +418,8 @@ void testCreateFailsWhenAllFieldsAreUnknown() throws Exception { CreateResult result = flatCollection.create(key, document); - // Should fail because no valid columns found (parsed.isEmpty() == true) - assertFalse(result.isSucceed()); + // Although no column exists in the schema, it'll create a new doc with the key as the id + assertTrue(result.isSucceed()); assertEquals(3, result.getSkippedFields().size()); assertTrue( result @@ -445,11 +436,11 @@ void testCreateFailsWhenAllFieldsAreUnknown() throws Exception { PreparedStatement ps = conn.prepareStatement( String.format( - "SELECT COUNT(*) FROM \"%s\" WHERE \"id\" = 'all-unknown-doc-700'", - FLAT_COLLECTION_NAME)); + "SELECT COUNT(*) FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key)); ResultSet rs = ps.executeQuery()) { assertTrue(rs.next()); - assertEquals(0, rs.getInt(1)); + assertEquals(1, rs.getInt(1)); } } @@ -504,202 +495,250 @@ void testCreateRefreshesSchemaOnUndefinedColumnError() throws Exception { assertTrue(result.getSkippedFields().contains("temp_col")); // Verify the valid fields were inserted - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM \"%s\" WHERE \"id\" = 'retry-doc-800'", - FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Item after schema refresh", rs.getString("item")); - } + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Item after schema refresh", rs.getString("item")); + }); } - @Test + @ParameterizedTest @DisplayName("Should skip column with unparseable value and add to skippedFields") - void testCreateSkipsUnparseableValues() throws Exception { + @ArgumentsSource(MissingColumnStrategyProvider.class) + void testUnparsableValuesAsPerMissingColStrategy(MissingColumnStrategy missingColumnStrategy) + throws Exception { + + String docId = getRandomDocId(4); + // Try to insert a string value into an integer column with wrong type // The unparseable column should be skipped, not throw an exception ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "datatype-mismatch-doc-900"); + objectNode.put("id", docId); objectNode.put("item", "Valid Item"); objectNode.put("price", "not_a_number_at_all"); // price is INTEGER, this will fail parsing Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "datatype-mismatch-doc-900"); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); + + if (missingColumnStrategy == MissingColumnStrategy.THROW) { + CreateResult result = + getFlatCollectionWithStrategy(MissingColumnStrategy.SKIP).create(key, document); + + // Should succeed with the valid columns, skipping the unparseable one + assertTrue(result.isSucceed()); + assertTrue(result.isPartial()); + assertEquals(1, result.getSkippedFields().size()); + assertTrue(result.getSkippedFields().contains("price")); + + // Verify the valid fields were inserted + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Valid Item", rs.getString("item")); + // price should be null since it was skipped + assertEquals(0, rs.getInt("price")); + assertTrue(rs.wasNull()); + }); + } else { + // SKIP strategy: unparseable value should be skipped, document created + CreateResult result = flatCollection.create(key, document); + assertTrue(result.isSucceed()); + assertTrue(result.isPartial()); + assertEquals(1, result.getSkippedFields().size()); + assertTrue(result.getSkippedFields().contains("price")); + + // Verify the valid fields were inserted + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Valid Item", rs.getString("item")); + // price should be null since it was skipped + assertEquals(0, rs.getInt("price")); + assertTrue(rs.wasNull()); + }); + } + } + } - CreateResult result = flatCollection.create(key, document); + private String getRandomDocId(int len) { + return org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils.random( + len, true, false); + } - // Should succeed with the valid columns, skipping the unparseable one - assertTrue(result.isSucceed()); - assertTrue(result.isPartial()); - assertEquals(1, result.getSkippedFields().size()); - assertTrue(result.getSkippedFields().contains("price")); + private static Collection getFlatCollectionWithStrategy(MissingColumnStrategy strategy) { + return ((PostgresDatastore) postgresDatastore) + .getFlatCollection(FLAT_COLLECTION_NAME, strategy); + } - // Verify the valid fields were inserted - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM \"%s\" WHERE \"id\" = 'datatype-mismatch-doc-900'", - FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Valid Item", rs.getString("item")); - // price should be null since it was skipped - assertEquals(0, rs.getInt("price")); - assertTrue(rs.wasNull()); - } + private void queryAndAssert(Key key, ResultSetConsumer consumer) throws Exception { + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM \"%s\" WHERE \"id\" = '%s'", FLAT_COLLECTION_NAME, key)); + ResultSet rs = ps.executeQuery()) { + consumer.accept(rs); } + } + + @FunctionalInterface + interface ResultSetConsumer { + + void accept(ResultSet rs) throws Exception; + } + + static class MissingColumnStrategyProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + return Stream.of(MissingColumnStrategy.values()) + .filter( + strategy -> + (strategy == MissingColumnStrategy.THROW) + || (strategy == MissingColumnStrategy.SKIP)) + .map(Arguments::of); + } + } + + @Nested + @DisplayName("CreateOrReplace Operations") + class CreateOrReplaceTests { @Test - @DisplayName("Should handle all scalar data types including string parsing and nulls") - void testCreateWithAllDataTypes() throws Exception { - // Test 1: All types with native values (number nodes, boolean nodes, etc.) - ObjectNode nativeTypesNode = OBJECT_MAPPER.createObjectNode(); - nativeTypesNode.put("id", "native-types-doc"); - nativeTypesNode.put("item", "Native Types"); // TEXT - nativeTypesNode.put("price", 100); // INTEGER (number node) - nativeTypesNode.put("big_number", 9223372036854775807L); // BIGINT (number node) - nativeTypesNode.put("rating", 4.5f); // REAL (number node) - nativeTypesNode.put("weight", 123.456789); // DOUBLE PRECISION (number node) - nativeTypesNode.put("in_stock", true); // BOOLEAN (boolean node) - nativeTypesNode.put("date", "2024-01-15T10:30:00Z"); // TIMESTAMPTZ (textual) - nativeTypesNode.put("created_date", "2024-01-15"); // DATE (textual) - nativeTypesNode.putObject("props").put("key", "value"); // JSONB - - CreateResult result1 = - flatCollection.create( - new SingleValueKey("default", "native-types-doc"), new JSONDocument(nativeTypesNode)); - assertTrue(result1.isSucceed()); - - // Test 2: String representations of numbers (covers parseInt, parseLong, etc.) - ObjectNode stringTypesNode = OBJECT_MAPPER.createObjectNode(); - stringTypesNode.put("id", "string-types-doc"); - stringTypesNode.put("item", "String Types"); - stringTypesNode.put("price", "200"); // INTEGER from string - stringTypesNode.put("big_number", "1234567890123"); // BIGINT from string - stringTypesNode.put("rating", "3.75"); // REAL from string - stringTypesNode.put("weight", "987.654"); // DOUBLE PRECISION from string - stringTypesNode.put("in_stock", "true"); // BOOLEAN from string - - CreateResult result2 = - flatCollection.create( - new SingleValueKey("default", "string-types-doc"), new JSONDocument(stringTypesNode)); - assertTrue(result2.isSucceed()); - - // Test 3: TIMESTAMPTZ from epoch milliseconds - long epochMillis = 1705315800000L; - ObjectNode epochNode = OBJECT_MAPPER.createObjectNode(); - epochNode.put("id", "epoch-doc"); - epochNode.put("item", "Epoch Timestamp"); - epochNode.put("date", epochMillis); // TIMESTAMPTZ from number - - CreateResult result3 = - flatCollection.create( - new SingleValueKey("default", "epoch-doc"), new JSONDocument(epochNode)); - assertTrue(result3.isSucceed()); - - // Test 4: Null values (covers setParameter null handling) - ObjectNode nullNode = OBJECT_MAPPER.createObjectNode(); - nullNode.put("id", "null-doc"); - nullNode.put("item", "Null Values"); - nullNode.putNull("price"); - nullNode.putNull("date"); - nullNode.putNull("in_stock"); - - CreateResult result4 = - flatCollection.create( - new SingleValueKey("default", "null-doc"), new JSONDocument(nullNode)); - assertTrue(result4.isSucceed()); - - // Verify all inserts - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM \"%s\" WHERE \"id\" IN ('native-types-doc', 'string-types-doc', 'epoch-doc', 'null-doc') ORDER BY \"id\"", - FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { + @DisplayName("Should create new document and return true") + void testCreateOrReplaceNewDocument() throws Exception { - // epoch-doc - assertTrue(rs.next()); - assertEquals(epochMillis, rs.getTimestamp("date").getTime()); + String docId = getRandomDocId(4); - // native-types-doc - assertTrue(rs.next()); - assertEquals(100, rs.getInt("price")); - assertEquals(9223372036854775807L, rs.getLong("big_number")); - assertEquals(4.5f, rs.getFloat("rating"), 0.01f); - assertEquals(123.456789, rs.getDouble("weight"), 0.0001); - assertTrue(rs.getBoolean("in_stock")); + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "upsert-new-doc-100"); + objectNode.put("item", "New Upsert Item"); + objectNode.put("price", 500); + objectNode.put("quantity", 25); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); - // null-doc - assertTrue(rs.next()); - rs.getInt("price"); - assertTrue(rs.wasNull()); + boolean isNew = flatCollection.createOrReplace(key, document); - // string-types-doc - assertTrue(rs.next()); - assertEquals(200, rs.getInt("price")); - assertEquals(1234567890123L, rs.getLong("big_number")); - assertEquals(3.75f, rs.getFloat("rating"), 0.01f); - } + assertTrue(isNew); + + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("New Upsert Item", rs.getString("item")); + assertEquals(500, rs.getInt("price")); + assertEquals(25, rs.getInt("quantity")); + }); } @Test - @DisplayName("Should handle array types and single-to-array conversion") - void testCreateWithArrayTypes() throws Exception { - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - - // Test 1: Proper arrays - ObjectNode arrayNode = OBJECT_MAPPER.createObjectNode(); - arrayNode.put("id", "array-doc"); - arrayNode.put("item", "Array Types"); - arrayNode.putArray("tags").add("tag1").add("tag2"); // TEXT[] - arrayNode.putArray("numbers").add(10).add(20); // INTEGER[] - arrayNode.putArray("scores").add(1.5).add(2.5); // DOUBLE PRECISION[] - arrayNode.putArray("flags").add(true).add(false); // BOOLEAN[] - - CreateResult result1 = - flatCollection.create( - new SingleValueKey("default", "array-doc"), new JSONDocument(arrayNode)); - assertTrue(result1.isSucceed()); - - // Test 2: Single values auto-converted to arrays - ObjectNode singleNode = OBJECT_MAPPER.createObjectNode(); - singleNode.put("id", "single-to-array-doc"); - singleNode.put("item", "Single to Array"); - singleNode.put("tags", "single-tag"); // TEXT[] from single value - singleNode.put("numbers", 42); // INTEGER[] from single value - - CreateResult result2 = - flatCollection.create( - new SingleValueKey("default", "single-to-array-doc"), new JSONDocument(singleNode)); - assertTrue(result2.isSucceed()); - - // Verify - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM \"%s\" WHERE \"id\" IN ('array-doc', 'single-to-array-doc') ORDER BY \"id\"", - FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { + @DisplayName("Should replace existing document and return false") + void testCreateOrReplaceExistingDocument() throws Exception { + String docId = getRandomDocId(4); + ObjectNode initialNode = OBJECT_MAPPER.createObjectNode(); + initialNode.put("id", docId); + initialNode.put("item", "Original Item"); + initialNode.put("price", 100); + Document initialDoc = new JSONDocument(initialNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); + + boolean firstResult = flatCollection.createOrReplace(key, initialDoc); + + Preconditions.checkArgument( + firstResult, "Preconditions failure: Could not create first document with id: " + docId); + + // Now replace with updated document + ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); + updatedNode.put("id", docId); + updatedNode.put("item", "Updated Item"); + updatedNode.put("price", 999); + updatedNode.put("quantity", 50); + Document updatedDoc = new JSONDocument(updatedNode); + + boolean secondResult = flatCollection.createOrReplace(key, updatedDoc); + + assertFalse(secondResult); + + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Updated Item", rs.getString("item")); + assertEquals(999, rs.getInt("price")); + assertEquals(50, rs.getInt("quantity")); + }); + } - // array-doc - assertTrue(rs.next()); - assertEquals(2, ((String[]) rs.getArray("tags").getArray()).length); - assertEquals(2, ((Integer[]) rs.getArray("numbers").getArray()).length); + @Test + @DisplayName("Should skip unknown fields in createOrReplace (default SKIP strategy)") + void testCreateOrReplaceSkipsUnknownFields() throws Exception { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "upsert-skip-fields-300"); + objectNode.put("item", "Item with unknown"); + objectNode.put("price", 200); + objectNode.put("unknown_field", "should be skipped"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "upsert-skip-fields-300"); + + boolean isNew = flatCollection.createOrReplace(key, document); + assertTrue(isNew); + + // Verify only known fields were inserted + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Item with unknown", rs.getString("item")); + assertEquals(200, rs.getInt("price")); + }); + } - // single-to-array-doc - assertTrue(rs.next()); - String[] tags = (String[]) rs.getArray("tags").getArray(); - assertEquals(1, tags.length); - assertEquals("single-tag", tags[0]); - } + @Test + @DisplayName("Should handle JSONB fields in createOrReplace") + void testCreateOrReplaceWithJsonbField() throws Exception { + String docId = getRandomDocId(4); + ObjectNode initialNode = OBJECT_MAPPER.createObjectNode(); + initialNode.put("id", docId); + initialNode.put("item", "Item with props"); + ObjectNode initialProps = OBJECT_MAPPER.createObjectNode(); + initialProps.put("color", "red"); + initialProps.put("size", "small"); + initialNode.set("props", initialProps); + Document initialDoc = new JSONDocument(initialNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); + + boolean wasCreated = flatCollection.createOrReplace(key, initialDoc); + Preconditions.checkArgument( + wasCreated, "Precondition failure: Doc could not be created with id: " + docId); + + // Update with new JSONB value + ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); + updatedNode.put("id", docId); + updatedNode.put("item", "Updated Item"); + ObjectNode updatedProps = OBJECT_MAPPER.createObjectNode(); + updatedProps.put("color", "blue"); + updatedProps.put("size", "large"); + updatedProps.put("weight", 2.5); + updatedNode.set("props", updatedProps); + Document updatedDoc = new JSONDocument(updatedNode); + + boolean isNew = flatCollection.createOrReplace(key, updatedDoc); + assertFalse(isNew); + + // Verify JSONB was updated + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + JsonNode propsResult = OBJECT_MAPPER.readTree(rs.getString("props")); + assertEquals("blue", propsResult.get("color").asText()); + assertEquals("large", propsResult.get("size").asText()); + assertEquals(2.5, propsResult.get("weight").asDouble(), 0.01); + }); } } @@ -1113,254 +1152,69 @@ void testBulkOperationOnArrayValue() throws IOException { } @Nested - @DisplayName("Strict Mode (missingColumnStrategy=THROW)") - class StrictModeTests { - - private Datastore strictDatastore; - private Collection strictCollection; - - @BeforeEach - void setupStrictModeDatastore() { - // Create a datastore with missingColumnStrategy=THROW (strict mode) - String postgresConnectionUrl = - String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432)); - - Map strictConfig = new HashMap<>(); - strictConfig.put("url", postgresConnectionUrl); - strictConfig.put("user", "postgres"); - strictConfig.put("password", "postgres"); - // Configure strict mode via customParams - Map customParams = new HashMap<>(); - customParams.put("missingColumnStrategy", "THROW"); - strictConfig.put("customParams", customParams); - - strictDatastore = - DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(strictConfig)); - strictCollection = - strictDatastore.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT); - } + @DisplayName("CreateOrReplace Schema Refresh Tests") + class CreateOrReplaceSchemaRefreshTests { @Test - @DisplayName("Should throw SchemaMismatchException when column not in schema (strict mode)") - void testStrictModeThrowsOnUnknownColumn() { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "strict-unknown-col-doc"); - objectNode.put("item", "Valid Item"); - objectNode.put("unknown_column", "this should fail"); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "strict-unknown-col-doc"); - - SchemaMismatchException exception = - assertThrows(SchemaMismatchException.class, () -> strictCollection.create(key, document)); - - assertTrue(exception.getMessage().contains("unknown_column")); - assertTrue(exception.getMessage().contains("not found in schema")); - } - - @Test - @DisplayName( - "Should throw SchemaMismatchException when value type doesn't match schema (strict mode)") - void testStrictModeThrowsOnTypeMismatch() { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "strict-type-mismatch-doc"); - objectNode.put("item", "Valid Item"); - objectNode.put("price", "not_a_number_at_all"); // price is INTEGER - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "strict-type-mismatch-doc"); - - SchemaMismatchException exception = - assertThrows(SchemaMismatchException.class, () -> strictCollection.create(key, document)); - - assertTrue(exception.getMessage().contains("price")); - assertTrue(exception.getMessage().contains("Failed to parse value")); - } - - @Test - @DisplayName("Should succeed in strict mode when all fields match schema") - void testStrictModeSucceedsWithValidDocument() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "strict-valid-doc"); - objectNode.put("item", "Valid Item"); - objectNode.put("price", 100); - objectNode.put("quantity", 5); - objectNode.put("in_stock", true); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "strict-valid-doc"); - - CreateResult result = strictCollection.create(key, document); - - assertTrue(result.isSucceed()); - assertFalse(result.isPartial()); - assertTrue(result.getSkippedFields().isEmpty()); - - // Verify data was inserted + @DisplayName("createOrReplace should refresh schema and retry on dropped column") + void testCreateOrReplaceRefreshesSchemaOnDroppedColumn() throws Exception { PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM \"%s\" WHERE \"id\" = 'strict-valid-doc'", - FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals("Valid Item", rs.getString("item")); - assertEquals(100, rs.getInt("price")); - } - } - - @Test - @DisplayName("Should throw SchemaMismatchException on first unknown field (strict mode)") - void testStrictModeFailsFastOnFirstUnknownField() { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "strict-multi-unknown-doc"); - objectNode.put("unknown_field_1", "value1"); - objectNode.put("unknown_field_2", "value2"); - objectNode.put("item", "Valid Item"); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "strict-multi-unknown-doc"); - - // Should throw on the first unknown field encountered - assertThrows(SchemaMismatchException.class, () -> strictCollection.create(key, document)); - } - } - - @Nested - @DisplayName("Ignore Document Mode (missingColumnStrategy=IGNORE_DOCUMENT)") - class IgnoreDocumentModeTests { - - private Datastore ignoreDocDatastore; - private Collection ignoreDocCollection; - - @BeforeEach - void setupIgnoreDocumentModeDatastore() { - // Create a datastore with missingColumnStrategy=IGNORE_DOCUMENT - String postgresConnectionUrl = - String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432)); - - Map config = new HashMap<>(); - config.put("url", postgresConnectionUrl); - config.put("user", "postgres"); - config.put("password", "postgres"); - Map customParams = new HashMap<>(); - customParams.put("missingColumnStrategy", "IGNORE_DOCUMENT"); - config.put("customParams", customParams); - - ignoreDocDatastore = - DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(config)); - ignoreDocCollection = - ignoreDocDatastore.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT); - } - - @Test - @DisplayName("Should return IGNORED status when document has unknown columns") - void testIgnoreDocumentWithUnknownColumn() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "ignore-doc-unknown-col"); - objectNode.put("item", "Valid Item"); - objectNode.put("unknown_column", "this should cause ignore"); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "ignore-doc-unknown-col"); - - CreateResult result = ignoreDocCollection.create(key, document); - - assertFalse(result.isSucceed()); - assertTrue(result.isDocumentIgnored()); - assertTrue(result.getSkippedFields().contains("unknown_column")); - // Verify no row was inserted - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + // Step 1: Add a temporary column + String addColumnSQL = + String.format( + "ALTER TABLE \"%s\" ADD COLUMN \"temp_upsert_col\" TEXT", FLAT_COLLECTION_NAME); try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT COUNT(*) FROM \"%s\" WHERE \"id\" = 'ignore-doc-unknown-col'", - FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals(0, rs.getInt(1)); + PreparedStatement ps = conn.prepareStatement(addColumnSQL)) { + ps.execute(); + LOGGER.info("Added temporary column 'temp_upsert_col' to table"); } - } - - @Test - @DisplayName("Should return IGNORED status when document has type mismatch") - void testIgnoreDocumentWithTypeMismatch() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "ignore-doc-type-mismatch"); - objectNode.put("item", "Valid Item"); - objectNode.put("price", "not_a_number"); // price is INTEGER - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "ignore-doc-type-mismatch"); - - CreateResult result = ignoreDocCollection.create(key, document); - assertFalse(result.isSucceed()); - assertTrue(result.isDocumentIgnored()); - assertTrue(result.getSkippedFields().contains("price")); + // Step 2: Create a document with the temp column to cache the schema + ObjectNode objectNode1 = OBJECT_MAPPER.createObjectNode(); + objectNode1.put("id", "upsert-cache-schema-doc"); + objectNode1.put("item", "Item to cache schema"); + objectNode1.put("temp_upsert_col", "temp value"); + flatCollection.createOrReplace( + new SingleValueKey("default", "upsert-cache-schema-doc"), new JSONDocument(objectNode1)); + LOGGER.info("Schema cached with temp_upsert_col"); - // Verify no row was inserted - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + // Step 3: DROP the column - now the cached schema is stale + String dropColumnSQL = + String.format("ALTER TABLE \"%s\" DROP COLUMN \"temp_upsert_col\"", FLAT_COLLECTION_NAME); try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT COUNT(*) FROM \"%s\" WHERE \"id\" = 'ignore-doc-type-mismatch'", - FLAT_COLLECTION_NAME)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals(0, rs.getInt(1)); + PreparedStatement ps = conn.prepareStatement(dropColumnSQL)) { + ps.execute(); + LOGGER.info("Dropped temp_upsert_col - schema cache is now stale"); } - } - @Test - @DisplayName("Should succeed when all fields match schema (IGNORE_DOCUMENT mode)") - void testIgnoreDocumentSucceedsWithValidDocument() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "ignore-doc-valid"); - objectNode.put("item", "Valid Item"); - objectNode.put("price", 100); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "ignore-doc-valid"); + // Step 4: Try createOrReplace with the dropped column + // Schema registry still thinks temp_upsert_col exists, so it will include it in UPSERT + // UPSERT will fail with UNDEFINED_COLUMN, triggering handlePSQLExceptionForUpsert + // which will refresh schema and retry + ObjectNode objectNode2 = OBJECT_MAPPER.createObjectNode(); + objectNode2.put("id", "upsert-retry-doc"); + objectNode2.put("item", "Item after schema refresh"); + objectNode2.put("temp_upsert_col", "this column no longer exists"); + Document document = new JSONDocument(objectNode2); + Key key = new SingleValueKey("default", "upsert-retry-doc"); - CreateResult result = ignoreDocCollection.create(key, document); + boolean result = flatCollection.createOrReplace(key, document); - assertTrue(result.isSucceed()); - assertFalse(result.isDocumentIgnored()); - assertTrue(result.getSkippedFields().isEmpty()); + // Should succeed after schema refresh - temp_upsert_col will be skipped + assertTrue(result); - // Verify data was inserted - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + // Verify the valid fields were inserted try (Connection conn = pgDatastore.getPostgresClient(); PreparedStatement ps = conn.prepareStatement( String.format( - "SELECT * FROM \"%s\" WHERE \"id\" = 'ignore-doc-valid'", - FLAT_COLLECTION_NAME)); + "SELECT * FROM \"%s\" WHERE \"id\" = '%s'", FLAT_COLLECTION_NAME, key)); ResultSet rs = ps.executeQuery()) { assertTrue(rs.next()); - assertEquals("Valid Item", rs.getString("item")); - assertEquals(100, rs.getInt("price")); + assertEquals("Item after schema refresh", rs.getString("item")); } } - - @Test - @DisplayName("Should return all problematic fields in skippedFields when ignored") - void testIgnoreDocumentReturnsAllSkippedFields() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "ignore-doc-multi-issues"); - objectNode.put("item", "Valid Item"); - objectNode.put("unknown_field_1", "value1"); - objectNode.put("unknown_field_2", "value2"); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "ignore-doc-multi-issues"); - - CreateResult result = ignoreDocCollection.create(key, document); - - assertFalse(result.isSucceed()); - assertTrue(result.isDocumentIgnored()); - assertEquals(2, result.getSkippedFields().size()); - assertTrue( - result.getSkippedFields().containsAll(List.of("unknown_field_1", "unknown_field_2"))); - } } @Nested diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java index b52e16e9..4d2f354c 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/ColumnMetadata.java @@ -23,4 +23,6 @@ public interface ColumnMetadata { * @return whether this column is an array type */ boolean isArray(); + + boolean isPrimaryKey(); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java index fd3fa792..f33ccabe 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/SchemaRegistry.java @@ -38,4 +38,12 @@ public interface SchemaRegistry { * @return optional of the col metadata. */ Optional getColumnOrRefresh(String tableName, String colName); + + /** + * Returns the primary key column name for the given table. + * + * @param tableName the table name + * @return optional of the primary key column name + */ + Optional getPrimaryKeyColumn(String tableName); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/MissingColumnStrategy.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/MissingColumnStrategy.java index 4950213a..8ac1804b 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/MissingColumnStrategy.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/options/MissingColumnStrategy.java @@ -19,5 +19,10 @@ public enum MissingColumnStrategy { * a field doesn't match the schema. The write operation will fail. */ THROW, - IGNORE_DOCUMENT + /** Ignore the entire document if it doesn't match the schema. */ + IGNORE_DOCUMENT; + + public static MissingColumnStrategy defaultStrategy() { + return SKIP; + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index c1173563..0b307e24 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import java.io.IOException; +import java.sql.Array; import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; @@ -16,15 +17,9 @@ import java.sql.Timestamp; import java.sql.Types; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; +import java.util.stream.Collectors; import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest; import org.hypertrace.core.documentstore.BulkDeleteResult; import org.hypertrace.core.documentstore.BulkUpdateRequest; @@ -52,6 +47,7 @@ import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext; import org.hypertrace.core.documentstore.postgres.update.parser.FlatCollectionSubDocSetOperatorParser; import org.hypertrace.core.documentstore.postgres.update.parser.FlatCollectionSubDocUpdateOperatorParser; +import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils; import org.hypertrace.core.documentstore.query.Query; import org.postgresql.util.PSQLException; import org.postgresql.util.PSQLState; @@ -72,6 +68,7 @@ public class FlatPostgresCollection extends PostgresCollection { private static final String WRITE_NOT_SUPPORTED = "Write operations are not supported for flat collections yet!"; private static final String MISSING_COLUMN_STRATEGY_CONFIG = "missingColumnStrategy"; + private static final String DEFAULT_PRIMARY_KEY_COLUMN = "key"; private static final Map SUB_DOC_UPDATE_PARSERS = Map.of(SET, new FlatCollectionSubDocSetOperatorParser()); @@ -88,15 +85,26 @@ public class FlatPostgresCollection extends PostgresCollection { final PostgresClient client, final String collectionName, final PostgresLazyilyLoadedSchemaRegistry schemaRegistry) { + this(client, collectionName, schemaRegistry, null); + } + + FlatPostgresCollection( + final PostgresClient client, + final String collectionName, + final PostgresLazyilyLoadedSchemaRegistry schemaRegistry, + final MissingColumnStrategy missingColumnStrategy) { super(client, collectionName); this.schemaRegistry = schemaRegistry; - this.missingColumnStrategy = parseMissingColumnStrategy(client.getCustomParameters()); + this.missingColumnStrategy = + missingColumnStrategy != null + ? missingColumnStrategy + : parseMissingColumnStrategy(client.getCustomParameters()); } private static MissingColumnStrategy parseMissingColumnStrategy(Map params) { String value = params.get(MISSING_COLUMN_STRATEGY_CONFIG); if (value == null || value.isEmpty()) { - return MissingColumnStrategy.SKIP; // default + return MissingColumnStrategy.defaultStrategy(); } try { return MissingColumnStrategy.valueOf(value.toUpperCase()); @@ -104,8 +112,8 @@ private static MissingColumnStrategy parseMissingColumnStrategy(Map skippedFields = new ArrayList<>(); + + try { + TypedDocument parsed = parseDocument(document, tableName, skippedFields); + + // Add the key as the primary key column + String pkColumn = getPKForTable(tableName); + String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn); + PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn); + parsed.add(quotedPkColumn, key.toString(), pkType, false); + + String sql = buildUpsertSql(parsed.getColumns(), quotedPkColumn); + LOGGER.debug("Upsert SQL: {}", sql); + + return executeUpsert(sql, parsed); + + } catch (PSQLException e) { + return handlePSQLExceptionForUpsert(e, key, document, tableName, isRetry); + } catch (SQLException e) { + LOGGER.error("SQLException in createOrReplace. key: {} content: {}", key, document, e); + throw new IOException(e); + } + } + + /** + * Builds a PostgreSQL upsert (INSERT ... ON CONFLICT DO UPDATE) SQL statement. + * + *

This method constructs an atomic upsert query that: + * + *

    + *
  • Inserts a new row if no conflict on the primary key + *
  • Updates all non-PK columns if a row with the same PK already exists + *
+ * + *

Generated SQL pattern: + * + *

{@code
+   * INSERT INTO table (col1, col2, pk_col)
+   * VALUES (?, ?, ?)
+   * ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
+   * RETURNING (xmax = 0) AS is_insert
+   * }
+ * + *

The EXCLUDED table: In PostgreSQL's ON CONFLICT clause, {@code EXCLUDED} is a special + * table that references the row that would have been inserted (the "proposed" row). This allows + * us to update existing rows with the new values without re-specifying them. + * + *

The RETURNING clause: {@code (xmax = 0) AS is_insert} is a PostgreSQL trick to + * determine if the operation was an INSERT or UPDATE: + * + *

    + *
  • {@code xmax} is a system column that stores the transaction ID of the deleting/updating + * transaction + *
  • For a freshly inserted row, {@code xmax = 0} (no prior transaction modified it) + *
  • For an updated row, {@code xmax != 0} (the UPDATE sets it to the current transaction ID) + *
  • Thus, {@code is_insert = true} means INSERT, {@code is_insert = false} means UPDATE + *
+ * + * @param columns List of quoted column names to include in the upsert (including PK) + * @param pkColumn The quoted primary key column name used for conflict detection + * @return The complete upsert SQL statement with placeholders for values + */ + private String buildUpsertSql(List columns, String pkColumn) { + String columnList = String.join(", ", columns); + String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new)); + + // Build SET clause for non-PK columns: col = EXCLUDED.col + String setClause = + columns.stream() + .filter(col -> !col.equals(pkColumn)) + .map(col -> col + " = EXCLUDED." + col) + .collect(Collectors.joining(", ")); + + return String.format( + "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s RETURNING (xmax = 0) AS is_insert", + tableIdentifier, columnList, placeholders, pkColumn, setClause); + } + + private boolean executeUpsert(String sql, TypedDocument parsed) throws SQLException { + try (Connection conn = client.getPooledConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + int index = 1; + for (String column : parsed.getColumns()) { + setParameter( + conn, + ps, + index++, + parsed.getValue(column), + parsed.getType(column), + parsed.isArray(column)); + } + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + // is_insert is true if xmax = 0 (new row), false if updated. This helps us differentiate + // b/w creates/upserts + return rs.getBoolean("is_insert"); + } + } + return false; + } + } + + private boolean handlePSQLExceptionForUpsert( + PSQLException e, Key key, Document document, String tableName, boolean isRetry) + throws IOException { + if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) { + LOGGER.info( + "Schema mismatch detected during upsert (SQLState: {}), refreshing schema and retrying. key: {}", + e.getSQLState(), + key); + schemaRegistry.invalidate(tableName); + return createOrReplaceWithRetry(key, document, true); + } + LOGGER.error("SQLException in createOrReplace. key: {} content: {}", key, document, e); + throw new IOException(e); + } + private CreateResult handlePSQLExceptionForCreate( PSQLException e, Key key, Document document, String tableName, boolean isRetry) throws IOException { if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) { LOGGER.info( - "Schema mismatch detected (SQLState: {}), refreshing schema and retrying. key: {}", + "Schema mismatch detected during create (SQLState: {}), refreshing schema and retrying. key: {}", e.getSQLState(), key); schemaRegistry.invalidate(tableName); @@ -613,6 +747,17 @@ private boolean shouldRefreshSchemaAndRetry(String sqlState) { || PSQLState.DATATYPE_MISMATCH.getState().equals(sqlState); } + private String getPKForTable(String tableName) { + return schemaRegistry.getPrimaryKeyColumn(tableName).orElse(DEFAULT_PRIMARY_KEY_COLUMN); + } + + private PostgresDataType getPrimaryKeyType(String tableName, String pkColumn) { + return schemaRegistry + .getColumnOrRefresh(tableName, pkColumn) + .map(PostgresColumnMetadata::getPostgresType) + .orElse(PostgresDataType.TEXT); + } + /** * Typed document contains field information along with the field type. Uses LinkedHashMaps keyed * by column name. LinkedHashMap preserves insertion order for consistent parameter binding. @@ -629,10 +774,6 @@ void add(String column, Object value, PostgresDataType type, boolean isArray) { arrays.put(column, isArray); } - boolean isEmpty() { - return values.isEmpty(); - } - List getColumns() { return new ArrayList<>(values.keySet()); } @@ -721,8 +862,9 @@ private void setParameter( } if (isArray) { - Object[] arrayValues = (value instanceof Object[]) ? (Object[]) value : new Object[] {value}; - java.sql.Array sqlArray = conn.createArrayOf(type.getSqlType(), arrayValues); + // todo: Maybe check if the value is actually an array + Object[] arrayValues = (Object[]) value; + Array sqlArray = conn.createArrayOf(type.getSqlType(), arrayValues); ps.setArray(index, sqlArray); return; } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java index e2d6ac58..a16db31d 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java @@ -26,6 +26,7 @@ import org.hypertrace.core.documentstore.model.config.ConnectionConfig; import org.hypertrace.core.documentstore.model.config.DatastoreConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; +import org.hypertrace.core.documentstore.model.options.MissingColumnStrategy; import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -181,6 +182,15 @@ public Collection getCollectionForType(String collectionName, DocumentType docum } } + public Collection getFlatCollection( + String collectionName, MissingColumnStrategy missingColumnStrategy) { + return new FlatPostgresCollection( + client, + collectionName, + (PostgresLazyilyLoadedSchemaRegistry) schemaRegistry, + missingColumnStrategy); + } + @Override public boolean healthCheck() { String healthCheckSql = "SELECT 1;"; diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java index 38ac8dd5..28821eaf 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java @@ -170,4 +170,19 @@ private boolean canRefresh(String tableName) { } return Duration.between(lastRefresh, Instant.now()).compareTo(refreshCooldown) >= 0; } + + /** + * Returns the primary key column name for the given table. + * + * @param tableName the name of the table + * @return optional of the primary key column name, or empty if no primary key is found + */ + @Override + public Optional getPrimaryKeyColumn(String tableName) { + Map schema = getSchema(tableName); + return schema.values().stream() + .filter(PostgresColumnMetadata::isPrimaryKey) + .map(PostgresColumnMetadata::getName) + .findFirst(); + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java index 8e35937b..7dc58bd3 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java @@ -5,11 +5,14 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import lombok.AllArgsConstructor; import org.hypertrace.core.documentstore.expression.impl.DataType; import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils; /** * Fetches schema metadata directly from Postgres system catalogs. Hardcoded to query @@ -25,28 +28,38 @@ public class PostgresMetadataFetcher { + "FROM information_schema.columns " + "WHERE table_schema = 'public' AND table_name = ?"; + private static final String PRIMARY_KEY_SQL = + "SELECT a.attname as column_name " + + "FROM pg_index i " + + "JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) " + + "WHERE i.indrelid = ?::regclass AND i.indisprimary"; + public Map fetch(String tableName) { Map metadataMap = new HashMap<>(); - try (Connection conn = client.getPooledConnection(); - PreparedStatement ps = conn.prepareStatement(DISCOVERY_SQL)) { + try (Connection conn = client.getPooledConnection()) { + Set primaryKeyColumns = fetchPrimaryKeyColumns(conn, tableName); - ps.setString(1, tableName); - try (ResultSet rs = ps.executeQuery()) { - while (rs.next()) { - String columnName = rs.getString("column_name"); - String udtName = rs.getString("udt_name"); - boolean isNullable = "YES".equalsIgnoreCase(rs.getString("is_nullable")); - boolean isArray = udtName != null && udtName.startsWith("_"); - String baseType = isArray ? udtName.substring(1) : udtName; - metadataMap.put( - columnName, - new PostgresColumnMetadata( - columnName, - mapToCanonicalType(baseType), - mapToPostgresType(baseType), - isNullable, - isArray)); + try (PreparedStatement ps = conn.prepareStatement(DISCOVERY_SQL)) { + ps.setString(1, tableName); + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + String columnName = rs.getString("column_name"); + String udtName = rs.getString("udt_name"); + boolean isNullable = "YES".equalsIgnoreCase(rs.getString("is_nullable")); + boolean isArray = udtName != null && udtName.startsWith("_"); + String baseType = isArray ? udtName.substring(1) : udtName; + boolean isPrimaryKey = primaryKeyColumns.contains(columnName); + metadataMap.put( + columnName, + new PostgresColumnMetadata( + columnName, + mapToCanonicalType(baseType), + mapToPostgresType(baseType), + isNullable, + isArray, + isPrimaryKey)); + } } } return metadataMap; @@ -55,6 +68,20 @@ public Map fetch(String tableName) { } } + private Set fetchPrimaryKeyColumns(Connection conn, String tableName) + throws SQLException { + Set pkColumns = new HashSet<>(); + try (PreparedStatement ps = conn.prepareStatement(PRIMARY_KEY_SQL)) { + ps.setString(1, PostgresUtils.wrapFieldNamesWithDoubleQuotes(tableName)); + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + pkColumns.add(rs.getString("column_name")); + } + } + } + return pkColumns; + } + /** Maps Postgres udt_name to canonical DataType. */ private DataType mapToCanonicalType(String udtName) { if (udtName == null) { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java index ecc6ca6a..441d3b14 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/model/PostgresColumnMetadata.java @@ -16,6 +16,7 @@ public class PostgresColumnMetadata implements ColumnMetadata { @Getter private final PostgresDataType postgresType; private final boolean nullable; private final boolean isArray; + private final boolean isPrimaryKey; @Override public String getName() { @@ -36,4 +37,9 @@ public boolean isNullable() { public boolean isArray() { return isArray; } + + @Override + public boolean isPrimaryKey() { + return isPrimaryKey; + } } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java index 7c373eb8..b714e2ac 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcherTest.java @@ -5,7 +5,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -30,35 +29,52 @@ class PostgresMetadataFetcherTest { @Mock private PostgresClient client; @Mock private Connection connection; - @Mock private PreparedStatement preparedStatement; - @Mock private ResultSet resultSet; + @Mock private PreparedStatement columnsPreparedStatement; + @Mock private PreparedStatement pkPreparedStatement; + @Mock private ResultSet columnsResultSet; + @Mock private ResultSet pkResultSet; private PostgresMetadataFetcher fetcher; + private static final String DISCOVERY_SQL = + "SELECT column_name, udt_name, is_nullable " + + "FROM information_schema.columns " + + "WHERE table_schema = 'public' AND table_name = ?"; + + private static final String PRIMARY_KEY_SQL = + "SELECT a.attname as column_name " + + "FROM pg_index i " + + "JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) " + + "WHERE i.indrelid = ?::regclass AND i.indisprimary"; + @BeforeEach void setUp() throws SQLException { when(client.getPooledConnection()).thenReturn(connection); - when(connection.prepareStatement(anyString())).thenReturn(preparedStatement); - when(preparedStatement.executeQuery()).thenReturn(resultSet); + when(connection.prepareStatement(DISCOVERY_SQL)).thenReturn(columnsPreparedStatement); + when(connection.prepareStatement(PRIMARY_KEY_SQL)).thenReturn(pkPreparedStatement); + when(columnsPreparedStatement.executeQuery()).thenReturn(columnsResultSet); + when(pkPreparedStatement.executeQuery()).thenReturn(pkResultSet); + // Default: no primary keys + when(pkResultSet.next()).thenReturn(false); fetcher = new PostgresMetadataFetcher(client); } @Test void fetchReturnsEmptyMapForTableWithNoColumns() throws SQLException { - when(resultSet.next()).thenReturn(false); + when(columnsResultSet.next()).thenReturn(false); Map result = fetcher.fetch(TEST_TABLE); assertTrue(result.isEmpty()); - verify(preparedStatement).setString(1, TEST_TABLE); + verify(columnsPreparedStatement).setString(1, TEST_TABLE); } @Test void fetchReturnsSingleColumn() throws SQLException { - when(resultSet.next()).thenReturn(true, false); - when(resultSet.getString("column_name")).thenReturn("id"); - when(resultSet.getString("udt_name")).thenReturn("int4"); - when(resultSet.getString("is_nullable")).thenReturn("NO"); + when(columnsResultSet.next()).thenReturn(true, false); + when(columnsResultSet.getString("column_name")).thenReturn("id"); + when(columnsResultSet.getString("udt_name")).thenReturn("int4"); + when(columnsResultSet.getString("is_nullable")).thenReturn("NO"); Map result = fetcher.fetch(TEST_TABLE); @@ -73,10 +89,10 @@ void fetchReturnsSingleColumn() throws SQLException { @Test void fetchReturnsMultipleColumns() throws SQLException { - when(resultSet.next()).thenReturn(true, true, true, false); - when(resultSet.getString("column_name")).thenReturn("id", "name", "price"); - when(resultSet.getString("udt_name")).thenReturn("int8", "text", "float8"); - when(resultSet.getString("is_nullable")).thenReturn("NO", "YES", "YES"); + when(columnsResultSet.next()).thenReturn(true, true, true, false); + when(columnsResultSet.getString("column_name")).thenReturn("id", "name", "price"); + when(columnsResultSet.getString("udt_name")).thenReturn("int8", "text", "float8"); + when(columnsResultSet.getString("is_nullable")).thenReturn("NO", "YES", "YES"); Map result = fetcher.fetch(TEST_TABLE); @@ -291,7 +307,7 @@ void fetchHandlesCaseInsensitiveUdtName() throws SQLException { @Test void fetchThrowsRuntimeExceptionOnSqlException() throws SQLException { - when(preparedStatement.executeQuery()).thenThrow(new SQLException("Connection failed")); + when(columnsPreparedStatement.executeQuery()).thenThrow(new SQLException("Connection failed")); RuntimeException exception = assertThrows(RuntimeException.class, () -> fetcher.fetch(TEST_TABLE)); @@ -366,9 +382,9 @@ void fetchReturnsIsArrayFalseForNonArrayTypes() throws SQLException { private void setupSingleColumnResult(String colName, String udtName, String isNullable) throws SQLException { - when(resultSet.next()).thenReturn(true, false); - when(resultSet.getString("column_name")).thenReturn(colName); - when(resultSet.getString("udt_name")).thenReturn(udtName); - when(resultSet.getString("is_nullable")).thenReturn(isNullable); + when(columnsResultSet.next()).thenReturn(true, false); + when(columnsResultSet.getString("column_name")).thenReturn(colName); + when(columnsResultSet.getString("udt_name")).thenReturn(udtName); + when(columnsResultSet.getString("is_nullable")).thenReturn(isNullable); } }