From ba8c3e1316bce6ccefc2b8e650d3e21dc7f3fe0c Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Mon, 30 Dec 2024 23:30:15 +0100 Subject: [PATCH] Workflow timer sample Signed-off-by: Tihomir Surdilovic --- README.md | 1 + .../samples/hello/HelloWorkflowTimer.java | 253 ++++++++++++++++++ .../samples/hello/HelloWorkflowTimerTest.java | 53 ++++ 3 files changed, 307 insertions(+) create mode 100644 core/src/main/java/io/temporal/samples/hello/HelloWorkflowTimer.java create mode 100644 core/src/test/java/io/temporal/samples/hello/HelloWorkflowTimerTest.java diff --git a/README.md b/README.md index ea4a0f263..b30743697 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,7 @@ See the README.md file in each main sample directory for cut/paste Gradle comman - [**HelloUpdate**](/core/src/main/java/io/temporal/samples/hello/HelloUpdate.java): Demonstrates how to create and interact with an Update. - [**HelloDelayedStart**](/core/src/main/java/io/temporal/samples/hello/HelloDelayedStart.java): Demonstrates how to use delayed start config option when starting a Workflow Executions. - [**HelloSignalWithTimer**](/core/src/main/java/io/temporal/samples/hello/HelloSignalWithTimer.java): Demonstrates how to use collect signals for certain amount of time and then process last one. + - [**HelloWorkflowTimer**](/core/src/main/java/io/temporal/samples/hello/HelloWorkflowTimer.java): Demonstrates how we can use workflow timer to restrict duration of workflow execution instead of workflow run/execution timeouts. #### Scenario-based samples diff --git a/core/src/main/java/io/temporal/samples/hello/HelloWorkflowTimer.java b/core/src/main/java/io/temporal/samples/hello/HelloWorkflowTimer.java new file mode 100644 index 000000000..a03a8104b --- /dev/null +++ b/core/src/main/java/io/temporal/samples/hello/HelloWorkflowTimer.java @@ -0,0 +1,253 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.hello; + +import io.temporal.activity.*; +import io.temporal.client.ActivityCompletionException; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.failure.ActivityFailure; +import io.temporal.failure.CanceledFailure; +import io.temporal.failure.ChildWorkflowFailure; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.workflow.*; +import java.time.Duration; + +/** Sample shows how to use workflow timer instead of WorkflowOptions->Run/ExecutionTimeout */ +public class HelloWorkflowTimer { + private static String WORKFLOW_ID = "HelloWorkflowWithTimer"; + private static String TASK_QUEUE = "HelloWorkflowWithTimerTaskQueue"; + // Change time to 12 to 20 seconds to handle cancellation while child workflow is running + private static int TIME_SECS = 8; + + // Workflow + @WorkflowInterface + public interface WorkflowWithTimer { + @WorkflowMethod + String execute(String input); + } + + public static class WorkflowWithTimerImpl implements WorkflowWithTimer { + // Our timer cancellation scope + private CancellationScope timerCancellationScope; + // Our workflow cancellation scope + private CancellationScope workflowCancellationScope; + // Workflow result + private String workflowResult = ""; + private Promise workflowTimerPromise; + + @Override + public String execute(String input) { + // Create workflow timer (within timer cancel;ation scope so it can be canceled) + // which denotes the max amount of time we allow this execution to run + // Using workflow timer instead of workflow run/execution timeouts allow us to react to this + // timer + // fires, be able to chose if we want to fail or complete execution, and do some "cleanup" + // tasks if + // necessary before we do so. If we used workflow run/execution timeouts insted we would not + // be able + // to react to this timer firing (its server timer only) + timerCancellationScope = + Workflow.newCancellationScope( + () -> { + workflowTimerPromise = + Workflow.newTimer( + Duration.ofSeconds(TIME_SECS), + TimerOptions.newBuilder().setSummary("Workflow Timer").build()) + // We can use thenApply here to cancel our cancelation scope when this timer + // fires. Note we cannot complete the execution from here, see + // https://github.com/temporalio/sdk-java/issues/87 + .thenApply( + ignore -> { + // Cancel the workflow cancellation scope allowing us to react to this + // timer firing + if (workflowCancellationScope != null) { + workflowCancellationScope.cancel("Workflow timer fired"); + } + return null; + }); + }); + timerCancellationScope.run(); + + // Create workflow cancellation scope in which we put our core business logic + workflowCancellationScope = + Workflow.newCancellationScope( + () -> { + WorkflowWithTimerActivities activities = + Workflow.newActivityStub( + WorkflowWithTimerActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(12)) + // Set heartbeat timeout to 1s + .setHeartbeatTimeout(Duration.ofSeconds(2)) + // We want to wait for activity to complete cancellation + .setCancellationType( + ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + .build()); + + WorkflowWithTimerChildWorkflow childWorkflow = + Workflow.newChildWorkflowStub( + WorkflowWithTimerChildWorkflow.class, + ChildWorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID + "-Child") + // We want to wait for child workflow cancellation completion + .setCancellationType( + ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED) + .build()); + + try { + // Run our activities + workflowResult = activities.sayHello(input); + // Then our child workflow + childWorkflow.executeChild(input); + } catch (ActivityFailure af) { + // Handle cancellation of scope while activities are pending (running) + if (af.getCause() instanceof CanceledFailure) { + workflowResult = "Workflow timer fired while activities were executing."; + // Here we can do more work if needed + } + } catch (ChildWorkflowFailure cwf) { + // Handle cancellation of scope while child workflow is pending (running) + if (cwf.getCause() instanceof CanceledFailure) { + workflowResult = "Workflow timer fired while child workflow was executing."; + // Here we can do more work if needed + } + } + }); + // Run the workflow cancellation scope + // We need to handle CanceledFailure here in case we cancel the scope + // right before activity/child workflows are scheduled + try { + workflowCancellationScope.run(); + } catch (CanceledFailure e) { + workflowResult = "Workflow cancelled."; + } + + // Cancel our workflow timer if it didnt fire + if (!workflowTimerPromise.isCompleted()) { + timerCancellationScope.cancel("Workflow completed before workflow timer."); + } + + return workflowResult; + } + } + + // Activities + @ActivityInterface + public interface WorkflowWithTimerActivities { + String sayHello(String input); + } + + public static class WorkflowWithTimerActivitiesImpl implements WorkflowWithTimerActivities { + @Override + public String sayHello(String input) { + // here we just heartbeat then sleep for 1s + for (int i = 0; i < 10; i++) { + try { + Activity.getExecutionContext().heartbeat("heartbeating: " + i); + } catch (ActivityCompletionException e) { + // Do some cleanup if needed, then re-throw + throw e; + } + sleep(1); + } + return "Hello " + input; + } + + // Just sample sleep method + private void sleep(int seconds) { + try { + Thread.sleep(seconds * 1000L); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + } + + // Child Workflows + @WorkflowInterface + public interface WorkflowWithTimerChildWorkflow { + @WorkflowMethod + String executeChild(String input); + } + + public static class WorkflowWithTimerChildWorkflowImpl implements WorkflowWithTimerChildWorkflow { + @Override + public String executeChild(String input) { + // For sample we just sleep for 5 seconds and return some result + try { + Workflow.sleep(Duration.ofSeconds(5)); + return "From executeChild - " + input; + // Note that similarly to parent workflow if child is running activities/child workflows + // we need to handle this in same way as parent does + // Fpr sample we can just handle CanceledFailure and rethrow + } catch (CanceledFailure e) { + // Can do cleanup if needed + throw e; + } + } + } + + public static void main(String[] args) { + // Create service stubs + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + // Crete workflow client + WorkflowClient client = WorkflowClient.newInstance(service); + // Create worker factory + WorkerFactory factory = WorkerFactory.newInstance(client); + + // Create worker + Worker worker = factory.newWorker(TASK_QUEUE); + // Register workflow and child workflow + worker.registerWorkflowImplementationTypes( + WorkflowWithTimerImpl.class, WorkflowWithTimerChildWorkflowImpl.class); + // Register activities + worker.registerActivitiesImplementations(new WorkflowWithTimerActivitiesImpl()); + + // Start factory (and worker) + factory.start(); + + // Create workflow stub + WorkflowWithTimer workflow = + client.newWorkflowStub( + WorkflowWithTimer.class, + WorkflowOptions.newBuilder() + // Note we do not set workflow run/execution timeouts + // As its not recommended in most cases + // In same we show how we can implement this with workflow timer instead + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + // Start workflow execution async + WorkflowClient.start(workflow::execute, "Some Name Here"); + + // Wait for execution to complete (sync) + WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow); + String result = workflowStub.getResult(String.class); + System.out.println("Workflow result: " + result); + + // Stop main method + System.exit(0); + } +} diff --git a/core/src/test/java/io/temporal/samples/hello/HelloWorkflowTimerTest.java b/core/src/test/java/io/temporal/samples/hello/HelloWorkflowTimerTest.java new file mode 100644 index 000000000..7e0954494 --- /dev/null +++ b/core/src/test/java/io/temporal/samples/hello/HelloWorkflowTimerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.hello; + +import io.temporal.client.WorkflowOptions; +import io.temporal.testing.TestWorkflowRule; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class HelloWorkflowTimerTest { + @Rule + public TestWorkflowRule testWorkflowRule = + TestWorkflowRule.newBuilder() + .setWorkflowTypes( + HelloWorkflowTimer.WorkflowWithTimerImpl.class, + HelloWorkflowTimer.WorkflowWithTimerChildWorkflowImpl.class) + .setActivityImplementations(new HelloWorkflowTimer.WorkflowWithTimerActivitiesImpl()) + .build(); + + @Test + public void testWorkflowTimer() { + HelloWorkflowTimer.WorkflowWithTimer workflow = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + HelloWorkflowTimer.WorkflowWithTimer.class, + WorkflowOptions.newBuilder() + .setWorkflowId("WorkflowWithTimerTestId") + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .build()); + + String result = workflow.execute("test input"); + Assert.assertEquals("Workflow timer fired while activities were executing.", result); + } +}