Skip to content

Commit d6629ab

Browse files
Add interceptor to propagate headers to Nexus operations (#719)
Add interceptor to propagate headers to Nexus operations
1 parent 46e0a6a commit d6629ab

File tree

12 files changed

+680
-2
lines changed

12 files changed

+680
-2
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,14 @@ See the README.md file in each main sample directory for cut/paste Gradle comman
149149

150150
- [**AWS Encryption SDK**](/core/src/main/java/io/temporal/samples/keymanagementencryption/awsencryptionsdk): Demonstrates how to use the AWS Encryption SDK to encrypt and decrypt payloads with AWS KMS.
151151

152+
#### Nexus Samples
153+
154+
- [**Getting Started**](/core/src/main/java/io/temporal/samples/nexus): Demonstrates how to get started with Temporal and Nexus.
155+
156+
- [**Cancellation**](/core/src/main/java/io/temporal/samples/nexuscancellation): Demonstrates how to cancel an async Nexus operation.
157+
158+
- [**Context/Header Propagation**](/core/src/main/java/io/temporal/samples/nexuscontextpropagation): Demonstrates how to propagate context through Nexus operation headers.
159+
152160
<!-- @@@SNIPEND -->
153161

154162
### Running SpringBoot Samples

core/src/main/java/io/temporal/samples/nexus/options/ClientOptions.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,13 @@
3232
import org.apache.commons.cli.*;
3333

3434
public class ClientOptions {
35+
3536
public static WorkflowClient getWorkflowClient(String[] args) {
37+
return getWorkflowClient(args, WorkflowClientOptions.newBuilder());
38+
}
39+
40+
public static WorkflowClient getWorkflowClient(
41+
String[] args, WorkflowClientOptions.Builder clientOptions) {
3642
Options options = new Options();
3743
Option targetHostOption = new Option("target-host", true, "Host:port for the Temporal service");
3844
targetHostOption.setRequired(false);
@@ -139,7 +145,6 @@ public static WorkflowClient getWorkflowClient(String[] args) {
139145

140146
WorkflowServiceStubs service =
141147
WorkflowServiceStubs.newServiceStubs(serviceStubOptionsBuilder.build());
142-
return WorkflowClient.newInstance(
143-
service, WorkflowClientOptions.newBuilder().setNamespace(namespace).build());
148+
return WorkflowClient.newInstance(service, clientOptions.setNamespace(namespace).build());
144149
}
145150
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Nexus Context Propagation
2+
3+
This sample shows how to propagate MDC context values from Workflows to Nexus operations.
4+
Nexus does not support `ContextPropagator` since the header format is not compatible. Users should look at `NexusMDCContextInterceptor` for propagating MDC context values.
5+
6+
From more details on Nexus and how to setup to run this samples please see the [Nexus Sample](../nexus/README.MD).
7+
8+
In separate terminal windows:
9+
10+
### Nexus handler worker
11+
12+
```
13+
./gradlew -q execute -PmainClass=io.temporal.samples.nexuscontextpropagation.handler.HandlerWorker \
14+
--args="-target-host localhost:7233 -namespace my-target-namespace"
15+
```
16+
17+
### Nexus caller worker
18+
19+
```
20+
./gradlew -q execute -PmainClass=io.temporal.samples.nexuscontextpropagation.caller.CallerWorker \
21+
--args="-target-host localhost:7233 -namespace my-caller-namespace"
22+
```
23+
24+
### Start caller workflow
25+
26+
```
27+
./gradlew -q execute -PmainClass=io.temporal.samples.nexuscontextpropagation.caller.CallerStarter \
28+
--args="-target-host localhost:7233 -namespace my-caller-namespace"
29+
```
30+
31+
### Output
32+
33+
which should result in this on the caller side:
34+
```
35+
INFO i.t.s.n.caller.CallerStarter - Started EchoCallerWorkflow workflowId: 7ac97cb9-b457-4052-af94-d82478c35c5e runId: 01954eb9-6963-7b52-9a1d-b74e64643846
36+
INFO i.t.s.n.caller.CallerStarter - Workflow result: Nexus Echo 👋
37+
INFO i.t.s.n.caller.CallerStarter - Started HelloCallerWorkflow workflowId: 9e0bc89c-5709-4742-b7c0-868464c2fccf runId: 01954eb9-6ae3-7d6d-b355-71545688309d
38+
INFO i.t.s.n.caller.CallerStarter - Workflow result: Hello Nexus 👋
39+
```
40+
41+
And this on the handler side:
42+
```
43+
INFO i.t.s.n.handler.NexusServiceImpl - Echo called from a workflow with ID : 7ac97cb9-b457-4052-af94-d82478c35c5e
44+
INFO i.t.s.n.h.HelloHandlerWorkflowImpl - HelloHandlerWorkflow called from a workflow with ID : 9e0bc89c-5709-4742-b7c0-868464c2fccf
45+
```
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscontextpropagation.caller;
21+
22+
import io.temporal.api.common.v1.WorkflowExecution;
23+
import io.temporal.client.WorkflowClient;
24+
import io.temporal.client.WorkflowClientOptions;
25+
import io.temporal.client.WorkflowOptions;
26+
import io.temporal.samples.nexus.caller.EchoCallerWorkflow;
27+
import io.temporal.samples.nexus.caller.HelloCallerWorkflow;
28+
import io.temporal.samples.nexus.options.ClientOptions;
29+
import io.temporal.samples.nexus.service.NexusService;
30+
import io.temporal.samples.nexuscontextpropagation.propogation.MDCContextPropagator;
31+
import java.util.Collections;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
public class CallerStarter {
36+
private static final Logger logger = LoggerFactory.getLogger(CallerStarter.class);
37+
38+
public static void main(String[] args) {
39+
WorkflowClient client =
40+
ClientOptions.getWorkflowClient(
41+
args,
42+
WorkflowClientOptions.newBuilder()
43+
.setContextPropagators(Collections.singletonList(new MDCContextPropagator())));
44+
45+
WorkflowOptions workflowOptions =
46+
WorkflowOptions.newBuilder().setTaskQueue(CallerWorker.DEFAULT_TASK_QUEUE_NAME).build();
47+
EchoCallerWorkflow echoWorkflow =
48+
client.newWorkflowStub(EchoCallerWorkflow.class, workflowOptions);
49+
WorkflowExecution execution = WorkflowClient.start(echoWorkflow::echo, "Nexus Echo 👋");
50+
logger.info(
51+
"Started EchoCallerWorkflow workflowId: {} runId: {}",
52+
execution.getWorkflowId(),
53+
execution.getRunId());
54+
logger.info("Workflow result: {}", echoWorkflow.echo("Nexus Echo 👋"));
55+
HelloCallerWorkflow helloWorkflow =
56+
client.newWorkflowStub(HelloCallerWorkflow.class, workflowOptions);
57+
execution = WorkflowClient.start(helloWorkflow::hello, "Nexus", NexusService.Language.EN);
58+
logger.info(
59+
"Started HelloCallerWorkflow workflowId: {} runId: {}",
60+
execution.getWorkflowId(),
61+
execution.getRunId());
62+
logger.info("Workflow result: {}", helloWorkflow.hello("Nexus", NexusService.Language.ES));
63+
}
64+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscontextpropagation.caller;
21+
22+
import io.temporal.client.WorkflowClient;
23+
import io.temporal.samples.nexus.options.ClientOptions;
24+
import io.temporal.samples.nexuscontextpropagation.propogation.NexusMDCContextInterceptor;
25+
import io.temporal.worker.Worker;
26+
import io.temporal.worker.WorkerFactory;
27+
import io.temporal.worker.WorkerFactoryOptions;
28+
import io.temporal.worker.WorkflowImplementationOptions;
29+
import io.temporal.workflow.NexusServiceOptions;
30+
import java.util.Collections;
31+
32+
public class CallerWorker {
33+
public static final String DEFAULT_TASK_QUEUE_NAME = "my-caller-workflow-task-queue";
34+
35+
public static void main(String[] args) {
36+
WorkflowClient client = ClientOptions.getWorkflowClient(args);
37+
38+
WorkerFactory factory =
39+
WorkerFactory.newInstance(
40+
client,
41+
WorkerFactoryOptions.newBuilder()
42+
.setWorkerInterceptors(new NexusMDCContextInterceptor())
43+
.build());
44+
45+
Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME);
46+
worker.registerWorkflowImplementationTypes(
47+
WorkflowImplementationOptions.newBuilder()
48+
.setNexusServiceOptions(
49+
Collections.singletonMap(
50+
"NexusService",
51+
NexusServiceOptions.newBuilder().setEndpoint("my-nexus-endpoint-name").build()))
52+
.build(),
53+
EchoCallerWorkflowImpl.class,
54+
HelloCallerWorkflowImpl.class);
55+
56+
factory.start();
57+
}
58+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscontextpropagation.caller;
21+
22+
import io.temporal.samples.nexus.caller.EchoCallerWorkflow;
23+
import io.temporal.samples.nexus.service.NexusService;
24+
import io.temporal.workflow.NexusOperationOptions;
25+
import io.temporal.workflow.NexusServiceOptions;
26+
import io.temporal.workflow.Workflow;
27+
import java.time.Duration;
28+
import org.slf4j.MDC;
29+
30+
public class EchoCallerWorkflowImpl implements EchoCallerWorkflow {
31+
NexusService nexusService =
32+
Workflow.newNexusServiceStub(
33+
NexusService.class,
34+
NexusServiceOptions.newBuilder()
35+
.setOperationOptions(
36+
NexusOperationOptions.newBuilder()
37+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
38+
.build())
39+
.build());
40+
41+
@Override
42+
public String echo(String message) {
43+
MDC.put("x-nexus-caller-workflow-id", Workflow.getInfo().getWorkflowId());
44+
return nexusService.echo(new NexusService.EchoInput(message)).getMessage();
45+
}
46+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscontextpropagation.caller;
21+
22+
import io.temporal.samples.nexus.caller.HelloCallerWorkflow;
23+
import io.temporal.samples.nexus.service.NexusService;
24+
import io.temporal.workflow.NexusOperationHandle;
25+
import io.temporal.workflow.NexusOperationOptions;
26+
import io.temporal.workflow.NexusServiceOptions;
27+
import io.temporal.workflow.Workflow;
28+
import java.time.Duration;
29+
import org.slf4j.MDC;
30+
31+
public class HelloCallerWorkflowImpl implements HelloCallerWorkflow {
32+
NexusService nexusService =
33+
Workflow.newNexusServiceStub(
34+
NexusService.class,
35+
NexusServiceOptions.newBuilder()
36+
.setOperationOptions(
37+
NexusOperationOptions.newBuilder()
38+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
39+
.build())
40+
.build());
41+
42+
@Override
43+
public String hello(String message, NexusService.Language language) {
44+
MDC.put("x-nexus-caller-workflow-id", Workflow.getInfo().getWorkflowId());
45+
NexusOperationHandle<NexusService.HelloOutput> handle =
46+
Workflow.startNexusOperation(
47+
nexusService::hello, new NexusService.HelloInput(message, language));
48+
// Optionally wait for the operation to be started. NexusOperationExecution will contain the
49+
// operation token in case this operation is asynchronous.
50+
handle.getExecution().get();
51+
return handle.getResult().get().getMessage();
52+
}
53+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.nexuscontextpropagation.handler;
21+
22+
import io.temporal.client.WorkflowClient;
23+
import io.temporal.client.WorkflowClientOptions;
24+
import io.temporal.samples.nexus.options.ClientOptions;
25+
import io.temporal.samples.nexuscontextpropagation.propogation.MDCContextPropagator;
26+
import io.temporal.samples.nexuscontextpropagation.propogation.NexusMDCContextInterceptor;
27+
import io.temporal.worker.Worker;
28+
import io.temporal.worker.WorkerFactory;
29+
import io.temporal.worker.WorkerFactoryOptions;
30+
import java.util.Collections;
31+
32+
public class HandlerWorker {
33+
public static final String DEFAULT_TASK_QUEUE_NAME = "my-handler-task-queue";
34+
35+
public static void main(String[] args) {
36+
WorkflowClient client =
37+
ClientOptions.getWorkflowClient(
38+
args,
39+
WorkflowClientOptions.newBuilder()
40+
.setContextPropagators(Collections.singletonList(new MDCContextPropagator())));
41+
42+
WorkerFactory factory =
43+
WorkerFactory.newInstance(
44+
client,
45+
WorkerFactoryOptions.newBuilder()
46+
.setWorkerInterceptors(new NexusMDCContextInterceptor())
47+
.build());
48+
49+
Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME);
50+
worker.registerWorkflowImplementationTypes(HelloHandlerWorkflowImpl.class);
51+
worker.registerNexusServiceImplementation(new NexusServiceImpl());
52+
53+
factory.start();
54+
}
55+
}

0 commit comments

Comments
 (0)