diff --git a/README.md b/README.md index b04d1f31e..07d381d88 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ See the README.md file in each main sample directory for cut/paste Gradle comman - [**HelloDelayedStart**](/core/src/main/java/io/temporal/samples/hello/HelloDelayedStart.java): Demonstrates how to use delayed start config option when starting a Workflow Executions. - [**HelloSignalWithTimer**](/core/src/main/java/io/temporal/samples/hello/HelloSignalWithTimer.java): Demonstrates how to use collect signals for certain amount of time and then process last one. - [**HelloWorkflowTimer**](/core/src/main/java/io/temporal/samples/hello/HelloWorkflowTimer.java): Demonstrates how we can use workflow timer to restrict duration of workflow execution instead of workflow run/execution timeouts. + - [**Auto-Heartbeating**](/core/src/main/java/io/temporal/samples/autoheartbeat/): Demonstrates use of Auto-heartbeating utility via activity interceptor. #### Scenario-based samples diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java new file mode 100644 index 000000000..ca2b0a5f4 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.activity.ActivityExecutionContext; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class AutoHeartbeatUtil { + private final long period; + private final long initialDelay; + private final TimeUnit periodTimeUnit; + private final ScheduledExecutorService timerService = + Executors.newSingleThreadScheduledExecutor(); + private final ActivityExecutionContext context; + private final Object details; + private String heartbeaterId; + + public AutoHeartbeatUtil( + long period, + long initialDelay, + TimeUnit periodTimeUnit, + ActivityExecutionContext context, + Object details) { + this.period = period; + this.initialDelay = initialDelay; + this.periodTimeUnit = periodTimeUnit; + this.context = context; + this.details = details; + // Set to activity id better, for sample we just use type + heartbeaterId = context.getInfo().getActivityType(); + } + + public ScheduledFuture start() { + System.out.println("Autoheartbeater[" + heartbeaterId + "] starting..."); + return timerService.scheduleAtFixedRate( + () -> { + // try { + System.out.println( + "Autoheartbeater[" + + heartbeaterId + + "]" + + "heartbeating at: " + + printShortCurrentTime()); + context.heartbeat(details); + }, + initialDelay, + period, + periodTimeUnit); + } + + public void stop() { + System.out.println("Autoheartbeater[" + heartbeaterId + "] being requested to stop."); + // Try not to execute another heartbeat that could have been queued up + // Note this can at times take a second or two so make sure to test this out on your workers + // So can set best heartbeat timeout (sometimes might need larger value to accomodate) + timerService.shutdownNow(); + } + + private String printShortCurrentTime() { + return DateTimeFormatter.ofPattern("HH:mm:ss") + .withZone(ZoneId.systemDefault()) + .format(Instant.now()); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/README.md b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md new file mode 100644 index 000000000..946e24fc5 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md @@ -0,0 +1,21 @@ +# Auto-heartbeating sample for activities that define HeartbeatTimeout + +This sample shows an implementation of an "auto-heartbeating" utility that can be applied via interceptor to all +activities where you define HeartbeatTimeout. Use case where this can be helpful include situations where you have +long-running activities where you want to heartbeat but its difficult to explicitly call heartbeat api in activity code +directly. +Another useful scenario for this is where you have activity that at times can complete in very short amount of time, +but then at times can take for example minutes. In this case you have to set longer StartToClose timeout +but you might not want first heartbeat to be sent right away but send it after the "shorter" duration of activity +execution. + +Warning: make sure to test this sample for your use case. This includes load testing. This sample was not +tested on large scale workloads. In addition note that it is recommended to heartbeat from activity code itself. Using +this type of autoheartbeating utility does have disatvantage that activity code itself can continue running after +a handled activity cancelation. Please be aware of these warnings when applying this sample. + +1. Start the Sample: + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.autoheartbeat.Starter +``` \ No newline at end of file diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java new file mode 100644 index 000000000..67bb97ec3 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.failure.CanceledFailure; +import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl; +import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor; +import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow; +import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; + +public class Starter { + static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; + static final String WORKFLOW_ID = "AutoHeartbeatWorkflow"; + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = WorkflowClient.newInstance(service); + + // Configure our auto heartbeat workflow interceptor which will apply + // AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat + // timeout configured + WorkerFactoryOptions wfo = + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor()) + .build(); + + WorkerFactory factory = WorkerFactory.newInstance(client, wfo); + Worker worker = factory.newWorker(TASK_QUEUE); + + worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); + worker.registerActivitiesImplementations(new AutoActivitiesImpl()); + + factory.start(); + + // first run completes execution with autoheartbeat utils + firstRun(client); + // second run cancels running (pending) activity via signal (specific scope cancel) + secondRun(client); + // third run cancels running execution which cancels activity as well + thirdRun(client); + // fourth run turns off autoheartbeat for activities and lets activity time out on heartbeat + // timeout + fourthRun(client); + + System.exit(0); + } + + @SuppressWarnings("unused") + private static void firstRun(WorkflowClient client) { + System.out.println("**** First Run: run workflow to completion"); + AutoWorkflow firstRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + try { + String firstRunResult = firstRun.exec("Auto heartbeating is cool"); + System.out.println("First run result: " + firstRunResult); + } catch (Exception e) { + System.out.println("First run - Workflow exec exception: " + e.getClass().getName()); + } + } + + @SuppressWarnings("unused") + private static void secondRun(WorkflowClient client) { + System.out.println("\n\n**** Second Run: cancel activities via signal"); + AutoWorkflow secondRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + WorkflowClient.start(secondRun::exec, "Auto heartbeating is cool"); + doSleeps(4); + secondRun.cancelActivity(); + + try { + String secondRunResult = WorkflowStub.fromTyped(secondRun).getResult(String.class); + System.out.println("Second run result: " + secondRunResult); + } catch (Exception e) { + System.out.println("Second run - Workflow exec exception: " + e.getClass().getName()); + } + } + + @SuppressWarnings("unused") + private static void thirdRun(WorkflowClient client) { + System.out.println("\n\n**** Third Run: cancel workflow execution"); + AutoWorkflow thirdRun = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + WorkflowClient.start(thirdRun::exec, "Auto heartbeating is cool"); + doSleeps(10); + try { + WorkflowStub.fromTyped(thirdRun).cancel(); + String thirdRunResult = WorkflowStub.fromTyped(thirdRun).getResult(String.class); + System.out.println("Third run result: " + thirdRunResult); + } catch (Exception e) { + // we are expecting workflow cancelation + if (e.getCause() instanceof CanceledFailure) { + System.out.println("Third run - Workflow execution canceled."); + } else { + System.out.println("Third run - Workflow exec exception: " + e.getMessage()); + } + } + } + + @SuppressWarnings("unused") + private static void fourthRun(WorkflowClient client) { + System.out.println("\n\n**** Fourth Run: cause heartbeat timeout"); + // we disable autoheartbeat via env var + System.setProperty("sample.disableAutoHeartbeat", "true"); + AutoWorkflow fourth = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + try { + String fourthRunResult = fourth.exec("Auto heartbeating is cool"); + System.out.println("Fourth run result: " + fourthRunResult); + } catch (Exception e) { + System.out.println("Fourth run - Workflow exec exception: " + e.getClass().getName()); + } + } + + private static void doSleeps(int seconds) { + try { + Thread.sleep(seconds * 1000L); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java new file mode 100644 index 000000000..81726e05a --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.activities; + +import io.temporal.activity.ActivityInterface; + +@ActivityInterface +public interface AutoActivities { + String runActivityOne(String input); + + String runActivityTwo(String input); +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java new file mode 100644 index 000000000..26a30162d --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.activities; + +import java.util.concurrent.TimeUnit; + +public class AutoActivitiesImpl implements AutoActivities { + + @Override + public String runActivityOne(String input) { + return runActivity("runActivityOne - " + input, 10); + } + + @Override + public String runActivityTwo(String input) { + return runActivity("runActivityTwo - " + input, 5); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private String runActivity(String input, int seconds) { + for (int i = 0; i < seconds; i++) { + sleep(1); + } + return "Activity completed: " + input; + } + + private void sleep(int seconds) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); + } catch (InterruptedException ee) { + // Empty + } + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java new file mode 100644 index 000000000..60a2d5427 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.interceptor; + +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.client.ActivityCanceledException; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase; +import io.temporal.samples.autoheartbeat.AutoHeartbeatUtil; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class AutoHeartbeatActivityInboundCallsInterceptor + extends ActivityInboundCallsInterceptorBase { + private ActivityExecutionContext activityExecutionContext; + private Duration activityHeartbeatTimeout; + private AutoHeartbeatUtil autoHeartbeater; + private ScheduledFuture scheduledFuture; + + public AutoHeartbeatActivityInboundCallsInterceptor(ActivityInboundCallsInterceptor next) { + super(next); + } + + @Override + public void init(ActivityExecutionContext context) { + this.activityExecutionContext = context; + activityHeartbeatTimeout = activityExecutionContext.getInfo().getHeartbeatTimeout(); + super.init(context); + } + + @Override + @SuppressWarnings({"FutureReturnValueIgnored", "CatchAndPrintStackTrace"}) + public ActivityOutput execute(ActivityInput input) { + // If activity has heartbeat timeout defined we want to apply auto-heartbeter + // Unless we explicitly disabled autoheartbeating via system property + if (activityHeartbeatTimeout != null + && activityHeartbeatTimeout.getSeconds() > 0 + && !Boolean.parseBoolean(System.getProperty("sample.disableAutoHeartbeat"))) { + System.out.println( + "Auto heartbeating applied for activity: " + + activityExecutionContext.getInfo().getActivityType()); + autoHeartbeater = + new AutoHeartbeatUtil(2, 0, TimeUnit.SECONDS, activityExecutionContext, input); + scheduledFuture = autoHeartbeater.start(); + } else { + System.out.println( + "Auto heartbeating not being applied for activity: " + + activityExecutionContext.getInfo().getActivityType()); + } + + if (scheduledFuture != null) { + CompletableFuture activityExecFuture = + CompletableFuture.supplyAsync(() -> super.execute(input)); + CompletableFuture autoHeartbeatFuture = + CompletableFuture.supplyAsync( + () -> { + try { + return scheduledFuture.get(); + } catch (Exception e) { + throw new ActivityCanceledException(activityExecutionContext.getInfo()); + } + }); + try { + return (ActivityOutput) + CompletableFuture.anyOf(autoHeartbeatFuture, activityExecFuture).get(); + } catch (Exception e) { + if (e instanceof ExecutionException) { + ExecutionException ee = (ExecutionException) e; + if (ee.getCause() instanceof ActivityCanceledException) { + throw new ActivityCanceledException(activityExecutionContext.getInfo()); + } + } + throw Activity.wrap(e); + } finally { + if (autoHeartbeater != null) { + autoHeartbeater.stop(); + } + } + } else { + return super.execute(input); + } + } + + public interface AutoHeartbeaterCancellationCallback { + void handle(Exception e); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java new file mode 100644 index 000000000..9816fe644 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.interceptor; + +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkerInterceptorBase; + +public class AutoHeartbeatWorkerInterceptor extends WorkerInterceptorBase { + @Override + public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { + return new AutoHeartbeatActivityInboundCallsInterceptor(next); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java new file mode 100644 index 000000000..e8816c6a2 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.workflows; + +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface AutoWorkflow { + @WorkflowMethod + String exec(String input); + + @SignalMethod + void cancelActivity(); +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java new file mode 100644 index 000000000..6ff8100eb --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.workflows; + +import io.temporal.activity.ActivityCancellationType; +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.failure.ActivityFailure; +import io.temporal.failure.CanceledFailure; +import io.temporal.failure.TimeoutFailure; +import io.temporal.samples.autoheartbeat.activities.AutoActivities; +import io.temporal.workflow.CancellationScope; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +public class AutoWorkflowImpl implements AutoWorkflow { + private CancellationScope scope; + + @Override + public String exec(String input) { + AutoActivities activitiesOne = + Workflow.newActivityStub( + AutoActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(22)) + .setHeartbeatTimeout(Duration.ofSeconds(8)) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + // for sample purposes + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(3).build()) + .build()); + + AutoActivities activitiesTwo = + Workflow.newActivityStub( + AutoActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(20)) + .setHeartbeatTimeout(Duration.ofSeconds(7)) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + // for sample purposes + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(3).build()) + .build()); + + // Start our activity in CancellationScope so we can cancel it if needed + scope = + Workflow.newCancellationScope( + () -> { + activitiesOne.runActivityOne(input); + activitiesTwo.runActivityTwo(input); + }); + + try { + scope.run(); + } catch (ActivityFailure e) { + if (e.getCause() instanceof CanceledFailure) { + // We dont want workflow to fail in we canceled our scope, just log and return + return "Workflow result after activity cancellation"; + } else if (e.getCause() instanceof TimeoutFailure) { + return "Workflow result after activity timeout of type: " + + ((TimeoutFailure) e.getCause()).getTimeoutType().name(); + } else { + throw e; + } + } + return "completed"; + } + + @Override + public void cancelActivity() { + scope.cancel("Canceling scope from signal handler"); + } +}