Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ See the README.md file in each main sample directory for cut/paste Gradle comman
- [**HelloDelayedStart**](/core/src/main/java/io/temporal/samples/hello/HelloDelayedStart.java): Demonstrates how to use delayed start config option when starting a Workflow Executions.
- [**HelloSignalWithTimer**](/core/src/main/java/io/temporal/samples/hello/HelloSignalWithTimer.java): Demonstrates how to use collect signals for certain amount of time and then process last one.
- [**HelloWorkflowTimer**](/core/src/main/java/io/temporal/samples/hello/HelloWorkflowTimer.java): Demonstrates how we can use workflow timer to restrict duration of workflow execution instead of workflow run/execution timeouts.
- [**Auto-Heartbeating**](/core/src/main/java/io/temporal/samples/autoheartbeat/): Demonstrates use of Auto-heartbeating utility via activity interceptor.


#### Scenario-based samples
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat;

import io.temporal.activity.ActivityExecutionContext;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class AutoHeartbeatUtil {
private final long period;
private final long initialDelay;
private final TimeUnit periodTimeUnit;
private final ScheduledExecutorService timerService =
Executors.newSingleThreadScheduledExecutor();
private final ActivityExecutionContext context;
private final Object details;
private String heartbeaterId;

public AutoHeartbeatUtil(
long period,
long initialDelay,
TimeUnit periodTimeUnit,
ActivityExecutionContext context,
Object details) {
this.period = period;
this.initialDelay = initialDelay;
this.periodTimeUnit = periodTimeUnit;
this.context = context;
this.details = details;
// Set to activity id better, for sample we just use type
heartbeaterId = context.getInfo().getActivityType();
}

public ScheduledFuture<?> start() {
System.out.println("Autoheartbeater[" + heartbeaterId + "] starting...");
return timerService.scheduleAtFixedRate(
() -> {
// try {
System.out.println(
"Autoheartbeater["
+ heartbeaterId
+ "]"
+ "heartbeating at: "
+ printShortCurrentTime());
context.heartbeat(details);
},
initialDelay,
period,
periodTimeUnit);
}

public void stop() {
System.out.println("Autoheartbeater[" + heartbeaterId + "] being requested to stop.");
// Try not to execute another heartbeat that could have been queued up
// Note this can at times take a second or two so make sure to test this out on your workers
// So can set best heartbeat timeout (sometimes might need larger value to accomodate)
timerService.shutdownNow();
}

private String printShortCurrentTime() {
return DateTimeFormatter.ofPattern("HH:mm:ss")
.withZone(ZoneId.systemDefault())
.format(Instant.now());
}
}
21 changes: 21 additions & 0 deletions core/src/main/java/io/temporal/samples/autoheartbeat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Auto-heartbeating sample for activities that define HeartbeatTimeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sample should be linked from the top level readmen


This sample shows an implementation of an "auto-heartbeating" utility that can be applied via interceptor to all
activities where you define HeartbeatTimeout. Use case where this can be helpful include situations where you have
long-running activities where you want to heartbeat but its difficult to explicitly call heartbeat api in activity code
directly.
Another useful scenario for this is where you have activity that at times can complete in very short amount of time,
but then at times can take for example minutes. In this case you have to set longer StartToClose timeout
but you might not want first heartbeat to be sent right away but send it after the "shorter" duration of activity
execution.

Warning: make sure to test this sample for your use case. This includes load testing. This sample was not
tested on large scale workloads. In addition note that it is recommended to heartbeat from activity code itself. Using
this type of autoheartbeating utility does have disatvantage that activity code itself can continue running after
a handled activity cancelation. Please be aware of these warnings when applying this sample.

1. Start the Sample:

```bash
./gradlew -q execute -PmainClass=io.temporal.samples.autoheartbeat.Starter
```
167 changes: 167 additions & 0 deletions core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.failure.CanceledFailure;
import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl;
import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor;
import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow;
import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerFactoryOptions;

public class Starter {
static final String TASK_QUEUE = "AutoheartbeatTaskQueue";
static final String WORKFLOW_ID = "AutoHeartbeatWorkflow";

public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);

// Configure our auto heartbeat workflow interceptor which will apply
// AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat
// timeout configured
WorkerFactoryOptions wfo =
WorkerFactoryOptions.newBuilder()
.setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor())
.build();

