From 535488ececbb2e9f880284ac4bca2ffa08b27520 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Mon, 10 Feb 2025 00:23:41 -0500 Subject: [PATCH 1/7] [wip] auto heartbeater sample Signed-off-by: Tihomir Surdilovic --- .../samples/autoheartbeat/AutoActivities.java | 29 ++++++ .../autoheartbeat/AutoActivitiesImpl.java | 87 ++++++++++++++++++ .../autoheartbeat/AutoHeartbeater.java | 89 +++++++++++++++++++ .../samples/autoheartbeat/AutoWorkflow.java | 33 +++++++ .../autoheartbeat/AutoWorkflowImpl.java | 89 +++++++++++++++++++ .../samples/autoheartbeat/Starter.java | 58 ++++++++++++ 6 files changed, 385 insertions(+) create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivities.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivities.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivities.java new file mode 100644 index 000000000..5b01950f5 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivities.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.activity.ActivityInterface; + +@ActivityInterface +public interface AutoActivities { + String runActivityOne(String input); + + String runActivityTwo(String input); +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java new file mode 100644 index 000000000..6357be196 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.activity.Activity; +import io.temporal.client.ActivityCompletionException; +import java.util.concurrent.TimeUnit; + +public class AutoActivitiesImpl implements AutoActivities { + + @Override + public String runActivityOne(String input) { + return runActivity("runActivityOne - " + input); + } + + @Override + public String runActivityTwo(String input) { + return runActivity("runActivityTwo - " + input); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private String runActivity(String input) { + // Calculate heartbeat period based on our heartbeat timeout + // Start Autoheartbeater + AutoHeartbeater autoHearbeater = + new AutoHeartbeater( + getHeartbeatPeriod(), 0, TimeUnit.SECONDS, Activity.getExecutionContext(), input); + autoHearbeater.start(); + + // For sample our activity just sleeps for a second for 20 seconds + for (int i = 0; i < 20; i++) { + try { + sleep(1); + } catch (ActivityCompletionException e) { + System.out.println( + "Activity type:" + + e.getActivityType().get() + + "Activiy id: " + + e.getActivityId().get() + + "Workflow id: " + + e.getWorkflowId().get() + + "Workflow runid: " + + e.getRunId().get() + + " was canceled. Shutting down auto heartbeats"); + autoHearbeater.stop(); + // We want to rethrow the cancel failure + throw e; + } + } + return "Activity completed: " + input; + } + + private long getHeartbeatPeriod() { + // Note you can add checks if heartbeat timeout is set if not and + // decide to log / fail activity / not start autoheartbeater based on your business logic + + // For sample we want to heartbeat 1 seconds less than heartbeat timeout + return Activity.getExecutionContext().getInfo().getHeartbeatTimeout().getSeconds() <= 1 + ? 1 + : Activity.getExecutionContext().getInfo().getHeartbeatTimeout().getSeconds() - 1; + } + + private void sleep(int seconds) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); + } catch (InterruptedException ee) { + // Empty + } + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java new file mode 100644 index 000000000..a4a0fea82 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.activity.ActivityExecutionContext; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class AutoHeartbeater { + private final long period; + private final long initialDelay; + private final TimeUnit periodTimeUnit; + private final ScheduledExecutorService timerService = + Executors.newSingleThreadScheduledExecutor(); + private final ActivityExecutionContext context; + private final Object details; + private String heartbeaterId; + + public AutoHeartbeater( + long period, + long initialDelay, + TimeUnit periodTimeUnit, + ActivityExecutionContext context, + Object details) { + this.period = period; + this.initialDelay = initialDelay; + this.periodTimeUnit = periodTimeUnit; + this.context = context; + this.details = details; + // Set to activity id better, for sample we just use type + heartbeaterId = context.getInfo().getActivityType(); + } + + public ScheduledFuture start() { + System.out.println("Autoheartbeater[" + heartbeaterId + "] starting..."); + return timerService.scheduleAtFixedRate( + () -> { + try { + System.out.println( + "Autoheartbeater[" + + heartbeaterId + + "]" + + "heartbeating at: " + + printShortCurrentTime()); + context.heartbeat(details); + } catch (Exception e) { + System.out.println("Stopping: " + e.getMessage()); + stop(); + } + }, + initialDelay, + period, + periodTimeUnit); + } + + public void stop() { + System.out.println("Autoheartbeater being requested to stop."); + // Try not to execute another heartbeat that could have been queued up + timerService.shutdownNow(); + } + + private String printShortCurrentTime() { + return DateTimeFormatter.ofPattern("HH:mm:ss") + .withZone(ZoneId.systemDefault()) + .format(Instant.now()); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflow.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflow.java new file mode 100644 index 000000000..d749afe2f --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflow.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface AutoWorkflow { + @WorkflowMethod + String exec(String input); + + @SignalMethod + void cancelActivity(); +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflowImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflowImpl.java new file mode 100644 index 000000000..df9133373 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflowImpl.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.activity.ActivityOptions; +import io.temporal.failure.ActivityFailure; +import io.temporal.failure.CanceledFailure; +import io.temporal.workflow.Async; +import io.temporal.workflow.CancellationScope; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +public class AutoWorkflowImpl implements AutoWorkflow { + private CancellationScope scope; + + @Override + public String exec(String input) { + // Crete separate workflow stubs for same interface so we can show + // use of different heartbeat timeouts + AutoActivities activitiesOne = + Workflow.newActivityStub( + AutoActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(22)) + .setHeartbeatTimeout(Duration.ofSeconds(3)) + .build()); + + AutoActivities activitiesTwo = + Workflow.newActivityStub( + AutoActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(22)) + .setHeartbeatTimeout(Duration.ofSeconds(5)) + .build()); + + // Start our activity in CancellationScope so we can cancel it if needed + List> activityPromises = new ArrayList<>(); + scope = + Workflow.newCancellationScope( + () -> { + activityPromises.add(Async.function(activitiesOne::runActivityOne, input)); + activityPromises.add(Async.function(activitiesTwo::runActivityTwo, input)); + }); + + scope.run(); + + try { + Promise.allOf(activityPromises).get(); + String result = ""; + for (Promise pr : activityPromises) { + result += pr.get(); + } + return result; + } catch (ActivityFailure e) { + if (e.getCause() instanceof CanceledFailure) { + // We dont want workflow to fail in we canceled our scope, just log and return + return "Workflow result after activity cancellation"; + } else { + // We want to fail execution on any other failures except cancellation + throw e; + } + } + } + + @Override + public void cancelActivity() { + scope.cancel("Canceling scope from signal handler"); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java new file mode 100644 index 000000000..0e7d799f8 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; + +public class Starter { + static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; + static final String WORKFLOW_ID = "AutoHeartbeatWorkflow"; + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = WorkflowClient.newInstance(service); + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + + worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); + worker.registerActivitiesImplementations(new AutoActivitiesImpl()); + + factory.start(); + + AutoWorkflow workflow = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + try { + String result = workflow.exec("Auto heartbeating is cool"); + System.out.println("Result: " + result); + } catch (Exception e) { + System.out.println("Workflow exec exception: " + e.getClass().getName()); + } + } +} From f6f3ce1c4c2e9f47454bc3a7099bc23af593ce78 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Mon, 10 Feb 2025 00:27:47 -0500 Subject: [PATCH 2/7] update to info Signed-off-by: Tihomir Surdilovic --- .../io/temporal/samples/autoheartbeat/AutoHeartbeater.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java index a4a0fea82..5da521269 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java @@ -66,7 +66,7 @@ public ScheduledFuture start() { + printShortCurrentTime()); context.heartbeat(details); } catch (Exception e) { - System.out.println("Stopping: " + e.getMessage()); + System.out.println("Stopping Autoheartbeater[" + heartbeaterId + "]: " + e.getMessage()); stop(); } }, @@ -76,7 +76,7 @@ public ScheduledFuture start() { } public void stop() { - System.out.println("Autoheartbeater being requested to stop."); + System.out.println("Autoheartbeater[" + heartbeaterId + "] being requested to stop."); // Try not to execute another heartbeat that could have been queued up timerService.shutdownNow(); } From a232e1e310b5e41ce4014f0adacc56f9e4e39d65 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Mon, 10 Feb 2025 00:47:11 -0500 Subject: [PATCH 3/7] updated comments Signed-off-by: Tihomir Surdilovic --- .../io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java index 6357be196..6357e158e 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java @@ -37,7 +37,6 @@ public String runActivityTwo(String input) { @SuppressWarnings("FutureReturnValueIgnored") private String runActivity(String input) { - // Calculate heartbeat period based on our heartbeat timeout // Start Autoheartbeater AutoHeartbeater autoHearbeater = new AutoHeartbeater( From c4b02d40d58d8fa6baba5005431a2ab8e5d26380 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Mon, 10 Feb 2025 00:59:50 -0500 Subject: [PATCH 4/7] fixed formatting Signed-off-by: Tihomir Surdilovic --- .../io/temporal/samples/autoheartbeat/AutoHeartbeater.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java index 5da521269..e5cffe87c 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java @@ -66,7 +66,8 @@ public ScheduledFuture start() { + printShortCurrentTime()); context.heartbeat(details); } catch (Exception e) { - System.out.println("Stopping Autoheartbeater[" + heartbeaterId + "]: " + e.getMessage()); + System.out.println( + "Stopping Autoheartbeater[" + heartbeaterId + "]: " + e.getMessage()); stop(); } }, From f7aba205e30fe7314b56260001f41198c63d8c3f Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Mon, 10 Feb 2025 08:53:10 -0500 Subject: [PATCH 5/7] refactor to use interceptor Signed-off-by: Tihomir Surdilovic --- ...eartbeater.java => AutoHeartbeatUtil.java} | 4 +- .../temporal/samples/autoheartbeat/README.md | 0 .../samples/autoheartbeat/Starter.java | 16 +++- .../{ => activities}/AutoActivities.java | 2 +- .../{ => activities}/AutoActivitiesImpl.java | 29 ++----- ...rtbeatActivityInboundCallsInterceptor.java | 85 +++++++++++++++++++ .../AutoHeartbeatWorkerInterceptor.java | 30 +++++++ .../{ => workflows}/AutoWorkflow.java | 2 +- .../{ => workflows}/AutoWorkflowImpl.java | 9 +- 9 files changed, 144 insertions(+), 33 deletions(-) rename core/src/main/java/io/temporal/samples/autoheartbeat/{AutoHeartbeater.java => AutoHeartbeatUtil.java} (97%) create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/README.md rename core/src/main/java/io/temporal/samples/autoheartbeat/{ => activities}/AutoActivities.java (94%) rename core/src/main/java/io/temporal/samples/autoheartbeat/{ => activities}/AutoActivitiesImpl.java (63%) create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java rename core/src/main/java/io/temporal/samples/autoheartbeat/{ => workflows}/AutoWorkflow.java (95%) rename core/src/main/java/io/temporal/samples/autoheartbeat/{ => workflows}/AutoWorkflowImpl.java (94%) diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java similarity index 97% rename from core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java rename to core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java index e5cffe87c..1bda80e85 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java @@ -28,7 +28,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -public class AutoHeartbeater { +public class AutoHeartbeatUtil { private final long period; private final long initialDelay; private final TimeUnit periodTimeUnit; @@ -38,7 +38,7 @@ public class AutoHeartbeater { private final Object details; private String heartbeaterId; - public AutoHeartbeater( + public AutoHeartbeatUtil( long period, long initialDelay, TimeUnit periodTimeUnit, diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/README.md b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java index 0e7d799f8..a488bcb6f 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java @@ -21,9 +21,14 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; +import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl; +import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor; +import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow; +import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; public class Starter { static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; @@ -32,7 +37,16 @@ public class Starter { public static void main(String[] args) { WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); WorkflowClient client = WorkflowClient.newInstance(service); - WorkerFactory factory = WorkerFactory.newInstance(client); + + // Configure our auto heartbeat workflow interceptor which will apply + // AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat + // timeout configured + WorkerFactoryOptions wfo = + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor()) + .build(); + + WorkerFactory factory = WorkerFactory.newInstance(client, wfo); Worker worker = factory.newWorker(TASK_QUEUE); worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivities.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java similarity index 94% rename from core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivities.java rename to core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java index 5b01950f5..81726e05a 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivities.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java @@ -17,7 +17,7 @@ * permissions and limitations under the License. */ -package io.temporal.samples.autoheartbeat; +package io.temporal.samples.autoheartbeat.activities; import io.temporal.activity.ActivityInterface; diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java similarity index 63% rename from core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java rename to core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java index 6357e158e..5a9507ab0 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java @@ -17,9 +17,8 @@ * permissions and limitations under the License. */ -package io.temporal.samples.autoheartbeat; +package io.temporal.samples.autoheartbeat.activities; -import io.temporal.activity.Activity; import io.temporal.client.ActivityCompletionException; import java.util.concurrent.TimeUnit; @@ -27,24 +26,17 @@ public class AutoActivitiesImpl implements AutoActivities { @Override public String runActivityOne(String input) { - return runActivity("runActivityOne - " + input); + return runActivity("runActivityOne - " + input, 20); } @Override public String runActivityTwo(String input) { - return runActivity("runActivityTwo - " + input); + return runActivity("runActivityTwo - " + input, 10); } @SuppressWarnings("FutureReturnValueIgnored") - private String runActivity(String input) { - // Start Autoheartbeater - AutoHeartbeater autoHearbeater = - new AutoHeartbeater( - getHeartbeatPeriod(), 0, TimeUnit.SECONDS, Activity.getExecutionContext(), input); - autoHearbeater.start(); - - // For sample our activity just sleeps for a second for 20 seconds - for (int i = 0; i < 20; i++) { + private String runActivity(String input, int seconds) { + for (int i = 0; i < seconds; i++) { try { sleep(1); } catch (ActivityCompletionException e) { @@ -58,7 +50,6 @@ private String runActivity(String input) { + "Workflow runid: " + e.getRunId().get() + " was canceled. Shutting down auto heartbeats"); - autoHearbeater.stop(); // We want to rethrow the cancel failure throw e; } @@ -66,16 +57,6 @@ private String runActivity(String input) { return "Activity completed: " + input; } - private long getHeartbeatPeriod() { - // Note you can add checks if heartbeat timeout is set if not and - // decide to log / fail activity / not start autoheartbeater based on your business logic - - // For sample we want to heartbeat 1 seconds less than heartbeat timeout - return Activity.getExecutionContext().getInfo().getHeartbeatTimeout().getSeconds() <= 1 - ? 1 - : Activity.getExecutionContext().getInfo().getHeartbeatTimeout().getSeconds() - 1; - } - private void sleep(int seconds) { try { Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java new file mode 100644 index 000000000..58d557366 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.interceptor; + +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase; +import io.temporal.samples.autoheartbeat.AutoHeartbeatUtil; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +public class AutoHeartbeatActivityInboundCallsInterceptor + extends ActivityInboundCallsInterceptorBase { + private ActivityExecutionContext activityExecutionContext; + private Duration activityHeartbeatTimeout; + + public AutoHeartbeatActivityInboundCallsInterceptor(ActivityInboundCallsInterceptor next) { + super(next); + } + + @Override + public void init(ActivityExecutionContext context) { + this.activityExecutionContext = context; + activityHeartbeatTimeout = activityExecutionContext.getInfo().getHeartbeatTimeout(); + super.init(context); + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public ActivityOutput execute(ActivityInput input) { + // If activity has heartbeat timeout defined we want to apply auto-heartbeter + AutoHeartbeatUtil autoHearbeater = null; + if (activityHeartbeatTimeout != null) { + System.out.println( + "Auto heartbeating applied for activity: " + + activityExecutionContext.getInfo().getActivityType()); + autoHearbeater = + new AutoHeartbeatUtil( + getHeartbeatPeriod(activityHeartbeatTimeout), + 0, + TimeUnit.SECONDS, + activityExecutionContext, + input); + autoHearbeater.start(); + } else { + System.out.println( + "Auto heartbeating not being applied for activity: " + + activityExecutionContext.getInfo().getActivityType()); + } + + try { + return super.execute(input); + } catch (Exception e) { + throw e; + } finally { + if (autoHearbeater != null) { + autoHearbeater.stop(); + } + } + } + + private long getHeartbeatPeriod(Duration activityHeartbeatTimeout) { + // For sample we want to heartbeat 1 seconds less than heartbeat timeout + return activityHeartbeatTimeout.getSeconds() <= 1 + ? 1 + : activityHeartbeatTimeout.getSeconds() - 1; + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java new file mode 100644 index 000000000..9816fe644 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.interceptor; + +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkerInterceptorBase; + +public class AutoHeartbeatWorkerInterceptor extends WorkerInterceptorBase { + @Override + public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { + return new AutoHeartbeatActivityInboundCallsInterceptor(next); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflow.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java similarity index 95% rename from core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflow.java rename to core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java index d749afe2f..e8816c6a2 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflow.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java @@ -17,7 +17,7 @@ * permissions and limitations under the License. */ -package io.temporal.samples.autoheartbeat; +package io.temporal.samples.autoheartbeat.workflows; import io.temporal.workflow.SignalMethod; import io.temporal.workflow.WorkflowInterface; diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflowImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java similarity index 94% rename from core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflowImpl.java rename to core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java index df9133373..88b6969cb 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java @@ -17,11 +17,12 @@ * permissions and limitations under the License. */ -package io.temporal.samples.autoheartbeat; +package io.temporal.samples.autoheartbeat.workflows; import io.temporal.activity.ActivityOptions; import io.temporal.failure.ActivityFailure; import io.temporal.failure.CanceledFailure; +import io.temporal.samples.autoheartbeat.activities.AutoActivities; import io.temporal.workflow.Async; import io.temporal.workflow.CancellationScope; import io.temporal.workflow.Promise; @@ -42,15 +43,15 @@ public String exec(String input) { AutoActivities.class, ActivityOptions.newBuilder() .setStartToCloseTimeout(Duration.ofSeconds(22)) - .setHeartbeatTimeout(Duration.ofSeconds(3)) + .setHeartbeatTimeout(Duration.ofSeconds(5)) .build()); AutoActivities activitiesTwo = Workflow.newActivityStub( AutoActivities.class, ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofSeconds(22)) - .setHeartbeatTimeout(Duration.ofSeconds(5)) + .setStartToCloseTimeout(Duration.ofSeconds(12)) + .setHeartbeatTimeout(Duration.ofSeconds(3)) .build()); // Start our activity in CancellationScope so we can cancel it if needed From 466594e24f2e1566a081de80e0c158e34bc89f30 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Mon, 10 Feb 2025 09:10:44 -0500 Subject: [PATCH 6/7] fix check for heartbeat timeout set Signed-off-by: Tihomir Surdilovic --- .../autoheartbeat/activities/AutoActivities.java | 2 ++ .../autoheartbeat/activities/AutoActivitiesImpl.java | 5 +++++ .../AutoHeartbeatActivityInboundCallsInterceptor.java | 2 +- .../autoheartbeat/workflows/AutoWorkflowImpl.java | 11 ++++++++++- 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java index 81726e05a..1ea9a52ae 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java @@ -26,4 +26,6 @@ public interface AutoActivities { String runActivityOne(String input); String runActivityTwo(String input); + + String runActivityThree(String input); } diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java index 5a9507ab0..10883bca7 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java @@ -34,6 +34,11 @@ public String runActivityTwo(String input) { return runActivity("runActivityTwo - " + input, 10); } + @Override + public String runActivityThree(String input) { + return runActivity("runActivityThree - " + input, 3); + } + @SuppressWarnings("FutureReturnValueIgnored") private String runActivity(String input, int seconds) { for (int i = 0; i < seconds; i++) { diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java index 58d557366..036f09b86 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java @@ -47,7 +47,7 @@ public void init(ActivityExecutionContext context) { public ActivityOutput execute(ActivityInput input) { // If activity has heartbeat timeout defined we want to apply auto-heartbeter AutoHeartbeatUtil autoHearbeater = null; - if (activityHeartbeatTimeout != null) { + if (activityHeartbeatTimeout != null && activityHeartbeatTimeout.getSeconds() > 0) { System.out.println( "Auto heartbeating applied for activity: " + activityExecutionContext.getInfo().getActivityType()); diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java index 88b6969cb..52337ca41 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java @@ -37,7 +37,9 @@ public class AutoWorkflowImpl implements AutoWorkflow { @Override public String exec(String input) { // Crete separate workflow stubs for same interface so we can show - // use of different heartbeat timeouts + // use of different heartbeat timeouts and activit that does not heartbeat + // Note you can do this also via WorkflowImplementationOptions instead of using different + // activity stubs if you wanted AutoActivities activitiesOne = Workflow.newActivityStub( AutoActivities.class, @@ -54,6 +56,12 @@ public String exec(String input) { .setHeartbeatTimeout(Duration.ofSeconds(3)) .build()); + // Activity three does not heartbeat so autoheartbeat should not be applied to it + AutoActivities activitiesThree = + Workflow.newActivityStub( + AutoActivities.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build()); + // Start our activity in CancellationScope so we can cancel it if needed List> activityPromises = new ArrayList<>(); scope = @@ -61,6 +69,7 @@ public String exec(String input) { () -> { activityPromises.add(Async.function(activitiesOne::runActivityOne, input)); activityPromises.add(Async.function(activitiesTwo::runActivityTwo, input)); + activityPromises.add(Async.function(activitiesThree::runActivityThree, input)); }); scope.run(); From 997afc6c1fd85607b13be3713114859c83e1dbce Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Mon, 10 Feb 2025 11:56:11 -0500 Subject: [PATCH 7/7] add readme Signed-off-by: Tihomir Surdilovic --- .../io/temporal/samples/autoheartbeat/README.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/README.md b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md index e69de29bb..eeebd93b2 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/README.md +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md @@ -0,0 +1,15 @@ +# Auto-heartbeating sample for activities that define HeartbeatTimeout + +This sample shows an implementation of an "auto-heartbeating" utility that can be applied via interceptor +to all activities where you define HeartbeatTimeout. Use case where this can be helpful include +situations where you have long-running activities where you want to heartbeat but its difficult +to explicitly call heartbeat api in activity code directly. + +1. Start the Sample: +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.autoheartbeat.Starter +``` + +The sample workflow starts three activities async, two of which define heartbeat timeout. +Activity interceptor in this sample applies the auto-heartbeating util to the two that define heartbeat timeout +and auto-heartbeats at its HeartbeatTimeout - 1s intervals. \ No newline at end of file