-
Notifications
You must be signed in to change notification settings - Fork 175
Sample: Auto Heartbeat via Interceptor #745
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
87 changes: 87 additions & 0 deletions
87
core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| /* | ||
| * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved | ||
| * | ||
| * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Modifications copyright (C) 2017 Uber Technologies, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"). You may not | ||
| * use this file except in compliance with the License. A copy of the License is | ||
| * located at | ||
| * | ||
| * http://aws.amazon.com/apache2.0 | ||
| * | ||
| * or in the "license" file accompanying this file. This file is distributed on | ||
| * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing | ||
| * permissions and limitations under the License. | ||
| */ | ||
|
|
||
| package io.temporal.samples.autoheartbeat; | ||
|
|
||
| import io.temporal.activity.ActivityExecutionContext; | ||
| import java.time.Instant; | ||
| import java.time.ZoneId; | ||
| import java.time.format.DateTimeFormatter; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ScheduledFuture; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| public class AutoHeartbeatUtil { | ||
| private final long period; | ||
| private final long initialDelay; | ||
| private final TimeUnit periodTimeUnit; | ||
| private final ScheduledExecutorService timerService = | ||
| Executors.newSingleThreadScheduledExecutor(); | ||
| private final ActivityExecutionContext context; | ||
| private final Object details; | ||
| private String heartbeaterId; | ||
|
|
||
| public AutoHeartbeatUtil( | ||
| long period, | ||
| long initialDelay, | ||
| TimeUnit periodTimeUnit, | ||
| ActivityExecutionContext context, | ||
| Object details) { | ||
| this.period = period; | ||
| this.initialDelay = initialDelay; | ||
| this.periodTimeUnit = periodTimeUnit; | ||
| this.context = context; | ||
| this.details = details; | ||
| // Set to activity id better, for sample we just use type | ||
| heartbeaterId = context.getInfo().getActivityType(); | ||
| } | ||
|
|
||
| public ScheduledFuture<?> start() { | ||
| System.out.println("Autoheartbeater[" + heartbeaterId + "] starting..."); | ||
| return timerService.scheduleAtFixedRate( | ||
| () -> { | ||
| // try { | ||
| System.out.println( | ||
| "Autoheartbeater[" | ||
| + heartbeaterId | ||
| + "]" | ||
| + "heartbeating at: " | ||
| + printShortCurrentTime()); | ||
| context.heartbeat(details); | ||
| }, | ||
| initialDelay, | ||
| period, | ||
| periodTimeUnit); | ||
| } | ||
|
|
||
| public void stop() { | ||
| System.out.println("Autoheartbeater[" + heartbeaterId + "] being requested to stop."); | ||
| // Try not to execute another heartbeat that could have been queued up | ||
| // Note this can at times take a second or two so make sure to test this out on your workers | ||
| // So can set best heartbeat timeout (sometimes might need larger value to accomodate) | ||
| timerService.shutdownNow(); | ||
| } | ||
|
|
||
| private String printShortCurrentTime() { | ||
| return DateTimeFormatter.ofPattern("HH:mm:ss") | ||
| .withZone(ZoneId.systemDefault()) | ||
| .format(Instant.now()); | ||
| } | ||
| } |
21 changes: 21 additions & 0 deletions
21
core/src/main/java/io/temporal/samples/autoheartbeat/README.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| # Auto-heartbeating sample for activities that define HeartbeatTimeout | ||
|
|
||
| This sample shows an implementation of an "auto-heartbeating" utility that can be applied via interceptor to all | ||
| activities where you define HeartbeatTimeout. Use case where this can be helpful include situations where you have | ||
| long-running activities where you want to heartbeat but its difficult to explicitly call heartbeat api in activity code | ||
| directly. | ||
| Another useful scenario for this is where you have activity that at times can complete in very short amount of time, | ||
| but then at times can take for example minutes. In this case you have to set longer StartToClose timeout | ||
| but you might not want first heartbeat to be sent right away but send it after the "shorter" duration of activity | ||
| execution. | ||
|
|
||
| Warning: make sure to test this sample for your use case. This includes load testing. This sample was not | ||
| tested on large scale workloads. In addition note that it is recommended to heartbeat from activity code itself. Using | ||
| this type of autoheartbeating utility does have disatvantage that activity code itself can continue running after | ||
| a handled activity cancelation. Please be aware of these warnings when applying this sample. | ||
|
|
||
| 1. Start the Sample: | ||
|
|
||
| ```bash | ||
| ./gradlew -q execute -PmainClass=io.temporal.samples.autoheartbeat.Starter | ||
| ``` | ||
167 changes: 167 additions & 0 deletions
167
core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| /* | ||
| * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved | ||
| * | ||
| * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Modifications copyright (C) 2017 Uber Technologies, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"). You may not | ||
| * use this file except in compliance with the License. A copy of the License is | ||
| * located at | ||
| * | ||
| * http://aws.amazon.com/apache2.0 | ||
| * | ||
| * or in the "license" file accompanying this file. This file is distributed on | ||
| * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing | ||
| * permissions and limitations under the License. | ||
| */ | ||
|
|
||
| package io.temporal.samples.autoheartbeat; | ||
|
|
||
| import io.temporal.client.WorkflowClient; | ||
| import io.temporal.client.WorkflowOptions; | ||
| import io.temporal.client.WorkflowStub; | ||
| import io.temporal.failure.CanceledFailure; | ||
| import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl; | ||
| import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor; | ||
| import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow; | ||
| import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import io.temporal.worker.Worker; | ||
| import io.temporal.worker.WorkerFactory; | ||
| import io.temporal.worker.WorkerFactoryOptions; | ||
|
|
||
| public class Starter { | ||
| static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; | ||
| static final String WORKFLOW_ID = "AutoHeartbeatWorkflow"; | ||
|
|
||
| public static void main(String[] args) { | ||
| WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); | ||
| WorkflowClient client = WorkflowClient.newInstance(service); | ||
|
|
||
| // Configure our auto heartbeat workflow interceptor which will apply | ||
| // AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat | ||
| // timeout configured | ||
| WorkerFactoryOptions wfo = | ||
| WorkerFactoryOptions.newBuilder() | ||
| .setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor()) | ||
| .build(); | ||
|
|
||
| WorkerFactory factory = WorkerFactory.newInstance(client, wfo); | ||
| Worker worker = factory.newWorker(TASK_QUEUE); | ||
|
|
||
| worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); | ||
| worker.registerActivitiesImplementations(new AutoActivitiesImpl()); | ||
|
|
||
| factory.start(); | ||
|
|
||
| // first run completes execution with autoheartbeat utils | ||
| firstRun(client); | ||
| // second run cancels running (pending) activity via signal (specific scope cancel) | ||
| secondRun(client); | ||
| // third run cancels running execution which cancels activity as well | ||
| thirdRun(client); | ||
| // fourth run turns off autoheartbeat for activities and lets activity time out on heartbeat | ||
| // timeout | ||
| fourthRun(client); | ||
|
|
||
| System.exit(0); | ||
| } | ||
|
|
||
| @SuppressWarnings("unused") | ||
| private static void firstRun(WorkflowClient client) { | ||
| System.out.println("**** First Run: run workflow to completion"); | ||
| AutoWorkflow firstRun = | ||
| client.newWorkflowStub( | ||
| AutoWorkflow.class, | ||
| WorkflowOptions.newBuilder() | ||
| .setWorkflowId(WORKFLOW_ID) | ||
| .setTaskQueue(TASK_QUEUE) | ||
| .build()); | ||
|
|
||
| try { | ||
| String firstRunResult = firstRun.exec("Auto heartbeating is cool"); | ||
| System.out.println("First run result: " + firstRunResult); | ||
| } catch (Exception e) { | ||
| System.out.println("First run - Workflow exec exception: " + e.getClass().getName()); | ||
| } | ||
| } | ||
|
|
||
| @SuppressWarnings("unused") | ||
| private static void secondRun(WorkflowClient client) { | ||
| System.out.println("\n\n**** Second Run: cancel activities via signal"); | ||
| AutoWorkflow secondRun = | ||
| client.newWorkflowStub( | ||
| AutoWorkflow.class, | ||
| WorkflowOptions.newBuilder() | ||
| .setWorkflowId(WORKFLOW_ID) | ||
| .setTaskQueue(TASK_QUEUE) | ||
| .build()); | ||
| WorkflowClient.start(secondRun::exec, "Auto heartbeating is cool"); | ||
| doSleeps(4); | ||
| secondRun.cancelActivity(); | ||
|
|
||
| try { | ||
| String secondRunResult = WorkflowStub.fromTyped(secondRun).getResult(String.class); | ||
| System.out.println("Second run result: " + secondRunResult); | ||
| } catch (Exception e) { | ||
| System.out.println("Second run - Workflow exec exception: " + e.getClass().getName()); | ||
| } | ||
| } | ||
|
|
||
| @SuppressWarnings("unused") | ||
| private static void thirdRun(WorkflowClient client) { | ||
| System.out.println("\n\n**** Third Run: cancel workflow execution"); | ||
| AutoWorkflow thirdRun = | ||
| client.newWorkflowStub( | ||
| AutoWorkflow.class, | ||
| WorkflowOptions.newBuilder() | ||
| .setWorkflowId(WORKFLOW_ID) | ||
| .setTaskQueue(TASK_QUEUE) | ||
| .build()); | ||
| WorkflowClient.start(thirdRun::exec, "Auto heartbeating is cool"); | ||
| doSleeps(10); | ||
| try { | ||
| WorkflowStub.fromTyped(thirdRun).cancel(); | ||
| String thirdRunResult = WorkflowStub.fromTyped(thirdRun).getResult(String.class); | ||
| System.out.println("Third run result: " + thirdRunResult); | ||
| } catch (Exception e) { | ||
| // we are expecting workflow cancelation | ||
| if (e.getCause() instanceof CanceledFailure) { | ||
| System.out.println("Third run - Workflow execution canceled."); | ||
| } else { | ||
| System.out.println("Third run - Workflow exec exception: " + e.getMessage()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @SuppressWarnings("unused") | ||
| private static void fourthRun(WorkflowClient client) { | ||
| System.out.println("\n\n**** Fourth Run: cause heartbeat timeout"); | ||
| // we disable autoheartbeat via env var | ||
| System.setProperty("sample.disableAutoHeartbeat", "true"); | ||
| AutoWorkflow fourth = | ||
| client.newWorkflowStub( | ||
| AutoWorkflow.class, | ||
| WorkflowOptions.newBuilder() | ||
| .setWorkflowId(WORKFLOW_ID) | ||
| .setTaskQueue(TASK_QUEUE) | ||
| .build()); | ||
|
|
||
| try { | ||
| String fourthRunResult = fourth.exec("Auto heartbeating is cool"); | ||
| System.out.println("Fourth run result: " + fourthRunResult); | ||
| } catch (Exception e) { | ||
| System.out.println("Fourth run - Workflow exec exception: " + e.getClass().getName()); | ||
| } | ||
| } | ||
|
|
||
| private static void doSleeps(int seconds) { | ||
| try { | ||
| Thread.sleep(seconds * 1000L); | ||
| } catch (Exception e) { | ||
| System.out.println(e.getMessage()); | ||
| } | ||
| } | ||
| } |
29 changes: 29 additions & 0 deletions
29
core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| /* | ||
| * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved | ||
| * | ||
| * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Modifications copyright (C) 2017 Uber Technologies, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"). You may not | ||
| * use this file except in compliance with the License. A copy of the License is | ||
| * located at | ||
| * | ||
| * http://aws.amazon.com/apache2.0 | ||
| * | ||
| * or in the "license" file accompanying this file. This file is distributed on | ||
| * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing | ||
| * permissions and limitations under the License. | ||
| */ | ||
|
|
||
| package io.temporal.samples.autoheartbeat.activities; | ||
|
|
||
| import io.temporal.activity.ActivityInterface; | ||
|
|
||
| @ActivityInterface | ||
| public interface AutoActivities { | ||
| String runActivityOne(String input); | ||
|
|
||
| String runActivityTwo(String input); | ||
| } |
51 changes: 51 additions & 0 deletions
51
core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| /* | ||
| * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved | ||
| * | ||
| * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Modifications copyright (C) 2017 Uber Technologies, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"). You may not | ||
| * use this file except in compliance with the License. A copy of the License is | ||
| * located at | ||
| * | ||
| * http://aws.amazon.com/apache2.0 | ||
| * | ||
| * or in the "license" file accompanying this file. This file is distributed on | ||
| * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing | ||
| * permissions and limitations under the License. | ||
| */ | ||
|
|
||
| package io.temporal.samples.autoheartbeat.activities; | ||
|
|
||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| public class AutoActivitiesImpl implements AutoActivities { | ||
|
|
||
| @Override | ||
| public String runActivityOne(String input) { | ||
| return runActivity("runActivityOne - " + input, 10); | ||
| } | ||
|
|
||
| @Override | ||
| public String runActivityTwo(String input) { | ||
| return runActivity("runActivityTwo - " + input, 5); | ||
| } | ||
|
|
||
| @SuppressWarnings("FutureReturnValueIgnored") | ||
| private String runActivity(String input, int seconds) { | ||
| for (int i = 0; i < seconds; i++) { | ||
| sleep(1); | ||
| } | ||
| return "Activity completed: " + input; | ||
| } | ||
|
|
||
| private void sleep(int seconds) { | ||
| try { | ||
| Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); | ||
| } catch (InterruptedException ee) { | ||
| // Empty | ||
| } | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sample should be linked from the top level readmen