Skip to content

Commit 30db7db

Browse files
authored
Workflow timer sample (#708)
Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io>
1 parent 69db545 commit 30db7db

File tree

3 files changed

+307
-0
lines changed

3 files changed

+307
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ See the README.md file in each main sample directory for cut/paste Gradle comman
7878
- [**HelloUpdate**](/core/src/main/java/io/temporal/samples/hello/HelloUpdate.java): Demonstrates how to create and interact with an Update.
7979
- [**HelloDelayedStart**](/core/src/main/java/io/temporal/samples/hello/HelloDelayedStart.java): Demonstrates how to use delayed start config option when starting a Workflow Executions.
8080
- [**HelloSignalWithTimer**](/core/src/main/java/io/temporal/samples/hello/HelloSignalWithTimer.java): Demonstrates how to use collect signals for certain amount of time and then process last one.
81+
- [**HelloWorkflowTimer**](/core/src/main/java/io/temporal/samples/hello/HelloWorkflowTimer.java): Demonstrates how we can use workflow timer to restrict duration of workflow execution instead of workflow run/execution timeouts.
8182

8283

8384
#### Scenario-based samples
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.hello;
21+
22+
import io.temporal.activity.*;
23+
import io.temporal.client.ActivityCompletionException;
24+
import io.temporal.client.WorkflowClient;
25+
import io.temporal.client.WorkflowOptions;
26+
import io.temporal.client.WorkflowStub;
27+
import io.temporal.failure.ActivityFailure;
28+
import io.temporal.failure.CanceledFailure;
29+
import io.temporal.failure.ChildWorkflowFailure;
30+
import io.temporal.serviceclient.WorkflowServiceStubs;
31+
import io.temporal.worker.Worker;
32+
import io.temporal.worker.WorkerFactory;
33+
import io.temporal.workflow.*;
34+
import java.time.Duration;
35+
36+
/** Sample shows how to use workflow timer instead of WorkflowOptions->Run/ExecutionTimeout */
37+
public class HelloWorkflowTimer {
38+
private static String WORKFLOW_ID = "HelloWorkflowWithTimer";
39+
private static String TASK_QUEUE = "HelloWorkflowWithTimerTaskQueue";
40+
// Change time to 12 to 20 seconds to handle cancellation while child workflow is running
41+
private static int TIME_SECS = 8;
42+
43+
// Workflow
44+
@WorkflowInterface
45+
public interface WorkflowWithTimer {
46+
@WorkflowMethod
47+
String execute(String input);
48+
}
49+
50+
public static class WorkflowWithTimerImpl implements WorkflowWithTimer {
51+
// Our timer cancellation scope
52+
private CancellationScope timerCancellationScope;
53+
// Our workflow cancellation scope
54+
private CancellationScope workflowCancellationScope;
55+
// Workflow result
56+
private String workflowResult = "";
57+
private Promise<Void> workflowTimerPromise;
58+
59+
@Override
60+
public String execute(String input) {
61+
// Create workflow timer (within timer cancel;ation scope so it can be canceled)
62+
// which denotes the max amount of time we allow this execution to run
63+
// Using workflow timer instead of workflow run/execution timeouts allow us to react to this
64+
// timer
65+
// fires, be able to chose if we want to fail or complete execution, and do some "cleanup"
66+
// tasks if
67+
// necessary before we do so. If we used workflow run/execution timeouts insted we would not
68+
// be able
69+
// to react to this timer firing (its server timer only)
70+
timerCancellationScope =
71+
Workflow.newCancellationScope(
72+
() -> {
73+
workflowTimerPromise =
74+
Workflow.newTimer(
75+
Duration.ofSeconds(TIME_SECS),
76+
TimerOptions.newBuilder().setSummary("Workflow Timer").build())
77+
// We can use thenApply here to cancel our cancelation scope when this timer
78+
// fires. Note we cannot complete the execution from here, see
79+
// https://github.com/temporalio/sdk-java/issues/87
80+
.thenApply(
81+
ignore -> {
82+
// Cancel the workflow cancellation scope allowing us to react to this
83+
// timer firing
84+
if (workflowCancellationScope != null) {
85+
workflowCancellationScope.cancel("Workflow timer fired");
86+
}
87+
return null;
88+
});
89+
});
90+
timerCancellationScope.run();
91+
92+
// Create workflow cancellation scope in which we put our core business logic
93+
workflowCancellationScope =
94+
Workflow.newCancellationScope(
95+
() -> {
96+
WorkflowWithTimerActivities activities =
97+
Workflow.newActivityStub(
98+
WorkflowWithTimerActivities.class,
99+
ActivityOptions.newBuilder()
100+
.setStartToCloseTimeout(Duration.ofSeconds(12))
101+
// Set heartbeat timeout to 1s
102+
.setHeartbeatTimeout(Duration.ofSeconds(2))
103+
// We want to wait for activity to complete cancellation
104+
.setCancellationType(
105+
ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
106+
.build());
107+
108+
WorkflowWithTimerChildWorkflow childWorkflow =
109+
Workflow.newChildWorkflowStub(
110+
WorkflowWithTimerChildWorkflow.class,
111+
ChildWorkflowOptions.newBuilder()
112+
.setWorkflowId(WORKFLOW_ID + "-Child")
113+
// We want to wait for child workflow cancellation completion
114+
.setCancellationType(
115+
ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED)
116+
.build());
117+
118+
try {
119+
// Run our activities
120+
workflowResult = activities.sayHello(input);
121+
// Then our child workflow
122+
childWorkflow.executeChild(input);
123+
} catch (ActivityFailure af) {
124+
// Handle cancellation of scope while activities are pending (running)
125+
if (af.getCause() instanceof CanceledFailure) {
126+
workflowResult = "Workflow timer fired while activities were executing.";
127+
// Here we can do more work if needed
128+
}
129+
} catch (ChildWorkflowFailure cwf) {
130+
// Handle cancellation of scope while child workflow is pending (running)
131+
if (cwf.getCause() instanceof CanceledFailure) {
132+
workflowResult = "Workflow timer fired while child workflow was executing.";
133+
// Here we can do more work if needed
134+
}
135+
}
136+
});
137+
// Run the workflow cancellation scope
138+
// We need to handle CanceledFailure here in case we cancel the scope
139+
// right before activity/child workflows are scheduled
140+
try {
141+
workflowCancellationScope.run();
142+
} catch (CanceledFailure e) {
143+
workflowResult = "Workflow cancelled.";
144+
}
145+
146+
// Cancel our workflow timer if it didnt fire
147+
if (!workflowTimerPromise.isCompleted()) {
148+
timerCancellationScope.cancel("Workflow completed before workflow timer.");
149+
}
150+
151+
return workflowResult;
152+
}
153+
}
154+
155+
// Activities
156+
@ActivityInterface
157+
public interface WorkflowWithTimerActivities {
158+
String sayHello(String input);
159+
}
160+
161+
public static class WorkflowWithTimerActivitiesImpl implements WorkflowWithTimerActivities {
162+
@Override
163+
public String sayHello(String input) {
164+
// here we just heartbeat then sleep for 1s
165+
for (int i = 0; i < 10; i++) {
166+
try {
167+
Activity.getExecutionContext().heartbeat("heartbeating: " + i);
168+
} catch (ActivityCompletionException e) {
169+
// Do some cleanup if needed, then re-throw
170+
throw e;
171+
}
172+
sleep(1);
173+
}
174+
return "Hello " + input;
175+
}
176+
177+
// Just sample sleep method
178+
private void sleep(int seconds) {
179+
try {
180+
Thread.sleep(seconds * 1000L);
181+
} catch (Exception e) {
182+
System.out.println(e.getMessage());
183+
}
184+
}
185+
}
186+
187+
// Child Workflows
188+
@WorkflowInterface
189+
public interface WorkflowWithTimerChildWorkflow {
190+
@WorkflowMethod
191+
String executeChild(String input);
192+
}
193+
194+
public static class WorkflowWithTimerChildWorkflowImpl implements WorkflowWithTimerChildWorkflow {
195+
@Override
196+
public String executeChild(String input) {
197+
// For sample we just sleep for 5 seconds and return some result
198+
try {
199+
Workflow.sleep(Duration.ofSeconds(5));
200+
return "From executeChild - " + input;
201+
// Note that similarly to parent workflow if child is running activities/child workflows
202+
// we need to handle this in same way as parent does
203+
// Fpr sample we can just handle CanceledFailure and rethrow
204+
} catch (CanceledFailure e) {
205+
// Can do cleanup if needed
206+
throw e;
207+
}
208+
}
209+
}
210+
211+
public static void main(String[] args) {
212+
// Create service stubs
213+
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
214+
// Crete workflow client
215+
WorkflowClient client = WorkflowClient.newInstance(service);
216+
// Create worker factory
217+
WorkerFactory factory = WorkerFactory.newInstance(client);
218+
219+
// Create worker
220+
Worker worker = factory.newWorker(TASK_QUEUE);
221+
// Register workflow and child workflow
222+
worker.registerWorkflowImplementationTypes(
223+
WorkflowWithTimerImpl.class, WorkflowWithTimerChildWorkflowImpl.class);
224+
// Register activities
225+
worker.registerActivitiesImplementations(new WorkflowWithTimerActivitiesImpl());
226+
227+
// Start factory (and worker)
228+
factory.start();
229+
230+
// Create workflow stub
231+
WorkflowWithTimer workflow =
232+
client.newWorkflowStub(
233+
WorkflowWithTimer.class,
234+
WorkflowOptions.newBuilder()
235+
// Note we do not set workflow run/execution timeouts
236+
// As its not recommended in most cases
237+
// In same we show how we can implement this with workflow timer instead
238+
.setWorkflowId(WORKFLOW_ID)
239+
.setTaskQueue(TASK_QUEUE)
240+
.build());
241+
242+
// Start workflow execution async
243+
WorkflowClient.start(workflow::execute, "Some Name Here");
244+
245+
// Wait for execution to complete (sync)
246+
WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow);
247+
String result = workflowStub.getResult(String.class);
248+
System.out.println("Workflow result: " + result);
249+
250+
// Stop main method
251+
System.exit(0);
252+
}
253+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.hello;
21+
22+
import io.temporal.client.WorkflowOptions;
23+
import io.temporal.testing.TestWorkflowRule;
24+
import org.junit.Assert;
25+
import org.junit.Rule;
26+
import org.junit.Test;
27+
28+
public class HelloWorkflowTimerTest {
29+
@Rule
30+
public TestWorkflowRule testWorkflowRule =
31+
TestWorkflowRule.newBuilder()
32+
.setWorkflowTypes(
33+
HelloWorkflowTimer.WorkflowWithTimerImpl.class,
34+
HelloWorkflowTimer.WorkflowWithTimerChildWorkflowImpl.class)
35+
.setActivityImplementations(new HelloWorkflowTimer.WorkflowWithTimerActivitiesImpl())
36+
.build();
37+
38+
@Test
39+
public void testWorkflowTimer() {
40+
HelloWorkflowTimer.WorkflowWithTimer workflow =
41+
testWorkflowRule
42+
.getWorkflowClient()
43+
.newWorkflowStub(
44+
HelloWorkflowTimer.WorkflowWithTimer.class,
45+
WorkflowOptions.newBuilder()
46+
.setWorkflowId("WorkflowWithTimerTestId")
47+
.setTaskQueue(testWorkflowRule.getTaskQueue())
48+
.build());
49+
50+
String result = workflow.execute("test input");
51+
Assert.assertEquals("Workflow timer fired while activities were executing.", result);
52+
}
53+
}

0 commit comments

Comments
 (0)