Skip to content

Commit d9b43cb

Browse files
committed
Sample: Auto Heartbeat via Interceptor
Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io>
1 parent c4217c9 commit d9b43cb

File tree

9 files changed

+534
-0
lines changed

9 files changed

+534
-0
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat;
21+
22+
import io.temporal.activity.ActivityExecutionContext;
23+
import java.time.Instant;
24+
import java.time.ZoneId;
25+
import java.time.format.DateTimeFormatter;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.ScheduledFuture;
29+
import java.util.concurrent.TimeUnit;
30+
31+
public class AutoHeartbeatUtil {
32+
private final long period;
33+
private final long initialDelay;
34+
private final TimeUnit periodTimeUnit;
35+
private final ScheduledExecutorService timerService =
36+
Executors.newSingleThreadScheduledExecutor();
37+
private final ActivityExecutionContext context;
38+
private final Object details;
39+
private String heartbeaterId;
40+
41+
public AutoHeartbeatUtil(
42+
long period,
43+
long initialDelay,
44+
TimeUnit periodTimeUnit,
45+
ActivityExecutionContext context,
46+
Object details) {
47+
this.period = period;
48+
this.initialDelay = initialDelay;
49+
this.periodTimeUnit = periodTimeUnit;
50+
this.context = context;
51+
this.details = details;
52+
// Set to activity id better, for sample we just use type
53+
heartbeaterId = context.getInfo().getActivityType();
54+
}
55+
56+
public ScheduledFuture<?> start() {
57+
System.out.println("Autoheartbeater[" + heartbeaterId + "] starting...");
58+
return timerService.scheduleAtFixedRate(
59+
() -> {
60+
// try {
61+
System.out.println(
62+
"Autoheartbeater["
63+
+ heartbeaterId
64+
+ "]"
65+
+ "heartbeating at: "
66+
+ printShortCurrentTime());
67+
context.heartbeat(details);
68+
},
69+
initialDelay,
70+
period,
71+
periodTimeUnit);
72+
}
73+
74+
public void stop() {
75+
System.out.println("Autoheartbeater[" + heartbeaterId + "] being requested to stop.");
76+
// Try not to execute another heartbeat that could have been queued up
77+
// Note this can at times take a second or two so make sure to test this out on your workers
78+
// So can set best heartbeat timeout (sometimes might need larger value to accomodate)
79+
timerService.shutdownNow();
80+
}
81+
82+
private String printShortCurrentTime() {
83+
return DateTimeFormatter.ofPattern("HH:mm:ss")
84+
.withZone(ZoneId.systemDefault())
85+
.format(Instant.now());
86+
}
87+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Auto-heartbeating sample for activities that define HeartbeatTimeout
2+
3+
This sample shows an implementation of an "auto-heartbeating" utility that can be applied via interceptor to all
4+
activities where you define HeartbeatTimeout. Use case where this can be helpful include situations where you have
5+
long-running activities where you want to heartbeat but its difficult to explicitly call heartbeat api in activity code
6+
directly.
7+
Another useful scenario for this is where you have activity that at times can complete in very short amount of time,
8+
but then at times can take for example minutes. In this case you have to set longer StartToClose timeout
9+
but you might not want first heartbeat to be sent right away but send it after the "shorter" duration of activity
10+
execution.
11+
12+
1. Start the Sample:
13+
14+
```bash
15+
./gradlew -q execute -PmainClass=io.temporal.samples.autoheartbeat.Starter
16+
```
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat;
21+
22+
import io.temporal.client.WorkflowClient;
23+
import io.temporal.client.WorkflowOptions;
24+
import io.temporal.client.WorkflowStub;
25+
import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl;
26+
import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor;
27+
import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow;
28+
import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl;
29+
import io.temporal.serviceclient.WorkflowServiceStubs;
30+
import io.temporal.worker.Worker;
31+
import io.temporal.worker.WorkerFactory;
32+
import io.temporal.worker.WorkerFactoryOptions;
33+
34+
public class Starter {
35+
static final String TASK_QUEUE = "AutoheartbeatTaskQueue";
36+
static final String WORKFLOW_ID = "AutoHeartbeatWorkflow";
37+
38+
public static void main(String[] args) {
39+
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
40+
WorkflowClient client = WorkflowClient.newInstance(service);
41+
42+
// Configure our auto heartbeat workflow interceptor which will apply
43+
// AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat
44+
// timeout configured
45+
WorkerFactoryOptions wfo =
46+
WorkerFactoryOptions.newBuilder()
47+
.setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor())
48+
.build();
49+
50+
WorkerFactory factory = WorkerFactory.newInstance(client, wfo);
51+
Worker worker = factory.newWorker(TASK_QUEUE);
52+
53+
worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class);
54+
worker.registerActivitiesImplementations(new AutoActivitiesImpl());
55+
56+
factory.start();
57+
58+
System.out.println("**** First Run: run workflow to completion");
59+
60+
AutoWorkflow firstRun =
61+
client.newWorkflowStub(
62+
AutoWorkflow.class,
63+
WorkflowOptions.newBuilder()
64+
.setWorkflowId(WORKFLOW_ID)
65+
.setTaskQueue(TASK_QUEUE)
66+
.build());
67+
68+
try {
69+
String firstRunResult = firstRun.exec("Auto heartbeating is cool");
70+
System.out.println("First run result: " + firstRunResult);
71+
} catch (Exception e) {
72+
System.out.println("First run - Workflow exec exception: " + e.getClass().getName());
73+
}
74+
75+
System.out.println("\n\n**** Second Run: cancel activities");
76+
77+
AutoWorkflow secondRun =
78+
client.newWorkflowStub(
79+
AutoWorkflow.class,
80+
WorkflowOptions.newBuilder()
81+
.setWorkflowId(WORKFLOW_ID)
82+
.setTaskQueue(TASK_QUEUE)
83+
.build());
84+
WorkflowClient.start(secondRun::exec, "Auto heartbeating is cool");
85+
doSleeps(4);
86+
secondRun.cancelActivity();
87+
88+
try {
89+
String secondRunResult = WorkflowStub.fromTyped(secondRun).getResult(String.class);
90+
System.out.println("Second run result: " + secondRunResult);
91+
} catch (Exception e) {
92+
System.out.println("Second run - Workflow exec exception: " + e.getClass().getName());
93+
}
94+
}
95+
96+
private static void doSleeps(int seconds) {
97+
try {
98+
Thread.sleep(seconds * 1000L);
99+
} catch (Exception e) {
100+
System.out.println(e.getMessage());
101+
}
102+
}
103+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat.activities;
21+
22+
import io.temporal.activity.ActivityInterface;
23+
24+
@ActivityInterface
25+
public interface AutoActivities {
26+
String runActivityOne(String input);
27+
28+
String runActivityTwo(String input);
29+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat.activities;
21+
22+
import java.util.concurrent.TimeUnit;
23+
24+
public class AutoActivitiesImpl implements AutoActivities {
25+
26+
@Override
27+
public String runActivityOne(String input) {
28+
return runActivity("runActivityOne - " + input, 10);
29+
}
30+
31+
@Override
32+
public String runActivityTwo(String input) {
33+
return runActivity("runActivityTwo - " + input, 5);
34+
}
35+
36+
@SuppressWarnings("FutureReturnValueIgnored")
37+
private String runActivity(String input, int seconds) {
38+
for (int i = 0; i < seconds; i++) {
39+
sleep(1);
40+
}
41+
return "Activity completed: " + input;
42+
}
43+
44+
private void sleep(int seconds) {
45+
try {
46+
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
47+
} catch (InterruptedException ee) {
48+
// Empty
49+
}
50+
}
51+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat.interceptor;
21+
22+
import io.temporal.activity.Activity;
23+
import io.temporal.activity.ActivityExecutionContext;
24+
import io.temporal.client.ActivityCanceledException;
25+
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
26+
import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase;
27+
import io.temporal.samples.autoheartbeat.AutoHeartbeatUtil;
28+
import java.time.Duration;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.ExecutionException;
31+
import java.util.concurrent.ScheduledFuture;
32+
import java.util.concurrent.TimeUnit;
33+
34+
public class AutoHeartbeatActivityInboundCallsInterceptor
35+
extends ActivityInboundCallsInterceptorBase {
36+
private ActivityExecutionContext activityExecutionContext;
37+
private Duration activityHeartbeatTimeout;
38+
private AutoHeartbeatUtil autoHeartbeater;
39+
// private CompletableFuture autoHeartbeatFuture;
40+
private ScheduledFuture scheduledFuture;
41+
42+
// private ScheduledFuture scheduledFuture;
43+
44+
public AutoHeartbeatActivityInboundCallsInterceptor(ActivityInboundCallsInterceptor next) {
45+
super(next);
46+
}
47+
48+
@Override
49+
public void init(ActivityExecutionContext context) {
50+
this.activityExecutionContext = context;
51+
activityHeartbeatTimeout = activityExecutionContext.getInfo().getHeartbeatTimeout();
52+
super.init(context);
53+
}
54+
55+
@Override
56+
@SuppressWarnings({"FutureReturnValueIgnored", "CatchAndPrintStackTrace"})
57+
public ActivityOutput execute(ActivityInput input) {
58+
// If activity has heartbeat timeout defined we want to apply auto-heartbeter
59+
if (activityHeartbeatTimeout != null && activityHeartbeatTimeout.getSeconds() > 0) {
60+
System.out.println(
61+
"Auto heartbeating applied for activity: "
62+
+ activityExecutionContext.getInfo().getActivityType());
63+
autoHeartbeater =
64+
new AutoHeartbeatUtil(2, 0, TimeUnit.SECONDS, activityExecutionContext, input);
65+
scheduledFuture = autoHeartbeater.start();
66+
} else {
67+
System.out.println(
68+
"Auto heartbeating not being applied for activity: "
69+
+ activityExecutionContext.getInfo().getActivityType());
70+
}
71+
72+
if (scheduledFuture != null) {
73+
CompletableFuture activityExecFuture =
74+
CompletableFuture.supplyAsync(() -> super.execute(input));
75+
CompletableFuture autoHeartbeatFuture =
76+
CompletableFuture.supplyAsync(
77+
() -> {
78+
try {
79+
return scheduledFuture.get();
80+
} catch (Exception e) {
81+
throw new ActivityCanceledException(activityExecutionContext.getInfo());
82+
}
83+
});
84+
try {
85+
CompletableFuture.anyOf(autoHeartbeatFuture, activityExecFuture).get();
86+
} catch (Exception e) {
87+
if (e instanceof ExecutionException) {
88+
ExecutionException ee = (ExecutionException) e;
89+
if (ee.getCause() instanceof ActivityCanceledException) {
90+
throw new ActivityCanceledException(activityExecutionContext.getInfo());
91+
}
92+
}
93+
throw Activity.wrap(e);
94+
} finally {
95+
if (autoHeartbeater != null) {
96+
autoHeartbeater.stop();
97+
}
98+
}
99+
}
100+
return super.execute(input);
101+
}
102+
103+
public interface AutoHeartbeaterCancellationCallback {
104+
void handle(Exception e);
105+
}
106+
}

0 commit comments

Comments
 (0)