WorkerFactory factory = WorkerFactory.newInstance(client, wfo);
Worker worker = factory.newWorker(TASK_QUEUE);

worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class);
worker.registerActivitiesImplementations(new AutoActivitiesImpl());

factory.start();

// first run completes execution with autoheartbeat utils
firstRun(client);
// second run cancels running (pending) activity via signal (specific scope cancel)
secondRun(client);
// third run cancels running execution which cancels activity as well
thirdRun(client);
// fourth run turns off autoheartbeat for activities and lets activity time out on heartbeat
// timeout
fourthRun(client);

System.exit(0);
}

@SuppressWarnings("unused")
private static void firstRun(WorkflowClient client) {
System.out.println("**** First Run: run workflow to completion");
AutoWorkflow firstRun =
client.newWorkflowStub(
AutoWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(WORKFLOW_ID)
.setTaskQueue(TASK_QUEUE)
.build());

try {
String firstRunResult = firstRun.exec("Auto heartbeating is cool");
System.out.println("First run result: " + firstRunResult);
} catch (Exception e) {
System.out.println("First run - Workflow exec exception: " + e.getClass().getName());
}
}

@SuppressWarnings("unused")
private static void secondRun(WorkflowClient client) {
System.out.println("\n\n**** Second Run: cancel activities via signal");
AutoWorkflow secondRun =
client.newWorkflowStub(
AutoWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(WORKFLOW_ID)
.setTaskQueue(TASK_QUEUE)
.build());
WorkflowClient.start(secondRun::exec, "Auto heartbeating is cool");
doSleeps(4);
secondRun.cancelActivity();

try {
String secondRunResult = WorkflowStub.fromTyped(secondRun).getResult(String.class);
System.out.println("Second run result: " + secondRunResult);
} catch (Exception e) {
System.out.println("Second run - Workflow exec exception: " + e.getClass().getName());
}
}

@SuppressWarnings("unused")
private static void thirdRun(WorkflowClient client) {
System.out.println("\n\n**** Third Run: cancel workflow execution");
AutoWorkflow thirdRun =
client.newWorkflowStub(
AutoWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(WORKFLOW_ID)
.setTaskQueue(TASK_QUEUE)
.build());
WorkflowClient.start(thirdRun::exec, "Auto heartbeating is cool");
doSleeps(10);
try {
WorkflowStub.fromTyped(thirdRun).cancel();
String thirdRunResult = WorkflowStub.fromTyped(thirdRun).getResult(String.class);
System.out.println("Third run result: " + thirdRunResult);
} catch (Exception e) {
// we are expecting workflow cancelation
if (e.getCause() instanceof CanceledFailure) {
System.out.println("Third run - Workflow execution canceled.");
} else {
System.out.println("Third run - Workflow exec exception: " + e.getMessage());
}
}
}

@SuppressWarnings("unused")
private static void fourthRun(WorkflowClient client) {
System.out.println("\n\n**** Fourth Run: cause heartbeat timeout");
// we disable autoheartbeat via env var
System.setProperty("sample.disableAutoHeartbeat", "true");
AutoWorkflow fourth =
client.newWorkflowStub(
AutoWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(WORKFLOW_ID)
.setTaskQueue(TASK_QUEUE)
.build());

try {
String fourthRunResult = fourth.exec("Auto heartbeating is cool");
System.out.println("Fourth run result: " + fourthRunResult);
} catch (Exception e) {
System.out.println("Fourth run - Workflow exec exception: " + e.getClass().getName());
}
}

private static void doSleeps(int seconds) {
try {
Thread.sleep(seconds * 1000L);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat.activities;

import io.temporal.activity.ActivityInterface;

@ActivityInterface
public interface AutoActivities {
String runActivityOne(String input);

String runActivityTwo(String input);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat.activities;

import java.util.concurrent.TimeUnit;

public class AutoActivitiesImpl implements AutoActivities {

@Override
public String runActivityOne(String input) {
return runActivity("runActivityOne - " + input, 10);
}

@Override
public String runActivityTwo(String input) {
return runActivity("runActivityTwo - " + input, 5);
}

@SuppressWarnings("FutureReturnValueIgnored")
private String runActivity(String input, int seconds) {
for (int i = 0; i < seconds; i++) {
sleep(1);
}
return "Activity completed: " + input;
}

private void sleep(int seconds) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
} catch (InterruptedException ee) {
// Empty
}
}
}
Loading
Loading