From d9b43cb26e367ef8b9f930dfe0586ff1c91fe566 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Fri, 20 Jun 2025 00:39:40 -0400 Subject: [PATCH 1/4] Sample: Auto Heartbeat via Interceptor Signed-off-by: Tihomir Surdilovic --- .../autoheartbeat/AutoHeartbeatUtil.java | 87 ++++++++++++++ .../temporal/samples/autoheartbeat/README.md | 16 +++ .../samples/autoheartbeat/Starter.java | 103 +++++++++++++++++ .../activities/AutoActivities.java | 29 +++++ .../activities/AutoActivitiesImpl.java | 51 +++++++++ ...rtbeatActivityInboundCallsInterceptor.java | 106 ++++++++++++++++++ .../AutoHeartbeatWorkerInterceptor.java | 30 +++++ .../autoheartbeat/workflows/AutoWorkflow.java | 33 ++++++ .../workflows/AutoWorkflowImpl.java | 79 +++++++++++++ 9 files changed, 534 insertions(+) create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/README.md create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java new file mode 100644 index 000000000..ca2b0a5f4 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.activity.ActivityExecutionContext; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class AutoHeartbeatUtil { + private final long period; + private final long initialDelay; + private final TimeUnit periodTimeUnit; + private final ScheduledExecutorService timerService = + Executors.newSingleThreadScheduledExecutor(); + private final ActivityExecutionContext context; + private final Object details; + private String heartbeaterId; + + public AutoHeartbeatUtil( + long period, + long initialDelay, + TimeUnit periodTimeUnit, + ActivityExecutionContext context, + Object details) { + this.period = period; + this.initialDelay = initialDelay; + this.periodTimeUnit = periodTimeUnit; + this.context = context; + this.details = details; + // Set to activity id better, for sample we just use type + heartbeaterId = context.getInfo().getActivityType(); + } + + public ScheduledFuture start() { + System.out.println("Autoheartbeater[" + heartbeaterId + "] starting..."); + return timerService.scheduleAtFixedRate( + () -> { + // try { + System.out.println( + "Autoheartbeater[" + + heartbeaterId + + "]" + + "heartbeating at: " + + printShortCurrentTime()); + context.heartbeat(details); + }, + initialDelay, + period, + periodTimeUnit); + } + + public void stop() { + System.out.println("Autoheartbeater[" + heartbeaterId + "] being requested to stop."); + // Try not to execute another heartbeat that could have been queued up + // Note this can at times take a second or two so make sure to test this out on your workers + // So can set best heartbeat timeout (sometimes might need larger value to accomodate) + timerService.shutdownNow(); + } + + private String printShortCurrentTime() { + return DateTimeFormatter.ofPattern("HH:mm:ss") + .withZone(ZoneId.systemDefault()) + .format(Instant.now()); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/README.md b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md new file mode 100644 index 000000000..1dd8c5e40 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md @@ -0,0 +1,16 @@ +# Auto-heartbeating sample for activities that define HeartbeatTimeout + +This sample shows an implementation of an "auto-heartbeating" utility that can be applied via interceptor to all +activities where you define HeartbeatTimeout. Use case where this can be helpful include situations where you have +long-running activities where you want to heartbeat but its difficult to explicitly call heartbeat api in activity code +directly. +Another useful scenario for this is where you have activity that at times can complete in very short amount of time, +but then at times can take for example minutes. In this case you have to set longer StartToClose timeout +but you might not want first heartbeat to be sent right away but send it after the "shorter" duration of activity +execution. + +1. Start the Sample: + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.autoheartbeat.Starter +``` \ No newline at end of file diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java new file mode 100644 index 000000000..9ac265bba --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl; +import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor; +import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow; +import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; + +public class Starter { + static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; + static final String WORKFLOW_ID = "AutoHeartbeatWorkflow"; + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = WorkflowClient.newInstance(service); + + // Configure our auto heartbeat workflow interceptor which will apply + // AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat + // timeout configured + WorkerFactoryOptions wfo = + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor()) + .build(); + + WorkerFactory factory = WorkerFactory.newInstance(client, wfo); + Worker worker = factory.newWorker(TASK_QUEUE); + + worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); + worker.registerActivitiesImplementations(new AutoActivitiesImpl()); + + factory.start(); + + System.out.println("**** First Run: run workflow to completion"); + + AutoWorkflow firstRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + try { + String firstRunResult = firstRun.exec("Auto heartbeating is cool"); + System.out.println("First run result: " + firstRunResult); + } catch (Exception e) { + System.out.println("First run - Workflow exec exception: " + e.getClass().getName()); + } + + System.out.println("\n\n**** Second Run: cancel activities"); + + AutoWorkflow secondRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + WorkflowClient.start(secondRun::exec, "Auto heartbeating is cool"); + doSleeps(4); + secondRun.cancelActivity(); + + try { + String secondRunResult = WorkflowStub.fromTyped(secondRun).getResult(String.class); + System.out.println("Second run result: " + secondRunResult); + } catch (Exception e) { + System.out.println("Second run - Workflow exec exception: " + e.getClass().getName()); + } + } + + private static void doSleeps(int seconds) { + try { + Thread.sleep(seconds * 1000L); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java new file mode 100644 index 000000000..81726e05a --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.activities; + +import io.temporal.activity.ActivityInterface; + +@ActivityInterface +public interface AutoActivities { + String runActivityOne(String input); + + String runActivityTwo(String input); +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java new file mode 100644 index 000000000..26a30162d --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.activities; + +import java.util.concurrent.TimeUnit; + +public class AutoActivitiesImpl implements AutoActivities { + + @Override + public String runActivityOne(String input) { + return runActivity("runActivityOne - " + input, 10); + } + + @Override + public String runActivityTwo(String input) { + return runActivity("runActivityTwo - " + input, 5); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private String runActivity(String input, int seconds) { + for (int i = 0; i < seconds; i++) { + sleep(1); + } + return "Activity completed: " + input; + } + + private void sleep(int seconds) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); + } catch (InterruptedException ee) { + // Empty + } + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java new file mode 100644 index 000000000..5ca4006a0 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.interceptor; + +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.client.ActivityCanceledException; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase; +import io.temporal.samples.autoheartbeat.AutoHeartbeatUtil; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class AutoHeartbeatActivityInboundCallsInterceptor + extends ActivityInboundCallsInterceptorBase { + private ActivityExecutionContext activityExecutionContext; + private Duration activityHeartbeatTimeout; + private AutoHeartbeatUtil autoHeartbeater; + // private CompletableFuture autoHeartbeatFuture; + private ScheduledFuture scheduledFuture; + + // private ScheduledFuture scheduledFuture; + + public AutoHeartbeatActivityInboundCallsInterceptor(ActivityInboundCallsInterceptor next) { + super(next); + } + + @Override + public void init(ActivityExecutionContext context) { + this.activityExecutionContext = context; + activityHeartbeatTimeout = activityExecutionContext.getInfo().getHeartbeatTimeout(); + super.init(context); + } + + @Override + @SuppressWarnings({"FutureReturnValueIgnored", "CatchAndPrintStackTrace"}) + public ActivityOutput execute(ActivityInput input) { + // If activity has heartbeat timeout defined we want to apply auto-heartbeter + if (activityHeartbeatTimeout != null && activityHeartbeatTimeout.getSeconds() > 0) { + System.out.println( + "Auto heartbeating applied for activity: " + + activityExecutionContext.getInfo().getActivityType()); + autoHeartbeater = + new AutoHeartbeatUtil(2, 0, TimeUnit.SECONDS, activityExecutionContext, input); + scheduledFuture = autoHeartbeater.start(); + } else { + System.out.println( + "Auto heartbeating not being applied for activity: " + + activityExecutionContext.getInfo().getActivityType()); + } + + if (scheduledFuture != null) { + CompletableFuture activityExecFuture = + CompletableFuture.supplyAsync(() -> super.execute(input)); + CompletableFuture autoHeartbeatFuture = + CompletableFuture.supplyAsync( + () -> { + try { + return scheduledFuture.get(); + } catch (Exception e) { + throw new ActivityCanceledException(activityExecutionContext.getInfo()); + } + }); + try { + CompletableFuture.anyOf(autoHeartbeatFuture, activityExecFuture).get(); + } catch (Exception e) { + if (e instanceof ExecutionException) { + ExecutionException ee = (ExecutionException) e; + if (ee.getCause() instanceof ActivityCanceledException) { + throw new ActivityCanceledException(activityExecutionContext.getInfo()); + } + } + throw Activity.wrap(e); + } finally { + if (autoHeartbeater != null) { + autoHeartbeater.stop(); + } + } + } + return super.execute(input); + } + + public interface AutoHeartbeaterCancellationCallback { + void handle(Exception e); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java new file mode 100644 index 000000000..9816fe644 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.interceptor; + +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkerInterceptorBase; + +public class AutoHeartbeatWorkerInterceptor extends WorkerInterceptorBase { + @Override + public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { + return new AutoHeartbeatActivityInboundCallsInterceptor(next); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java new file mode 100644 index 000000000..e8816c6a2 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.workflows; + +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface AutoWorkflow { + @WorkflowMethod + String exec(String input); + + @SignalMethod + void cancelActivity(); +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java new file mode 100644 index 000000000..30cf41c8f --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.workflows; + +import io.temporal.activity.ActivityCancellationType; +import io.temporal.activity.ActivityOptions; +import io.temporal.failure.ActivityFailure; +import io.temporal.failure.CanceledFailure; +import io.temporal.samples.autoheartbeat.activities.AutoActivities; +import io.temporal.workflow.CancellationScope; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +public class AutoWorkflowImpl implements AutoWorkflow { + private CancellationScope scope; + + @Override + public String exec(String input) { + AutoActivities activitiesOne = + Workflow.newActivityStub( + AutoActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(22)) + .setHeartbeatTimeout(Duration.ofSeconds(8)) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + .build()); + + AutoActivities activitiesTwo = + Workflow.newActivityStub( + AutoActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(20)) + .setHeartbeatTimeout(Duration.ofSeconds(7)) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + .build()); + + // Start our activity in CancellationScope so we can cancel it if needed + scope = + Workflow.newCancellationScope( + () -> { + activitiesOne.runActivityOne(input); + activitiesTwo.runActivityTwo(input); + }); + + try { + scope.run(); + } catch (ActivityFailure e) { + if (e.getCause() instanceof CanceledFailure) { + // We dont want workflow to fail in we canceled our scope, just log and return + return "Workflow result after activity cancellation"; + } else { + throw e; + } + } + return "completed"; + } + + @Override + public void cancelActivity() { + scope.cancel("Canceling scope from signal handler"); + } +} From e5eaab57c70644b8fd125bac0cccaf85e6db746e Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Wed, 25 Jun 2025 15:46:11 -0400 Subject: [PATCH 2/4] updates Signed-off-by: Tihomir Surdilovic --- .../samples/autoheartbeat/Starter.java | 18 ++++++++++++++++++ ...artbeatActivityInboundCallsInterceptor.java | 14 ++++++++------ .../workflows/AutoWorkflowImpl.java | 9 +++++++++ 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java index 9ac265bba..cc324c460 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java @@ -91,6 +91,24 @@ public static void main(String[] args) { } catch (Exception e) { System.out.println("Second run - Workflow exec exception: " + e.getClass().getName()); } + + System.out.println("\n\n**** Third Run: cause heartbeat timeout"); + // we disable autoheartbeat via env var + System.setProperty("sample.disableAutoHeartbeat", "true"); + AutoWorkflow thirdRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + try { + String thirdRunResult = thirdRun.exec("Auto heartbeating is cool"); + System.out.println("Third run result: " + thirdRunResult); + } catch (Exception e) { + System.out.println("Third run - Workflow exec exception: " + e.getClass().getName()); + } } private static void doSleeps(int seconds) { diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java index 5ca4006a0..60a2d5427 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java @@ -36,11 +36,8 @@ public class AutoHeartbeatActivityInboundCallsInterceptor private ActivityExecutionContext activityExecutionContext; private Duration activityHeartbeatTimeout; private AutoHeartbeatUtil autoHeartbeater; - // private CompletableFuture autoHeartbeatFuture; private ScheduledFuture scheduledFuture; - // private ScheduledFuture scheduledFuture; - public AutoHeartbeatActivityInboundCallsInterceptor(ActivityInboundCallsInterceptor next) { super(next); } @@ -56,7 +53,10 @@ public void init(ActivityExecutionContext context) { @SuppressWarnings({"FutureReturnValueIgnored", "CatchAndPrintStackTrace"}) public ActivityOutput execute(ActivityInput input) { // If activity has heartbeat timeout defined we want to apply auto-heartbeter - if (activityHeartbeatTimeout != null && activityHeartbeatTimeout.getSeconds() > 0) { + // Unless we explicitly disabled autoheartbeating via system property + if (activityHeartbeatTimeout != null + && activityHeartbeatTimeout.getSeconds() > 0 + && !Boolean.parseBoolean(System.getProperty("sample.disableAutoHeartbeat"))) { System.out.println( "Auto heartbeating applied for activity: " + activityExecutionContext.getInfo().getActivityType()); @@ -82,7 +82,8 @@ public ActivityOutput execute(ActivityInput input) { } }); try { - CompletableFuture.anyOf(autoHeartbeatFuture, activityExecFuture).get(); + return (ActivityOutput) + CompletableFuture.anyOf(autoHeartbeatFuture, activityExecFuture).get(); } catch (Exception e) { if (e instanceof ExecutionException) { ExecutionException ee = (ExecutionException) e; @@ -96,8 +97,9 @@ public ActivityOutput execute(ActivityInput input) { autoHeartbeater.stop(); } } + } else { + return super.execute(input); } - return super.execute(input); } public interface AutoHeartbeaterCancellationCallback { diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java index 30cf41c8f..6ff8100eb 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java @@ -21,8 +21,10 @@ import io.temporal.activity.ActivityCancellationType; import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; import io.temporal.failure.ActivityFailure; import io.temporal.failure.CanceledFailure; +import io.temporal.failure.TimeoutFailure; import io.temporal.samples.autoheartbeat.activities.AutoActivities; import io.temporal.workflow.CancellationScope; import io.temporal.workflow.Workflow; @@ -40,6 +42,8 @@ public String exec(String input) { .setStartToCloseTimeout(Duration.ofSeconds(22)) .setHeartbeatTimeout(Duration.ofSeconds(8)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + // for sample purposes + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(3).build()) .build()); AutoActivities activitiesTwo = @@ -49,6 +53,8 @@ public String exec(String input) { .setStartToCloseTimeout(Duration.ofSeconds(20)) .setHeartbeatTimeout(Duration.ofSeconds(7)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + // for sample purposes + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(3).build()) .build()); // Start our activity in CancellationScope so we can cancel it if needed @@ -65,6 +71,9 @@ public String exec(String input) { if (e.getCause() instanceof CanceledFailure) { // We dont want workflow to fail in we canceled our scope, just log and return return "Workflow result after activity cancellation"; + } else if (e.getCause() instanceof TimeoutFailure) { + return "Workflow result after activity timeout of type: " + + ((TimeoutFailure) e.getCause()).getTimeoutType().name(); } else { throw e; } From e798802ef6919ec5ef7a498689e8a92d5f98f172 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Tue, 1 Jul 2025 11:57:01 -0400 Subject: [PATCH 3/4] updates per recommendations Signed-off-by: Tihomir Surdilovic --- README.md | 1 + .../temporal/samples/autoheartbeat/README.md | 5 + .../samples/autoheartbeat/Starter.java | 204 +++++++++++------- 3 files changed, 131 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index b04d1f31e..07d381d88 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ See the README.md file in each main sample directory for cut/paste Gradle comman - [**HelloDelayedStart**](/core/src/main/java/io/temporal/samples/hello/HelloDelayedStart.java): Demonstrates how to use delayed start config option when starting a Workflow Executions. - [**HelloSignalWithTimer**](/core/src/main/java/io/temporal/samples/hello/HelloSignalWithTimer.java): Demonstrates how to use collect signals for certain amount of time and then process last one. - [**HelloWorkflowTimer**](/core/src/main/java/io/temporal/samples/hello/HelloWorkflowTimer.java): Demonstrates how we can use workflow timer to restrict duration of workflow execution instead of workflow run/execution timeouts. + - [**Auto-Heartbeating**](/core/src/main/java/io/temporal/samples/autoheartbeat/): Demonstrates use of Auto-heartbeating utility via activity interceptor. #### Scenario-based samples diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/README.md b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md index 1dd8c5e40..946e24fc5 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/README.md +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md @@ -9,6 +9,11 @@ but then at times can take for example minutes. In this case you have to set lon but you might not want first heartbeat to be sent right away but send it after the "shorter" duration of activity execution. +Warning: make sure to test this sample for your use case. This includes load testing. This sample was not +tested on large scale workloads. In addition note that it is recommended to heartbeat from activity code itself. Using +this type of autoheartbeating utility does have disatvantage that activity code itself can continue running after +a handled activity cancelation. Please be aware of these warnings when applying this sample. + 1. Start the Sample: ```bash diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java index cc324c460..956eab29c 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java @@ -22,6 +22,7 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; +import io.temporal.failure.CanceledFailure; import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl; import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor; import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow; @@ -32,90 +33,135 @@ import io.temporal.worker.WorkerFactoryOptions; public class Starter { - static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; - static final String WORKFLOW_ID = "AutoHeartbeatWorkflow"; - - public static void main(String[] args) { - WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); - WorkflowClient client = WorkflowClient.newInstance(service); - - // Configure our auto heartbeat workflow interceptor which will apply - // AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat - // timeout configured - WorkerFactoryOptions wfo = - WorkerFactoryOptions.newBuilder() - .setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor()) - .build(); - - WorkerFactory factory = WorkerFactory.newInstance(client, wfo); - Worker worker = factory.newWorker(TASK_QUEUE); - - worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); - worker.registerActivitiesImplementations(new AutoActivitiesImpl()); - - factory.start(); - - System.out.println("**** First Run: run workflow to completion"); - - AutoWorkflow firstRun = - client.newWorkflowStub( - AutoWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(WORKFLOW_ID) - .setTaskQueue(TASK_QUEUE) - .build()); - - try { - String firstRunResult = firstRun.exec("Auto heartbeating is cool"); - System.out.println("First run result: " + firstRunResult); - } catch (Exception e) { - System.out.println("First run - Workflow exec exception: " + e.getClass().getName()); + static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; + static final String WORKFLOW_ID = "AutoHeartbeatWorkflow"; + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = WorkflowClient.newInstance(service); + + // Configure our auto heartbeat workflow interceptor which will apply + // AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat + // timeout configured + WorkerFactoryOptions wfo = + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor()) + .build(); + + WorkerFactory factory = WorkerFactory.newInstance(client, wfo); + Worker worker = factory.newWorker(TASK_QUEUE); + + worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); + worker.registerActivitiesImplementations(new AutoActivitiesImpl()); + + factory.start(); + + // first run completes execution with autoheartbeat utils + firstRun(client); + // second run cancels running (pending) activity via signal (specific scope cancel) + secondRun(client); + // third run cancels running execution which cancels activity as well + thirdRun(client); + // fourth run turns off autoheartbeat for activities and lets activity time out on heartbeat + // timeout + fourthRun(client); + + System.exit(0); } - System.out.println("\n\n**** Second Run: cancel activities"); - - AutoWorkflow secondRun = - client.newWorkflowStub( - AutoWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(WORKFLOW_ID) - .setTaskQueue(TASK_QUEUE) - .build()); - WorkflowClient.start(secondRun::exec, "Auto heartbeating is cool"); - doSleeps(4); - secondRun.cancelActivity(); - - try { - String secondRunResult = WorkflowStub.fromTyped(secondRun).getResult(String.class); - System.out.println("Second run result: " + secondRunResult); - } catch (Exception e) { - System.out.println("Second run - Workflow exec exception: " + e.getClass().getName()); + @SuppressWarnings("unused") + private static void firstRun(WorkflowClient client) { + System.out.println("**** First Run: run workflow to completion"); + AutoWorkflow firstRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + try { + String firstRunResult = firstRun.exec("Auto heartbeating is cool"); + System.out.println("First run result: " + firstRunResult); + } catch (Exception e) { + System.out.println("First run - Workflow exec exception: " + e.getClass().getName()); + } } - System.out.println("\n\n**** Third Run: cause heartbeat timeout"); - // we disable autoheartbeat via env var - System.setProperty("sample.disableAutoHeartbeat", "true"); - AutoWorkflow thirdRun = - client.newWorkflowStub( - AutoWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(WORKFLOW_ID) - .setTaskQueue(TASK_QUEUE) - .build()); - - try { - String thirdRunResult = thirdRun.exec("Auto heartbeating is cool"); - System.out.println("Third run result: " + thirdRunResult); - } catch (Exception e) { - System.out.println("Third run - Workflow exec exception: " + e.getClass().getName()); + @SuppressWarnings("unused") + private static void secondRun(WorkflowClient client) { + System.out.println("\n\n**** Second Run: cancel activities via signal"); + AutoWorkflow secondRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + WorkflowClient.start(secondRun::exec, "Auto heartbeating is cool"); + doSleeps(4); + secondRun.cancelActivity(); + + try { + String secondRunResult = WorkflowStub.fromTyped(secondRun).getResult(String.class); + System.out.println("Second run result: " + secondRunResult); + } catch (Exception e) { + System.out.println("Second run - Workflow exec exception: " + e.getClass().getName()); + } } - } - private static void doSleeps(int seconds) { - try { - Thread.sleep(seconds * 1000L); - } catch (Exception e) { - System.out.println(e.getMessage()); + @SuppressWarnings("unused") + private static void thirdRun(WorkflowClient client) { + System.out.println("\n\n**** Third Run: cancel workflow execution"); + AutoWorkflow thirdRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + WorkflowClient.start(thirdRun::exec, "Auto heartbeating is cool"); + doSleeps(10); + try { + WorkflowStub.fromTyped(thirdRun).cancel(); + String thirdRunResult = WorkflowStub.fromTyped(thirdRun).getResult(String.class); + System.out.println("Third run result: " + thirdRunResult); + } catch (Exception e) { + // we are expecting workflow cancelation + if (e.getCause() instanceof CanceledFailure) { + System.out.println("Third run - Workflow execution canceled."); + } else { + System.out.println("Third run - Workflow exec exception: " + e.getMessage()); + } + } + } + + @SuppressWarnings("unused") + private static void fourthRun(WorkflowClient client) { + System.out.println("\n\n**** Fourth Run: cause heartbeat timeout"); + // we disable autoheartbeat via env var + System.setProperty("sample.disableAutoHeartbeat", "true"); + AutoWorkflow fourth = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + try { + String fourthRunResult = fourth.exec("Auto heartbeating is cool"); + System.out.println("Fourth run result: " + fourthRunResult); + } catch (Exception e) { + System.out.println("Fourth run - Workflow exec exception: " + e.getClass().getName()); + } + } + + private static void doSleeps(int seconds) { + try { + Thread.sleep(seconds * 1000L); + } catch (Exception e) { + System.out.println(e.getMessage()); + } } - } } From 13ca510eb2fdc2f8a76dff26abb8fb67207c41c6 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Tue, 1 Jul 2025 16:09:08 -0400 Subject: [PATCH 4/4] formatting fix Signed-off-by: Tihomir Surdilovic --- .../samples/autoheartbeat/Starter.java | 250 +++++++++--------- 1 file changed, 125 insertions(+), 125 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java index 956eab29c..67bb97ec3 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java @@ -33,135 +33,135 @@ import io.temporal.worker.WorkerFactoryOptions; public class Starter { - static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; - static final String WORKFLOW_ID = "AutoHeartbeatWorkflow"; - - public static void main(String[] args) { - WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); - WorkflowClient client = WorkflowClient.newInstance(service); - - // Configure our auto heartbeat workflow interceptor which will apply - // AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat - // timeout configured - WorkerFactoryOptions wfo = - WorkerFactoryOptions.newBuilder() - .setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor()) - .build(); - - WorkerFactory factory = WorkerFactory.newInstance(client, wfo); - Worker worker = factory.newWorker(TASK_QUEUE); - - worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); - worker.registerActivitiesImplementations(new AutoActivitiesImpl()); - - factory.start(); - - // first run completes execution with autoheartbeat utils - firstRun(client); - // second run cancels running (pending) activity via signal (specific scope cancel) - secondRun(client); - // third run cancels running execution which cancels activity as well - thirdRun(client); - // fourth run turns off autoheartbeat for activities and lets activity time out on heartbeat - // timeout - fourthRun(client); - - System.exit(0); + static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; + static final String WORKFLOW_ID = "AutoHeartbeatWorkflow"; + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = WorkflowClient.newInstance(service); + + // Configure our auto heartbeat workflow interceptor which will apply + // AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat + // timeout configured + WorkerFactoryOptions wfo = + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor()) + .build(); + + WorkerFactory factory = WorkerFactory.newInstance(client, wfo); + Worker worker = factory.newWorker(TASK_QUEUE); + + worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); + worker.registerActivitiesImplementations(new AutoActivitiesImpl()); + + factory.start(); + + // first run completes execution with autoheartbeat utils + firstRun(client); + // second run cancels running (pending) activity via signal (specific scope cancel) + secondRun(client); + // third run cancels running execution which cancels activity as well + thirdRun(client); + // fourth run turns off autoheartbeat for activities and lets activity time out on heartbeat + // timeout + fourthRun(client); + + System.exit(0); + } + + @SuppressWarnings("unused") + private static void firstRun(WorkflowClient client) { + System.out.println("**** First Run: run workflow to completion"); + AutoWorkflow firstRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + try { + String firstRunResult = firstRun.exec("Auto heartbeating is cool"); + System.out.println("First run result: " + firstRunResult); + } catch (Exception e) { + System.out.println("First run - Workflow exec exception: " + e.getClass().getName()); } - - @SuppressWarnings("unused") - private static void firstRun(WorkflowClient client) { - System.out.println("**** First Run: run workflow to completion"); - AutoWorkflow firstRun = - client.newWorkflowStub( - AutoWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(WORKFLOW_ID) - .setTaskQueue(TASK_QUEUE) - .build()); - - try { - String firstRunResult = firstRun.exec("Auto heartbeating is cool"); - System.out.println("First run result: " + firstRunResult); - } catch (Exception e) { - System.out.println("First run - Workflow exec exception: " + e.getClass().getName()); - } + } + + @SuppressWarnings("unused") + private static void secondRun(WorkflowClient client) { + System.out.println("\n\n**** Second Run: cancel activities via signal"); + AutoWorkflow secondRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + WorkflowClient.start(secondRun::exec, "Auto heartbeating is cool"); + doSleeps(4); + secondRun.cancelActivity(); + + try { + String secondRunResult = WorkflowStub.fromTyped(secondRun).getResult(String.class); + System.out.println("Second run result: " + secondRunResult); + } catch (Exception e) { + System.out.println("Second run - Workflow exec exception: " + e.getClass().getName()); } - - @SuppressWarnings("unused") - private static void secondRun(WorkflowClient client) { - System.out.println("\n\n**** Second Run: cancel activities via signal"); - AutoWorkflow secondRun = - client.newWorkflowStub( - AutoWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(WORKFLOW_ID) - .setTaskQueue(TASK_QUEUE) - .build()); - WorkflowClient.start(secondRun::exec, "Auto heartbeating is cool"); - doSleeps(4); - secondRun.cancelActivity(); - - try { - String secondRunResult = WorkflowStub.fromTyped(secondRun).getResult(String.class); - System.out.println("Second run result: " + secondRunResult); - } catch (Exception e) { - System.out.println("Second run - Workflow exec exception: " + e.getClass().getName()); - } + } + + @SuppressWarnings("unused") + private static void thirdRun(WorkflowClient client) { + System.out.println("\n\n**** Third Run: cancel workflow execution"); + AutoWorkflow thirdRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + WorkflowClient.start(thirdRun::exec, "Auto heartbeating is cool"); + doSleeps(10); + try { + WorkflowStub.fromTyped(thirdRun).cancel(); + String thirdRunResult = WorkflowStub.fromTyped(thirdRun).getResult(String.class); + System.out.println("Third run result: " + thirdRunResult); + } catch (Exception e) { + // we are expecting workflow cancelation + if (e.getCause() instanceof CanceledFailure) { + System.out.println("Third run - Workflow execution canceled."); + } else { + System.out.println("Third run - Workflow exec exception: " + e.getMessage()); + } } - - @SuppressWarnings("unused") - private static void thirdRun(WorkflowClient client) { - System.out.println("\n\n**** Third Run: cancel workflow execution"); - AutoWorkflow thirdRun = - client.newWorkflowStub( - AutoWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(WORKFLOW_ID) - .setTaskQueue(TASK_QUEUE) - .build()); - WorkflowClient.start(thirdRun::exec, "Auto heartbeating is cool"); - doSleeps(10); - try { - WorkflowStub.fromTyped(thirdRun).cancel(); - String thirdRunResult = WorkflowStub.fromTyped(thirdRun).getResult(String.class); - System.out.println("Third run result: " + thirdRunResult); - } catch (Exception e) { - // we are expecting workflow cancelation - if (e.getCause() instanceof CanceledFailure) { - System.out.println("Third run - Workflow execution canceled."); - } else { - System.out.println("Third run - Workflow exec exception: " + e.getMessage()); - } - } - } - - @SuppressWarnings("unused") - private static void fourthRun(WorkflowClient client) { - System.out.println("\n\n**** Fourth Run: cause heartbeat timeout"); - // we disable autoheartbeat via env var - System.setProperty("sample.disableAutoHeartbeat", "true"); - AutoWorkflow fourth = - client.newWorkflowStub( - AutoWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(WORKFLOW_ID) - .setTaskQueue(TASK_QUEUE) - .build()); - - try { - String fourthRunResult = fourth.exec("Auto heartbeating is cool"); - System.out.println("Fourth run result: " + fourthRunResult); - } catch (Exception e) { - System.out.println("Fourth run - Workflow exec exception: " + e.getClass().getName()); - } + } + + @SuppressWarnings("unused") + private static void fourthRun(WorkflowClient client) { + System.out.println("\n\n**** Fourth Run: cause heartbeat timeout"); + // we disable autoheartbeat via env var + System.setProperty("sample.disableAutoHeartbeat", "true"); + AutoWorkflow fourth = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + try { + String fourthRunResult = fourth.exec("Auto heartbeating is cool"); + System.out.println("Fourth run result: " + fourthRunResult); + } catch (Exception e) { + System.out.println("Fourth run - Workflow exec exception: " + e.getClass().getName()); } + } - private static void doSleeps(int seconds) { - try { - Thread.sleep(seconds * 1000L); - } catch (Exception e) { - System.out.println(e.getMessage()); - } + private static void doSleeps(int seconds) { + try { + Thread.sleep(seconds * 1000L); + } catch (Exception e) { + System.out.println(e.getMessage()); } + } }