diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Mutation.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Mutation.java index c5a09bc3ee..d4a5f576e0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Mutation.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Mutation.java @@ -21,7 +21,9 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.ListValue; +import com.google.protobuf.Timestamp; import java.io.Serializable; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -87,6 +89,16 @@ public enum Op { /** Deletes rows from a table. Succeeds whether or not the named rows were present. */ DELETE, + + /** + * Send a message to a queue, optionally with specified delivery time. + */ + SEND, + + /** + * Acknowledge a message in a queue. Ack only succeeds if the message still exists. + */ + ACK, } private final String table; @@ -94,6 +106,12 @@ public enum Op { private final ImmutableList columns; private final ImmutableList values; private final KeySet keySet; + // Queue related fields + private final String queue; + private final Key key; + private final Value payload; + private final Instant deliveryTime; + private final boolean ignoreNotFound; private Mutation( String table, @@ -101,11 +119,30 @@ private Mutation( @Nullable ImmutableList columns, @Nullable ImmutableList values, @Nullable KeySet keySet) { + this(table, operation, columns, values, keySet, null, null, null, null, null); + } + + private Mutation( + String table, + Op operation, + @Nullable ImmutableList columns, + @Nullable ImmutableList values, + @Nullable KeySet keySet, + @Nullable String queue, + @Nullable Key key, + @Nullable Value payload, + @Nullable Instant deliveryTime, + @Nullable Boolean ignoreNotFound) { this.table = table; this.operation = operation; this.columns = columns; this.values = values; this.keySet = keySet; + this.queue = queue; + this.key = key; + this.payload = payload; + this.deliveryTime = deliveryTime; + this.ignoreNotFound = ignoreNotFound != null && ignoreNotFound; } /** @@ -153,6 +190,22 @@ public static Mutation delete(String table, KeySet keySet) { return new Mutation(table, Op.DELETE, null, null, checkNotNull(keySet)); } + /** + * Returns a builder that can be used to construct an {@link Op#SEND} mutation against {@code queue}; see the + * {@code SEND} documentation for mutation semantics. + */ + public static SendBuilder newSendBuilder(String queue) { + return new SendBuilder(queue); + } + + /** + * Returns a builder that can be used to construct an {@link Op#ACK} mutation against {@code queue}; see the + * {@code ACK} documentation for mutation semantics. + */ + public static AckBuilder newAckBuilder(String queue) { + return new AckBuilder(queue); + } + /** * Builder for {@link Op#INSERT}, {@link Op#INSERT_OR_UPDATE}, {@link Op#UPDATE}, and {@link * Op#REPLACE} mutations. @@ -227,6 +280,69 @@ private void checkDuplicateColumns(ImmutableList columnNames) { } } + /** + * Builder for {@link Op#SEND} mutation. + */ + public static class SendBuilder { + private final String queue; + private Key key; + private Value payload; + private Instant deliveryTime; + + private SendBuilder(String queue) { + this.queue = checkNotNull(queue); + } + + public SendBuilder setKey(Key key) { + this.key = checkNotNull(key); + return this; + } + + public SendBuilder setPayload(Value payload) { + this.payload = checkNotNull(payload); + return this; + } + + public SendBuilder setDeliveryTime(Instant deliveryTime) { + this.deliveryTime = deliveryTime; + return this; + } + + public Mutation build() { + checkState(key != null, "Key must be set for Send mutation"); + checkState(payload != null, "Payload must be set for Send mutation"); + return new Mutation(null, Op.SEND, null, null, null, queue, key, payload, deliveryTime, null); + } + } + + /** + * Builder for {@link Op#ACK} mutation. + */ + public static class AckBuilder { + private final String queue; + private Key key; + private boolean ignoreNotFound = false; + + private AckBuilder(String queue) { + this.queue = checkNotNull(queue); + } + + public AckBuilder setKey(Key key) { + this.key = checkNotNull(key); + return this; + } + + public AckBuilder setIgnoreNotFound(boolean ignoreNotFound) { + this.ignoreNotFound = ignoreNotFound; + return this; + } + + public Mutation build() { + checkState(key != null, "Key must be set for Ack mutation"); + return new Mutation(null, Op.ACK, null, null, null, queue, key, null, null, ignoreNotFound); + } + } + /** Returns the name of the table that this mutation will affect. */ public String getTable() { return table; @@ -248,27 +364,63 @@ public Iterable getColumns() { } /** - * For all types except {@link Op#DELETE}, returns the values that this mutation will write. The - * number of elements returned is always the same as the number returned by {@link #getColumns()}, + * For all types except {@link Op#DELETE}, {@link Op#SEND}, and {@link Op#ACK}, returns the values that this mutation + * will write. The number of elements returned is always the same as the number returned by {@link #getColumns()}, * and the {@code i}th value corresponds to the {@code i}th column. * - * @throws IllegalStateException if {@code operation() == Op.DELETE} + * @throws IllegalStateException + * if {@code operation() == Op.DELETE or operation() == Op.SEND or operation() == Op.ACK} */ public Iterable getValues() { - checkState(operation != Op.DELETE, "values() cannot be called for a DELETE mutation"); + checkState(operation != Op.DELETE && operation != Op.SEND && operation != Op.ACK, + "values() cannot be called for a DELETE/SEND/ACK mutation"); return values; } + /** Returns the name of the queue that this mutation will affect. */ + public String getQueue() { + checkState(operation == Op.SEND || operation == Op.ACK, "getQueue() can only be called " + + "for SEND or ACK mutations"); + return queue; + } + + /** Returns the key of the message to the queue that this mutation will affect. */ + public Key getKey() { + checkState(operation == Op.SEND || operation == Op.ACK, "getKey() can only be called for " + + "SEND or ACK mutations"); + return key; + } + + /** Returns the payload of the message to the queue that this mutation will affect. */ + public Value getPayload() { + checkState(operation == Op.SEND, "getPayload() can only be called for a SEND mutation"); + return payload; + } + + /** Returns the delivery timestamp of the message to the queue that this mutation will affect. */ + @Nullable + public Instant getDeliveryTime() { + checkState(operation == Op.SEND, "getDeliverTime() can only be called for a SEND mutation"); + return deliveryTime; + } + + /** Returns whether an error will be ignored for an ACK mutation that affects a message that does not exist */ + public boolean getIgnoreNotFound() { + checkState(operation == Op.ACK, "getIgnoreNotFound() can only be called for an ACK mutation"); + return ignoreNotFound; + } + /** - * For all types except {@link Op#DELETE}, constructs a map from column name to value. This is - * mainly intended as a convenience for testing; direct access via {@link #getColumns()} and + * For all types except {@link Op#DELETE}, {@link Op#SEND}, and {@link Op#ACK}, constructs a map from column name to + * value. This is mainly intended as a convenience for testing; direct access via {@link #getColumns()} and * {@link #getValues()} is more efficient. * - * @throws IllegalStateException if {@code operation() == Op.DELETE}, or if any duplicate columns - * are present. Detection of duplicates does not consider case. + * @throws IllegalStateException if {@code operation() == Op.DELETE or operation() == Op.SEND or operation() == + * Op.ACK}, or if any duplicate columns are present. Detection of duplicates does not consider case. */ public Map asMap() { - checkState(operation != Op.DELETE, "asMap() cannot be called for a DELETE mutation"); + checkState(operation != Op.DELETE && operation != Op.SEND && operation != Op.ACK, + "asMap() cannot be called for a DELETE/SEND/ACK mutation"); LinkedHashMap map = new LinkedHashMap<>(); for (int i = 0; i < columns.size(); ++i) { Value existing = map.put(columns.get(i), values.get(i)); @@ -310,6 +462,25 @@ void toString(StringBuilder b) { opName = "delete"; isWrite = false; break; + case SEND: + // return directly for SEND + b.append("send(").append(queue).append('{'); + b.append("key=").append(key); + b.append(", payload=").append(payload); + if (deliveryTime != null) { + b.append(", deliveryTime=").append(deliveryTime); + } + b.append("})"); + return; + case ACK: + // return directly for ACK + b.append("ack(").append(queue).append('{'); + b.append("key=").append(key); + if (ignoreNotFound) { + b.append(", ignoreNotFound=true"); + } + b.append("})"); + return; default: throw new AssertionError("Unhandled Op: " + operation); } @@ -348,8 +519,24 @@ public boolean equals(Object o) { } Mutation that = (Mutation) o; - return operation == that.operation - && Objects.equals(table, that.table) + if (operation != that.operation) { + return false; + } + + if (operation == Op.SEND) { + return Objects.equals(queue, that.queue) + && Objects.equals(key, that.key) + && Objects.equals(payload, that.payload) + && Objects.equals(deliveryTime, that.deliveryTime); + } + + if (operation == Op.ACK) { + return Objects.equals(queue, that.queue) + && Objects.equals(key, that.key) + && Objects.equals(ignoreNotFound, that.ignoreNotFound); + } + + return Objects.equals(table, that.table) && Objects.equals(columns, that.columns) && areValuesEqual(values, that.values) && Objects.equals(keySet, that.keySet); @@ -357,7 +544,7 @@ && areValuesEqual(values, that.values) @Override public int hashCode() { - return Objects.hash(operation, table, columns, values, keySet); + return Objects.hash(operation, table, columns, values, keySet, key, payload, deliveryTime, ignoreNotFound); } /** @@ -435,16 +622,7 @@ static com.google.spanner.v1.Mutation toProtoAndReturnRandomMutation( if (last != null && last.operation == Op.DELETE && mutation.table.equals(last.table)) { mutation.keySet.appendToProto(keySet); } else { - if (proto != null) { - com.google.spanner.v1.Mutation builtMutation = proto.build(); - out.add(builtMutation); - // Skip tracking the largest insert mutation if there are mutations other than INSERT. - if (allMutationsExcludingInsert.isEmpty() - && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) { - largestInsertMutation = builtMutation; - } - maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert); - } + largestInsertMutation = flushMutation(out, proto, allMutationsExcludingInsert, largestInsertMutation); proto = com.google.spanner.v1.Mutation.newBuilder(); com.google.spanner.v1.Mutation.Delete.Builder delete = proto.getDeleteBuilder().setTable(mutation.table); @@ -452,6 +630,27 @@ && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) { mutation.keySet.appendToProto(keySet); } write = null; + } else if (mutation.operation == Op.SEND) { + largestInsertMutation = flushMutation(out, proto, allMutationsExcludingInsert, largestInsertMutation); + proto = com.google.spanner.v1.Mutation.newBuilder(); + com.google.spanner.v1.Mutation.Send.Builder send = proto.getSendBuilder() + .setQueue(mutation.queue) + .setKey(mutation.key.toProto()) + .setPayload(mutation.payload.toProto()); + if (mutation.getDeliveryTime() != null) { + Instant deliveryTime = mutation.getDeliveryTime(); + Timestamp.Builder timeBuilder = send.getDeliverTimeBuilder() + .setSeconds(deliveryTime.getEpochSecond()) + .setNanos(deliveryTime.getNano()); + send.setDeliverTime(timeBuilder); + } + } else if (mutation.operation == Op.ACK) { + largestInsertMutation = flushMutation(out, proto, allMutationsExcludingInsert, largestInsertMutation); + proto = com.google.spanner.v1.Mutation.newBuilder(); + proto.getAckBuilder() + .setQueue(mutation.queue) + .setKey(mutation.getKey().toProto()) + .setIgnoreNotFound(mutation.ignoreNotFound); } else { ListValue.Builder values = ListValue.newBuilder(); for (Value value : mutation.getValues()) { @@ -464,16 +663,7 @@ && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) { // Same as previous mutation: coalesce values to reduce request size. write.addValues(values); } else { - if (proto != null) { - com.google.spanner.v1.Mutation builtMutation = proto.build(); - out.add(builtMutation); - // Skip tracking the largest insert mutation if there are mutations other than INSERT. - if (allMutationsExcludingInsert.isEmpty() - && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) { - largestInsertMutation = builtMutation; - } - maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert); - } + largestInsertMutation = flushMutation(out, proto, allMutationsExcludingInsert, largestInsertMutation); proto = com.google.spanner.v1.Mutation.newBuilder(); switch (mutation.operation) { case INSERT: @@ -498,9 +688,24 @@ && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) { last = mutation; } // Flush last item. + largestInsertMutation = flushMutation(out, proto, allMutationsExcludingInsert, largestInsertMutation); + + // Select a random mutation based on the heuristic. + if (!allMutationsExcludingInsert.isEmpty()) { + return allMutationsExcludingInsert.get( + ThreadLocalRandom.current().nextInt(allMutationsExcludingInsert.size())); + } else { + return largestInsertMutation; + } + } + + private static com.google.spanner.v1.Mutation flushMutation(List out, + com.google.spanner.v1.Mutation.Builder proto, + List allMutationsExcludingInsert, + com.google.spanner.v1.Mutation largestInsertMutation) { if (proto != null) { com.google.spanner.v1.Mutation builtMutation = proto.build(); - out.add(proto.build()); + out.add(builtMutation); // Skip tracking the largest insert mutation if there are mutations other than INSERT. if (allMutationsExcludingInsert.isEmpty() && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) { @@ -508,14 +713,7 @@ && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) { } maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert); } - - // Select a random mutation based on the heuristic. - if (!allMutationsExcludingInsert.isEmpty()) { - return allMutationsExcludingInsert.get( - ThreadLocalRandom.current().nextInt(allMutationsExcludingInsert.size())); - } else { - return largestInsertMutation; - } + return largestInsertMutation; } // Returns true if the input mutation is of type INSERT and has more values than the current diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MutationTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MutationTest.java index a8ddfe706a..4cbd57a846 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MutationTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MutationTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.testing.EqualsTester; import java.math.BigDecimal; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -185,6 +186,59 @@ public void delete() { assertThat(m.toString()).isEqualTo("delete(T1{[k1]})"); } + @Test + public void send() { + Key key = Key.of(123); + Value payload = Value.bytes(ByteArray.copyFrom("payload")); + Instant deliverAt = Instant.now().plusSeconds(3600); + Mutation m = Mutation.newSendBuilder("TestQueue") + .setKey(key) + .setPayload(payload) + .setDeliveryTime(deliverAt) + .build(); + assertThat(m.getOperation()).isEqualTo(Mutation.Op.SEND); + assertThat(m.getQueue()).isEqualTo("TestQueue"); + assertThat(m.getKey()).isEqualTo(key); + assertThat(m.getPayload()).isEqualTo(payload); + assertThat(m.getDeliveryTime()).isEqualTo(deliverAt); + assertThat(m.toString()) + .isEqualTo("send(TestQueue{key=[123], payload=" + payload + ", deliveryTime=" + deliverAt + "})"); + } + + @Test + public void sendMissingKey() { + IllegalStateException e = + assertThrows(IllegalStateException.class, + () -> Mutation.newSendBuilder("TestQueue").setPayload(Value.string("payload")).build()); + assertThat(e.getMessage()).contains("Key must be set"); + } + + @Test + public void sendMissingPayload() { + IllegalStateException e = + assertThrows(IllegalStateException.class, + () -> Mutation.newSendBuilder("TestQueue").setKey(Key.of("k1")).build()); + assertThat(e.getMessage()).contains("Payload must be set"); + } + + @Test + public void ackIgnoreNotFound() { + Key key = Key.of("k1"); + Mutation m = Mutation.newAckBuilder("TestQueue").setKey(key).setIgnoreNotFound(true).build(); + assertThat(m.getOperation()).isEqualTo(Mutation.Op.ACK); + assertThat(m.getQueue()).isEqualTo("TestQueue"); + assertThat(m.getKey()).isEqualTo(key); + assertTrue(m.getIgnoreNotFound()); + assertThat(m.toString()).isEqualTo("ack(TestQueue{key=[k1], ignoreNotFound=true})"); + } + + @Test + public void ackMissingKey() { + IllegalStateException e = + assertThrows(IllegalStateException.class, () -> Mutation.newAckBuilder("TestQueue").build()); + assertThat(e.getMessage()).contains("Key must be set"); + } + @Test public void equalsAndHashCode() { EqualsTester tester = new EqualsTester(); @@ -305,15 +359,66 @@ public void equalsAndHashCode() { tester.testEquals(); } + @Test + public void equalsAndHashCode_sendAndAck() { + EqualsTester tester = new EqualsTester(); + + Key key1 = Key.of("k1"); + Key key2 = Key.of("k2"); + Value payload1 = Value.string("p1"); + Value payload2 = Value.string("p2"); + Instant time1 = Instant.now(); + Instant time2 = time1.plusSeconds(10); + + // SEND + tester.addEqualityGroup( + Mutation.newSendBuilder("TestQueue").setKey(key1).setPayload(payload1).build(), + Mutation.newSendBuilder("TestQueue").setKey(key1).setPayload(payload1).build()); + // Different key + tester.addEqualityGroup(Mutation.newSendBuilder("TestQueue").setKey(key2).setPayload(payload1).build()); + // Different payload + tester.addEqualityGroup(Mutation.newSendBuilder("TestQueue").setKey(key1).setPayload(payload2).build()); + // Different queue + tester.addEqualityGroup(Mutation.newSendBuilder("TestQueue2").setKey(key1).setPayload(payload1).build()); + // Different time + tester.addEqualityGroup( + Mutation.newSendBuilder("TestQueue").setKey(key1).setPayload(payload1).setDeliveryTime(time1).build(), + Mutation.newSendBuilder("TestQueue").setKey(key1).setPayload(payload1).setDeliveryTime(time1).build()); + tester.addEqualityGroup( + Mutation.newSendBuilder("TestQueue").setKey(key1).setPayload(payload1).setDeliveryTime(time2).build()); + + // ACK + tester.addEqualityGroup( + Mutation.newAckBuilder("TestQueue").setKey(key1).build(), + Mutation.newAckBuilder("TestQueue").setKey(key1).build()); + // Different key + tester.addEqualityGroup(Mutation.newAckBuilder("TestQueue").setKey(key2).build()); + // Different queue + tester.addEqualityGroup(Mutation.newAckBuilder("TestQueue2").setKey(key1).build()); + // Different ignoreNotFound + tester.addEqualityGroup( + Mutation.newAckBuilder("TestQueue").setKey(key1).setIgnoreNotFound(true).build(), + Mutation.newAckBuilder("TestQueue").setKey(key1).setIgnoreNotFound(true).build()); + + // Distinct Op types + tester.addEqualityGroup(Mutation.newInsertBuilder("TestQueue").build()); + + tester.testEquals(); + } + @Test public void serializationBasic() { + Instant time = Instant.now(); List mutations = Arrays.asList( Mutation.newInsertBuilder("T").set("C").to("V").build(), Mutation.newUpdateBuilder("T").set("C").to("V").build(), Mutation.newInsertOrUpdateBuilder("T").set("C").to("V").build(), Mutation.newReplaceBuilder("T").set("C").to("V").build(), - Mutation.delete("T", KeySet.singleKey(Key.of("k")))); + Mutation.delete("T", KeySet.singleKey(Key.of("k"))), + Mutation.newSendBuilder("Q").setKey(Key.of("k")).setPayload(Value.string("p")) + .setDeliveryTime(time).build(), + Mutation.newAckBuilder("Q").setKey(Key.of("k")).setIgnoreNotFound(true).build()); List proto = new ArrayList<>(); @@ -328,7 +433,7 @@ public void serializationBasic() { assertThat(proto.get(0)).isSameInstanceAs(existingProto); proto.remove(0); - assertThat(proto.size()).isEqualTo(5); + assertThat(proto.size()).isEqualTo(7); MatcherAssert.assertThat( proto.get(0), matchesProto("insert { table: 'T' columns: 'C' values { values { string_value: 'V' } } }")); @@ -347,6 +452,13 @@ public void serializationBasic() { MatcherAssert.assertThat( proto.get(4), matchesProto("delete { table: 'T' key_set { keys { values { string_value: 'k' } } } }")); + MatcherAssert.assertThat( + proto.get(5), + matchesProto("send { queue: 'Q' key { values { string_value: 'k' } } deliver_time { seconds: " + + time.getEpochSecond() + " nanos: " + time.getNano() + " } payload { string_value: 'p' } }")); + MatcherAssert.assertThat( + proto.get(6), + matchesProto("ack { queue: 'Q' key { values { string_value: 'k' } } ignore_not_found: true }")); } @Test diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITQueueTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITQueueTest.java new file mode 100644 index 0000000000..03c27e4a5d --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITQueueTest.java @@ -0,0 +1,176 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.it; + +import com.google.cloud.ByteArray; +import com.google.cloud.spanner.*; +import com.google.cloud.spanner.connection.ConnectionOptions; +import org.junit.*; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.junit.Assume.assumeTrue; + +/** Integration test for Cloud Spanner Queue. */ +@Category(ParallelIntegrationTest.class) +@RunWith(Parameterized.class) +public class ITQueueTest { + @ClassRule + public static IntegrationTestEnv env = new IntegrationTestEnv(); + + @Parameterized.Parameters(name = "Dialect = {0}") + public static List data() { + List params = new ArrayList<>(); + params.add(new DialectTestParameter(Dialect.GOOGLE_STANDARD_SQL)); + return params; + } + + @Parameterized.Parameter() public DialectTestParameter dialect; + + + private static DatabaseClient googleStandardSQLClient; + + private static final String[] GOOGLE_STANDARD_SQL_SCHEMA = + new String[] { + "CREATE Queue Q1 (" + + " Id INT64 NOT NULL," + + " Payload BYTES(MAX) NOT NULL," + + ") PRIMARY KEY (Id), " + + "OPTIONS (receive_mode = 'PULL')", + "CREATE TABLE T1 (" + + " K1 INT64 NOT NULL," + + " K INT64 NOT NULL," + + ") PRIMARY KEY (K1)" + }; + + private static DatabaseClient client; + + private static boolean isUsingCloudDevel() { + String jobType = System.getenv("JOB_TYPE"); + + // Assumes that the jobType contains the string "cloud-devel" to signal that + // the environment is cloud-devel. + return !isNullOrEmpty(jobType) && jobType.contains("cloud-devel"); + } + + private Struct readRow(String queue, Key key, String... columns) { + return client + .singleUse(TimestampBound.strong()) + .readRow(queue, key, Arrays.asList(columns)); + } + + @BeforeClass + public static void setUpTestSuite() { + // TODO: remove once the feature is fully enabled in prod + assumeTrue("Queue is currently only supported in cloud-devel", isUsingCloudDevel()); + Database googleStandardSQLDatabase = + env.getTestHelper().createTestDatabase(GOOGLE_STANDARD_SQL_SCHEMA); + googleStandardSQLClient = env.getTestHelper().getDatabaseClient(googleStandardSQLDatabase); + System.out.println("Database created"); + } + + @Before + public void setUp() { + // TODO: add postgres schema & client after feature is enabled + client = googleStandardSQLClient; + } + + @AfterClass + public static void teardown() { + ConnectionOptions.closeSpanner(); + } + + @Test + public void testSendAndAckMutation() { + client.write(Arrays.asList( + Mutation.newSendBuilder("Q1") + .setKey(Key.of(1)) + .setPayload(Value.bytes(ByteArray.copyFrom("payload1"))) + .build(), + Mutation.newSendBuilder("Q1") + .setKey(Key.of(2)) + .setPayload(Value.bytes(ByteArray.copyFrom("payload2"))) + .build(), + Mutation.newSendBuilder("Q1") + .setKey(Key.of(3)) + .setPayload(Value.bytes(ByteArray.copyFrom("payload3"))) + .setDeliveryTime(Instant.now()) + .build())); + + // Verifying messages are in the queue. + Struct row = readRow("Q1", Key.of(1), "Payload"); + assertThat(row == null).isFalse(); + assertThat(row.isNull(0)).isFalse(); + assertThat(row.getBytes(0)).isEqualTo(ByteArray.copyFrom("payload1")); + + row = readRow("Q1", Key.of(2), "Payload"); + assertThat(row.isNull(0)).isFalse(); + assertThat(row.getBytes(0)).isEqualTo(ByteArray.copyFrom("payload2")); + + row = readRow("Q1", Key.of(3), "Payload"); + assertThat(row.isNull(0)).isFalse(); + assertThat(row.getBytes(0)).isEqualTo(ByteArray.copyFrom("payload3")); + + // Ack-ing the first two messages. + client.write(Arrays.asList( + Mutation.newAckBuilder("Q1") + .setKey(Key.of(1)) + .build(), + Mutation.newAckBuilder("Q1") + .setKey(Key.of(2)) + .build())); + + // Verifying the first 2 messages are acked and remvoed from the queue + row = readRow("Q1", Key.of(1), "Payload"); + assertThat(row == null).isTrue(); + row = readRow("Q1", Key.of(2), "Payload"); + assertThat(row == null).isTrue(); + row = readRow("Q1", Key.of(3), "Payload"); + assertThat(row.isNull(0)).isFalse(); + assertThat(row.getBytes(0)).isEqualTo(ByteArray.copyFrom("payload3")); + } + + @Test + public void testAckNotFound() { + // Enable IgnoreNotFound. + client.write(Collections.singletonList( + Mutation.newAckBuilder("Q1") + .setKey(Key.of(1)) + .setIgnoreNotFound(true) + .build())); + Struct row = readRow("Q1", Key.of(1), "Payload"); + assertThat(row == null).isTrue(); + + // Disable IgnoreNotFound. + SpannerException thrown = assertThrows(SpannerException.class, () -> client.write(Collections.singletonList( + Mutation.newAckBuilder("Q1") + .setKey(Key.of(1)) + .setIgnoreNotFound(false) + .build()))); + assertThat(thrown).hasMessageThat().contains("NOT_FOUND: Message not found"); + } +}