Skip to content

Commit 535488e

Browse files
committed
[wip] auto heartbeater sample
Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io>
1 parent 1b1a632 commit 535488e

File tree

6 files changed

+385
-0
lines changed

6 files changed

+385
-0
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat;
21+
22+
import io.temporal.activity.ActivityInterface;
23+
24+
@ActivityInterface
25+
public interface AutoActivities {
26+
String runActivityOne(String input);
27+
28+
String runActivityTwo(String input);
29+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat;
21+
22+
import io.temporal.activity.Activity;
23+
import io.temporal.client.ActivityCompletionException;
24+
import java.util.concurrent.TimeUnit;
25+
26+
public class AutoActivitiesImpl implements AutoActivities {
27+
28+
@Override
29+
public String runActivityOne(String input) {
30+
return runActivity("runActivityOne - " + input);
31+
}
32+
33+
@Override
34+
public String runActivityTwo(String input) {
35+
return runActivity("runActivityTwo - " + input);
36+
}
37+
38+
@SuppressWarnings("FutureReturnValueIgnored")
39+
private String runActivity(String input) {
40+
// Calculate heartbeat period based on our heartbeat timeout
41+
// Start Autoheartbeater
42+
AutoHeartbeater autoHearbeater =
43+
new AutoHeartbeater(
44+
getHeartbeatPeriod(), 0, TimeUnit.SECONDS, Activity.getExecutionContext(), input);
45+
autoHearbeater.start();
46+
47+
// For sample our activity just sleeps for a second for 20 seconds
48+
for (int i = 0; i < 20; i++) {
49+
try {
50+
sleep(1);
51+
} catch (ActivityCompletionException e) {
52+
System.out.println(
53+
"Activity type:"
54+
+ e.getActivityType().get()
55+
+ "Activiy id: "
56+
+ e.getActivityId().get()
57+
+ "Workflow id: "
58+
+ e.getWorkflowId().get()
59+
+ "Workflow runid: "
60+
+ e.getRunId().get()
61+
+ " was canceled. Shutting down auto heartbeats");
62+
autoHearbeater.stop();
63+
// We want to rethrow the cancel failure
64+
throw e;
65+
}
66+
}
67+
return "Activity completed: " + input;
68+
}
69+
70+
private long getHeartbeatPeriod() {
71+
// Note you can add checks if heartbeat timeout is set if not and
72+
// decide to log / fail activity / not start autoheartbeater based on your business logic
73+
74+
// For sample we want to heartbeat 1 seconds less than heartbeat timeout
75+
return Activity.getExecutionContext().getInfo().getHeartbeatTimeout().getSeconds() <= 1
76+
? 1
77+
: Activity.getExecutionContext().getInfo().getHeartbeatTimeout().getSeconds() - 1;
78+
}
79+
80+
private void sleep(int seconds) {
81+
try {
82+
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
83+
} catch (InterruptedException ee) {
84+
// Empty
85+
}
86+
}
87+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat;
21+
22+
import io.temporal.activity.ActivityExecutionContext;
23+
import java.time.Instant;
24+
import java.time.ZoneId;
25+
import java.time.format.DateTimeFormatter;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.ScheduledFuture;
29+
import java.util.concurrent.TimeUnit;
30+
31+
public class AutoHeartbeater {
32+
private final long period;
33+
private final long initialDelay;
34+
private final TimeUnit periodTimeUnit;
35+
private final ScheduledExecutorService timerService =
36+
Executors.newSingleThreadScheduledExecutor();
37+
private final ActivityExecutionContext context;
38+
private final Object details;
39+
private String heartbeaterId;
40+
41+
public AutoHeartbeater(
42+
long period,
43+
long initialDelay,
44+
TimeUnit periodTimeUnit,
45+
ActivityExecutionContext context,
46+
Object details) {
47+
this.period = period;
48+
this.initialDelay = initialDelay;
49+
this.periodTimeUnit = periodTimeUnit;
50+
this.context = context;
51+
this.details = details;
52+
// Set to activity id better, for sample we just use type
53+
heartbeaterId = context.getInfo().getActivityType();
54+
}
55+
56+
public ScheduledFuture<?> start() {
57+
System.out.println("Autoheartbeater[" + heartbeaterId + "] starting...");
58+
return timerService.scheduleAtFixedRate(
59+
() -> {
60+
try {
61+
System.out.println(
62+
"Autoheartbeater["
63+
+ heartbeaterId
64+
+ "]"
65+
+ "heartbeating at: "
66+
+ printShortCurrentTime());
67+
context.heartbeat(details);
68+
} catch (Exception e) {
69+
System.out.println("Stopping: " + e.getMessage());
70+
stop();
71+
}
72+
},
73+
initialDelay,
74+
period,
75+
periodTimeUnit);
76+
}
77+
78+
public void stop() {
79+
System.out.println("Autoheartbeater being requested to stop.");
80+
// Try not to execute another heartbeat that could have been queued up
81+
timerService.shutdownNow();
82+
}
83+
84+
private String printShortCurrentTime() {
85+
return DateTimeFormatter.ofPattern("HH:mm:ss")
86+
.withZone(ZoneId.systemDefault())
87+
.format(Instant.now());
88+
}
89+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat;
21+
22+
import io.temporal.workflow.SignalMethod;
23+
import io.temporal.workflow.WorkflowInterface;
24+
import io.temporal.workflow.WorkflowMethod;
25+
26+
@WorkflowInterface
27+
public interface AutoWorkflow {
28+
@WorkflowMethod
29+
String exec(String input);
30+
31+
@SignalMethod
32+
void cancelActivity();
33+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat;
21+
22+
import io.temporal.activity.ActivityOptions;
23+
import io.temporal.failure.ActivityFailure;
24+
import io.temporal.failure.CanceledFailure;
25+
import io.temporal.workflow.Async;
26+
import io.temporal.workflow.CancellationScope;
27+
import io.temporal.workflow.Promise;
28+
import io.temporal.workflow.Workflow;
29+
import java.time.Duration;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
public class AutoWorkflowImpl implements AutoWorkflow {
34+
private CancellationScope scope;
35+
36+
@Override
37+
public String exec(String input) {
38+
// Crete separate workflow stubs for same interface so we can show
39+
// use of different heartbeat timeouts
40+
AutoActivities activitiesOne =
41+
Workflow.newActivityStub(
42+
AutoActivities.class,
43+
ActivityOptions.newBuilder()
44+
.setStartToCloseTimeout(Duration.ofSeconds(22))
45+
.setHeartbeatTimeout(Duration.ofSeconds(3))
46+
.build());
47+
48+
AutoActivities activitiesTwo =
49+
Workflow.newActivityStub(
50+
AutoActivities.class,
51+
ActivityOptions.newBuilder()
52+
.setStartToCloseTimeout(Duration.ofSeconds(22))
53+
.setHeartbeatTimeout(Duration.ofSeconds(5))
54+
.build());
55+
56+
// Start our activity in CancellationScope so we can cancel it if needed
57+
List<Promise<String>> activityPromises = new ArrayList<>();
58+
scope =
59+
Workflow.newCancellationScope(
60+
() -> {
61+
activityPromises.add(Async.function(activitiesOne::runActivityOne, input));
62+
activityPromises.add(Async.function(activitiesTwo::runActivityTwo, input));
63+
});
64+
65+
scope.run();
66+
67+
try {
68+
Promise.allOf(activityPromises).get();
69+
String result = "";
70+
for (Promise<String> pr : activityPromises) {
71+
result += pr.get();
72+
}
73+
return result;
74+
} catch (ActivityFailure e) {
75+
if (e.getCause() instanceof CanceledFailure) {
76+
// We dont want workflow to fail in we canceled our scope, just log and return
77+
return "Workflow result after activity cancellation";
78+
} else {
79+
// We want to fail execution on any other failures except cancellation
80+
throw e;
81+
}
82+
}
83+
}
84+
85+
@Override
86+
public void cancelActivity() {
87+
scope.cancel("Canceling scope from signal handler");
88+
}
89+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat;
21+
22+
import io.temporal.client.WorkflowClient;
23+
import io.temporal.client.WorkflowOptions;
24+
import io.temporal.serviceclient.WorkflowServiceStubs;
25+
import io.temporal.worker.Worker;
26+
import io.temporal.worker.WorkerFactory;
27+
28+
public class Starter {
29+
static final String TASK_QUEUE = "AutoheartbeatTaskQueue";
30+
static final String WORKFLOW_ID = "AutoHeartbeatWorkflow";
31+
32+
public static void main(String[] args) {
33+
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
34+
WorkflowClient client = WorkflowClient.newInstance(service);
35+
WorkerFactory factory = WorkerFactory.newInstance(client);
36+
Worker worker = factory.newWorker(TASK_QUEUE);
37+
38+
worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class);
39+
worker.registerActivitiesImplementations(new AutoActivitiesImpl());
40+
41+
factory.start();
42+
43+
AutoWorkflow workflow =
44+
client.newWorkflowStub(
45+
AutoWorkflow.class,
46+
WorkflowOptions.newBuilder()
47+
.setWorkflowId(WORKFLOW_ID)
48+
.setTaskQueue(TASK_QUEUE)
49+
.build());
50+
51+
try {
52+
String result = workflow.exec("Auto heartbeating is cool");
53+
System.out.println("Result: " + result);
54+
} catch (Exception e) {
55+
System.out.println("Workflow exec exception: " + e.getClass().getName());
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)