Skip to content

Commit c6a092e

Browse files
Add Nexus cancellation sample (#718)
* Update Nexus Sample for v1.28.0 * fix typo * Small updates * Add Nexus Cancellation Sample * Update cancellation sample * fix typo * run license check * add log * run spotless
1 parent f51ee95 commit c6a092e

File tree

7 files changed

+347
-0
lines changed

7 files changed

+347
-0
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Nexus Cancellation
2+
3+
This sample shows how to cancel a Nexus operation from a caller workflow.
4+
5+
From more details on Nexus and how to setup to run this samples please see the [Nexus Sample](../nexus/README.MD).
6+
7+
In separate terminal windows:
8+
9+
### Nexus handler worker
10+
11+
```
12+
./gradlew -q execute -PmainClass=io.temporal.samples.nexuscancellation.handler.HandlerWorker \
13+
--args="-target-host localhost:7233 -namespace my-target-namespace"
14+
```
15+
16+
### Nexus caller worker
17+
18+
```
19+
./gradlew -q execute -PmainClass=io.temporal.samples.nexuscancellation.caller.CallerWorker \
20+
--args="-target-host localhost:7233 -namespace my-caller-namespace"
21+
```
22+
23+
### Start caller workflow
24+
25+
```
26+
./gradlew -q execute -PmainClass=io.temporal.samples.nexuscancellation.caller.CallerStarter \
27+
--args="-target-host localhost:7233 -namespace my-caller-namespace"
28+
```
29+
30+
### Output
31+
32+
which should result in:
33+
```
34+
INFO i.t.s.n.caller.CallerStarter - Started workflow workflowId: 326732dd-a2b1-4de7-9ddd-dcee4f9f0229 runId: d580499f-79d5-461d-bd49-6248b4e522ae
35+
INFO i.t.s.n.caller.CallerStarter - Workflow result: Hallo Nexus 👋
36+
```
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscancellation.caller;
21+
22+
import io.temporal.api.common.v1.WorkflowExecution;
23+
import io.temporal.client.WorkflowClient;
24+
import io.temporal.client.WorkflowOptions;
25+
import io.temporal.samples.nexus.options.ClientOptions;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
public class CallerStarter {
30+
private static final Logger logger = LoggerFactory.getLogger(CallerStarter.class);
31+
32+
public static void main(String[] args) {
33+
WorkflowClient client = ClientOptions.getWorkflowClient(args);
34+
35+
WorkflowOptions workflowOptions =
36+
WorkflowOptions.newBuilder().setTaskQueue(CallerWorker.DEFAULT_TASK_QUEUE_NAME).build();
37+
HelloCallerWorkflow helloWorkflow =
38+
client.newWorkflowStub(HelloCallerWorkflow.class, workflowOptions);
39+
WorkflowExecution execution = WorkflowClient.start(helloWorkflow::hello, "Nexus");
40+
logger.info(
41+
"Started workflow workflowId: {} runId: {}",
42+
execution.getWorkflowId(),
43+
execution.getRunId());
44+
logger.info("Workflow result: {}", helloWorkflow.hello("Nexus"));
45+
}
46+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscancellation.caller;
21+
22+
import io.temporal.client.WorkflowClient;
23+
import io.temporal.samples.nexus.options.ClientOptions;
24+
import io.temporal.samples.nexus.service.NexusService;
25+
import io.temporal.worker.Worker;
26+
import io.temporal.worker.WorkerFactory;
27+
import io.temporal.worker.WorkflowImplementationOptions;
28+
import io.temporal.workflow.NexusServiceOptions;
29+
import java.util.Collections;
30+
31+
public class CallerWorker {
32+
public static final String DEFAULT_TASK_QUEUE_NAME = "my-caller-workflow-task-queue";
33+
34+
public static void main(String[] args) {
35+
WorkflowClient client = ClientOptions.getWorkflowClient(args);
36+
37+
WorkerFactory factory = WorkerFactory.newInstance(client);
38+
39+
Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME);
40+
worker.registerWorkflowImplementationTypes(
41+
WorkflowImplementationOptions.newBuilder()
42+
.setNexusServiceOptions(
43+
Collections.singletonMap(
44+
NexusService.class.getSimpleName(),
45+
NexusServiceOptions.newBuilder().setEndpoint("my-nexus-endpoint-name").build()))
46+
.build(),
47+
HelloCallerWorkflowImpl.class);
48+
49+
factory.start();
50+
}
51+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscancellation.caller;
21+
22+
import io.temporal.workflow.WorkflowInterface;
23+
import io.temporal.workflow.WorkflowMethod;
24+
25+
@WorkflowInterface
26+
public interface HelloCallerWorkflow {
27+
@WorkflowMethod
28+
String hello(String message);
29+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscancellation.caller;
21+
22+
import static io.temporal.samples.nexus.service.NexusService.Language.*;
23+
24+
import io.temporal.failure.CanceledFailure;
25+
import io.temporal.failure.NexusOperationFailure;
26+
import io.temporal.samples.nexus.service.NexusService;
27+
import io.temporal.workflow.*;
28+
import java.time.Duration;
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
import org.slf4j.Logger;
32+
33+
public class HelloCallerWorkflowImpl implements HelloCallerWorkflow {
34+
public static final Logger log = Workflow.getLogger(HelloCallerWorkflowImpl.class);
35+
private static final NexusService.Language[] languages =
36+
new NexusService.Language[] {EN, FR, DE, ES, TR};
37+
NexusService nexusService =
38+
Workflow.newNexusServiceStub(
39+
NexusService.class,
40+
NexusServiceOptions.newBuilder()
41+
.setOperationOptions(
42+
NexusOperationOptions.newBuilder()
43+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
44+
.build())
45+
.build());
46+
47+
@Override
48+
public String hello(String message) {
49+
List<Promise<NexusService.HelloOutput>> results = new ArrayList<>(languages.length);
50+
51+
/*
52+
* Create our CancellationScope. Within this scope we call the nexus operation asynchronously
53+
* hello method asynchronously for each of our defined languages.
54+
*/
55+
CancellationScope scope =
56+
Workflow.newCancellationScope(
57+
() -> {
58+
for (NexusService.Language language : languages) {
59+
results.add(
60+
Async.function(
61+
nexusService::hello, new NexusService.HelloInput(message, language)));
62+
}
63+
});
64+
65+
/*
66+
* Execute all nexus operations within the CancellationScope. Note that this execution is
67+
* non-blocking as the code inside our cancellation scope is also non-blocking.
68+
*/
69+
scope.run();
70+
71+
// We use "anyOf" here to wait for one of the nexus operation invocations to return
72+
NexusService.HelloOutput result = Promise.anyOf(results).get();
73+
74+
// Trigger cancellation of all uncompleted nexus operations invocations within the cancellation
75+
// scope
76+
scope.cancel();
77+
// Optionally, wait for all nexus operations to complete
78+
//
79+
// Note: Once the workflow completes any pending cancellation requests are dropped by the
80+
// server.
81+
for (Promise<NexusService.HelloOutput> promise : results) {
82+
try {
83+
promise.get();
84+
} catch (NexusOperationFailure e) {
85+
// If the operation was cancelled, we can ignore the failure
86+
if (e.getCause() instanceof CanceledFailure) {
87+
log.info("Operation was cancelled");
88+
continue;
89+
}
90+
throw e;
91+
}
92+
}
93+
return result.getMessage();
94+
}
95+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscancellation.handler;
21+
22+
import io.temporal.client.WorkflowClient;
23+
import io.temporal.samples.nexus.handler.NexusServiceImpl;
24+
import io.temporal.samples.nexus.options.ClientOptions;
25+
import io.temporal.worker.Worker;
26+
import io.temporal.worker.WorkerFactory;
27+
28+
public class HandlerWorker {
29+
public static final String DEFAULT_TASK_QUEUE_NAME = "my-handler-task-queue";
30+
31+
public static void main(String[] args) {
32+
WorkflowClient client = ClientOptions.getWorkflowClient(args);
33+
34+
WorkerFactory factory = WorkerFactory.newInstance(client);
35+
36+
Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME);
37+
worker.registerWorkflowImplementationTypes(HelloHandlerWorkflowImpl.class);
38+
worker.registerNexusServiceImplementation(new NexusServiceImpl());
39+
40+
factory.start();
41+
}
42+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscancellation.handler;
21+
22+
import io.temporal.failure.ApplicationFailure;
23+
import io.temporal.samples.nexus.handler.HelloHandlerWorkflow;
24+
import io.temporal.samples.nexus.service.NexusService;
25+
import io.temporal.workflow.Workflow;
26+
import java.time.Duration;
27+
28+
public class HelloHandlerWorkflowImpl implements HelloHandlerWorkflow {
29+
@Override
30+
public NexusService.HelloOutput hello(NexusService.HelloInput input) {
31+
// Sleep for a random duration to simulate some work
32+
Workflow.sleep(Duration.ofSeconds(Workflow.newRandom().nextInt(5)));
33+
switch (input.getLanguage()) {
34+
case EN:
35+
return new NexusService.HelloOutput("Hello " + input.getName() + " 👋");
36+
case FR:
37+
return new NexusService.HelloOutput("Bonjour " + input.getName() + " 👋");
38+
case DE:
39+
return new NexusService.HelloOutput("Hallo " + input.getName() + " 👋");
40+
case ES:
41+
return new NexusService.HelloOutput("¡Hola! " + input.getName() + " 👋");
42+
case TR:
43+
return new NexusService.HelloOutput("Merhaba " + input.getName() + " 👋");
44+
}
45+
throw ApplicationFailure.newFailure(
46+
"Unsupported language: " + input.getLanguage(), "UNSUPPORTED_LANGUAGE");
47+
}
48+
}

0 commit comments

Comments
 (0)