From cff098dd2e28088f3b14d06c53a3e6572eaa3c1c Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Fri, 6 Dec 2024 15:22:40 -0800 Subject: [PATCH 1/4] Update Java SDK --- build.gradle | 2 +- .../SimpleCountWorkerInterceptor.java | 8 ++++++++ .../samples/earlyreturn/EarlyReturnClient.java | 14 +++++--------- .../interceptor/MyWorkerInterceptor.java | 8 ++++++++ .../RetryOnSignalWorkerInterceptor.java | 8 ++++++++ 5 files changed, 30 insertions(+), 10 deletions(-) diff --git a/build.gradle b/build.gradle index 43c50ce5b..92d13ca62 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,7 @@ subprojects { ext { otelVersion = '1.30.1' otelVersionAlpha = "${otelVersion}-alpha" - javaSDKVersion = '1.26.1' + javaSDKVersion = '1.27.0' camelVersion = '3.22.1' jarVersion = '1.0.0' } diff --git a/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java index 268d2620a..3b7e70a4f 100644 --- a/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java +++ b/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java @@ -19,7 +19,9 @@ package io.temporal.samples.countinterceptor; +import io.nexusrpc.handler.OperationContext; import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; import io.temporal.common.interceptors.WorkerInterceptor; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; @@ -34,4 +36,10 @@ public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInt public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { return new SimpleCountActivityInboundCallsInterceptor(next); } + + @Override + public NexusOperationInboundCallsInterceptor interceptNexusOperation( + OperationContext context, NexusOperationInboundCallsInterceptor next) { + return next; + } } diff --git a/core/src/main/java/io/temporal/samples/earlyreturn/EarlyReturnClient.java b/core/src/main/java/io/temporal/samples/earlyreturn/EarlyReturnClient.java index 6ccb11da1..c9456874b 100644 --- a/core/src/main/java/io/temporal/samples/earlyreturn/EarlyReturnClient.java +++ b/core/src/main/java/io/temporal/samples/earlyreturn/EarlyReturnClient.java @@ -49,17 +49,13 @@ private static void runWorkflowWithUpdateWithStart(WorkflowClient client) { System.out.println("Starting workflow with UpdateWithStart"); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::returnInitResult) - .setWaitForStage(WorkflowUpdateStage.COMPLETED) // Wait for update to complete - .build(); - TxResult updateResult = null; try { - WorkflowUpdateHandle updateHandle = - WorkflowClient.updateWithStart(workflow::processTransaction, txRequest, updateOp); - - updateResult = updateHandle.getResultAsync().get(); + updateResult = + WorkflowClient.executeUpdateWithStart( + workflow::returnInitResult, + UpdateOptions.newBuilder().build(), + new WithStartWorkflowOperation<>(workflow::processTransaction, txRequest)); System.out.println( "Workflow initialized with result: " diff --git a/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java index 7674013dc..797765998 100644 --- a/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java +++ b/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java @@ -19,7 +19,9 @@ package io.temporal.samples.excludefrominterceptor.interceptor; +import io.nexusrpc.handler.OperationContext; import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; import io.temporal.common.interceptors.WorkerInterceptor; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import java.util.ArrayList; @@ -49,4 +51,10 @@ public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInt public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { return new MyActivityInboundCallsInterceptor(excludeActivityTypes, next); } + + @Override + public NexusOperationInboundCallsInterceptor interceptNexusOperation( + OperationContext context, NexusOperationInboundCallsInterceptor next) { + return next; + } } diff --git a/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java index 6e4ba7db0..993f48957 100644 --- a/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java +++ b/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java @@ -19,7 +19,9 @@ package io.temporal.samples.retryonsignalinterceptor; +import io.nexusrpc.handler.OperationContext; import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; import io.temporal.common.interceptors.WorkerInterceptor; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; @@ -34,4 +36,10 @@ public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInt public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { return next; } + + @Override + public NexusOperationInboundCallsInterceptor interceptNexusOperation( + OperationContext context, NexusOperationInboundCallsInterceptor next) { + return next; + } } From 8ba9017408f0f0ecb3db82bbefd922be0fd3e1bc Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Fri, 6 Dec 2024 17:16:23 -0800 Subject: [PATCH 2/4] use WorkerInterceptorBase --- .../SimpleCountWorkerInterceptor.java | 14 +++----------- .../interceptor/MyWorkerInterceptor.java | 14 +++----------- .../RetryOnSignalWorkerInterceptor.java | 13 ++----------- 3 files changed, 8 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java index 3b7e70a4f..c4713256c 100644 --- a/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java +++ b/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java @@ -20,12 +20,10 @@ package io.temporal.samples.countinterceptor; import io.nexusrpc.handler.OperationContext; -import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; -import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; -import io.temporal.common.interceptors.WorkerInterceptor; -import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; +import io.temporal.common.interceptors.*; +import io.temporal.internal.sync.BaseRootWorkflowInboundCallsInterceptor; -public class SimpleCountWorkerInterceptor implements WorkerInterceptor { +public class SimpleCountWorkerInterceptor extends WorkerInterceptorBase { @Override public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) { @@ -36,10 +34,4 @@ public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInt public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { return new SimpleCountActivityInboundCallsInterceptor(next); } - - @Override - public NexusOperationInboundCallsInterceptor interceptNexusOperation( - OperationContext context, NexusOperationInboundCallsInterceptor next) { - return next; - } } diff --git a/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java index 797765998..2b3f7c6c0 100644 --- a/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java +++ b/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java @@ -20,14 +20,12 @@ package io.temporal.samples.excludefrominterceptor.interceptor; import io.nexusrpc.handler.OperationContext; -import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; -import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; -import io.temporal.common.interceptors.WorkerInterceptor; -import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; +import io.temporal.common.interceptors.*; + import java.util.ArrayList; import java.util.List; -public class MyWorkerInterceptor implements WorkerInterceptor { +public class MyWorkerInterceptor extends WorkerInterceptorBase { private List excludeWorkflowTypes = new ArrayList<>(); private List excludeActivityTypes = new ArrayList<>(); @@ -51,10 +49,4 @@ public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInt public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { return new MyActivityInboundCallsInterceptor(excludeActivityTypes, next); } - - @Override - public NexusOperationInboundCallsInterceptor interceptNexusOperation( - OperationContext context, NexusOperationInboundCallsInterceptor next) { - return next; - } } diff --git a/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java index 993f48957..b0d48a426 100644 --- a/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java +++ b/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java @@ -20,13 +20,10 @@ package io.temporal.samples.retryonsignalinterceptor; import io.nexusrpc.handler.OperationContext; -import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; -import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; -import io.temporal.common.interceptors.WorkerInterceptor; -import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; +import io.temporal.common.interceptors.*; /** Should be registered through WorkerFactoryOptions. */ -public class RetryOnSignalWorkerInterceptor implements WorkerInterceptor { +public class RetryOnSignalWorkerInterceptor extends WorkerInterceptorBase { @Override public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) { return new RetryOnSignalWorkflowInboundCallsInterceptor(next); @@ -36,10 +33,4 @@ public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInt public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { return next; } - - @Override - public NexusOperationInboundCallsInterceptor interceptNexusOperation( - OperationContext context, NexusOperationInboundCallsInterceptor next) { - return next; - } } From e103d92f80a92c5c3b49b1e7ef30607df85cc1fc Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 9 Dec 2024 09:07:02 -0800 Subject: [PATCH 3/4] format code --- .../samples/countinterceptor/SimpleCountWorkerInterceptor.java | 2 -- .../excludefrominterceptor/interceptor/MyWorkerInterceptor.java | 2 -- .../RetryOnSignalWorkerInterceptor.java | 1 - 3 files changed, 5 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java index c4713256c..156002e42 100644 --- a/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java +++ b/core/src/main/java/io/temporal/samples/countinterceptor/SimpleCountWorkerInterceptor.java @@ -19,9 +19,7 @@ package io.temporal.samples.countinterceptor; -import io.nexusrpc.handler.OperationContext; import io.temporal.common.interceptors.*; -import io.temporal.internal.sync.BaseRootWorkflowInboundCallsInterceptor; public class SimpleCountWorkerInterceptor extends WorkerInterceptorBase { diff --git a/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java index 2b3f7c6c0..761f39b70 100644 --- a/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java +++ b/core/src/main/java/io/temporal/samples/excludefrominterceptor/interceptor/MyWorkerInterceptor.java @@ -19,9 +19,7 @@ package io.temporal.samples.excludefrominterceptor.interceptor; -import io.nexusrpc.handler.OperationContext; import io.temporal.common.interceptors.*; - import java.util.ArrayList; import java.util.List; diff --git a/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java index b0d48a426..180306447 100644 --- a/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java +++ b/core/src/main/java/io/temporal/samples/retryonsignalinterceptor/RetryOnSignalWorkerInterceptor.java @@ -19,7 +19,6 @@ package io.temporal.samples.retryonsignalinterceptor; -import io.nexusrpc.handler.OperationContext; import io.temporal.common.interceptors.*; /** Should be registered through WorkerFactoryOptions. */ From 9dba5652b7f9faab35e7ab3628826379f6b2cbb1 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 9 Dec 2024 09:13:24 -0800 Subject: [PATCH 4/4] fix test --- .../earlyreturn/EarlyReturnClient.java | 2 + .../earlyreturn/TransactionWorkflowTest.java | 52 +++++++++---------- .../ClusterManagerWorkflowWorkerTest.java | 5 +- 3 files changed, 28 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/earlyreturn/EarlyReturnClient.java b/core/src/main/java/io/temporal/samples/earlyreturn/EarlyReturnClient.java index c9456874b..36da78d87 100644 --- a/core/src/main/java/io/temporal/samples/earlyreturn/EarlyReturnClient.java +++ b/core/src/main/java/io/temporal/samples/earlyreturn/EarlyReturnClient.java @@ -19,6 +19,7 @@ package io.temporal.samples.earlyreturn; +import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; import io.temporal.client.*; import io.temporal.serviceclient.WorkflowServiceStubs; @@ -80,6 +81,7 @@ private static void runWorkflowWithUpdateWithStart(WorkflowClient client) { private static WorkflowOptions buildWorkflowOptions() { return WorkflowOptions.newBuilder() .setTaskQueue(TASK_QUEUE) + .setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL) .setWorkflowId(WORKFLOW_ID_PREFIX + System.currentTimeMillis()) .build(); } diff --git a/core/src/test/java/io/temporal/samples/earlyreturn/TransactionWorkflowTest.java b/core/src/test/java/io/temporal/samples/earlyreturn/TransactionWorkflowTest.java index 5661d28a9..97cda3583 100644 --- a/core/src/test/java/io/temporal/samples/earlyreturn/TransactionWorkflowTest.java +++ b/core/src/test/java/io/temporal/samples/earlyreturn/TransactionWorkflowTest.java @@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; import io.temporal.client.*; import io.temporal.failure.ActivityFailure; import io.temporal.failure.ApplicationFailure; @@ -63,23 +64,22 @@ public void testUpdateWithStartValidAmount() throws Exception { TransactionWorkflow workflow = workflowClient.newWorkflowStub( TransactionWorkflow.class, - WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build()); - - // Create update operation - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::returnInitResult) - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); + WorkflowOptions.newBuilder() + .setWorkflowIdConflictPolicy( + WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .build()); // Execute UpdateWithStart - WorkflowUpdateHandle handle = - WorkflowClient.updateWithStart( - workflow::processTransaction, - new TransactionRequest(SOURCE_ACCOUNT, TARGET_ACCOUNT, VALID_AMOUNT), - updateOp); + TransactionRequest txRequest = + new TransactionRequest(SOURCE_ACCOUNT, TARGET_ACCOUNT, VALID_AMOUNT); + TxResult updateResult = + WorkflowClient.executeUpdateWithStart( + workflow::returnInitResult, + UpdateOptions.newBuilder().build(), + new WithStartWorkflowOperation<>(workflow::processTransaction, txRequest)); // Verify both update and final results - TxResult updateResult = handle.getResultAsync().get(); assertEquals(TEST_TRANSACTION_ID, updateResult.getTransactionId()); TxResult finalResult = WorkflowStub.fromTyped(workflow).getResult(TxResult.class); @@ -111,6 +111,7 @@ public void testUpdateWithStartInvalidAmount() throws Exception { String workflowId = "test-workflow-" + UUID.randomUUID(); WorkflowOptions options = WorkflowOptions.newBuilder() + .setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL) .setTaskQueue(testWorkflowRule.getTaskQueue()) .setWorkflowId(workflowId) .build(); @@ -118,26 +119,21 @@ public void testUpdateWithStartInvalidAmount() throws Exception { TransactionWorkflow workflow = workflowClient.newWorkflowStub(TransactionWorkflow.class, options); - // Create update operation - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::returnInitResult) - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); - // Execute UpdateWithStart and expect the exception - WorkflowServiceException exception = + TransactionRequest txRequest = + new TransactionRequest(SOURCE_ACCOUNT, TARGET_ACCOUNT, INVALID_AMOUNT); + WorkflowUpdateException exception = assertThrows( - WorkflowServiceException.class, + WorkflowUpdateException.class, () -> - WorkflowClient.updateWithStart( - workflow::processTransaction, - new TransactionRequest(SOURCE_ACCOUNT, TARGET_ACCOUNT, INVALID_AMOUNT), - updateOp)); + WorkflowClient.executeUpdateWithStart( + workflow::returnInitResult, + UpdateOptions.newBuilder().build(), + new WithStartWorkflowOperation<>(workflow::processTransaction, txRequest))); // Verify the exception chain - assertTrue(exception.getCause() instanceof WorkflowUpdateException); - assertTrue(exception.getCause().getCause() instanceof ActivityFailure); - ApplicationFailure appFailure = (ApplicationFailure) exception.getCause().getCause().getCause(); + assertTrue(exception.getCause() instanceof ActivityFailure); + ApplicationFailure appFailure = (ApplicationFailure) exception.getCause().getCause(); assertEquals("InvalidAmount", appFailure.getType()); assertTrue(appFailure.getMessage().contains("Invalid Amount")); diff --git a/core/src/test/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorkerTest.java b/core/src/test/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorkerTest.java index 382ea1107..55ea41fd9 100644 --- a/core/src/test/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorkerTest.java +++ b/core/src/test/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorkerTest.java @@ -105,9 +105,8 @@ public void testUpdateIdempotency() { .newWorkflowStub( ClusterManagerWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build()); - CompletableFuture result = - WorkflowClient.execute( - cluster::run, new ClusterManagerWorkflow.ClusterManagerInput(Optional.empty(), false)); + WorkflowClient.execute( + cluster::run, new ClusterManagerWorkflow.ClusterManagerInput(Optional.empty(), false)); cluster.startCluster();