Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ See the README.md file in each main sample directory for cut/paste Gradle comman

- [**Custom Annotation**](/core/src/main/java/io/temporal/samples/customannotation): Demonstrates how to create a custom annotation using an interceptor.

- [**Asnyc Packet Delivery**](/core/src/main/java/io/temporal/samples/packetdelivery): Demonstrates running multiple execution paths async within single execution.


#### API demonstrations

- [**Async Untyped Child Workflow**](/core/src/main/java/io/temporal/samples/asyncuntypedchild): Demonstrates how to invoke an untyped child workflow async, that can complete after parent workflow is already completed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.temporal.samples.packetdelivery;

public class Packet {
private int id;
private String content;

public Packet() {}

public Packet(int id, String content) {
this.id = id;
this.content = content;
}

public int getId() {
return id;
}

public String getContent() {
return content;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package io.temporal.samples.packetdelivery;

import io.temporal.activity.ActivityOptions;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.CanceledFailure;
import io.temporal.workflow.*;
import java.time.Duration;
import org.slf4j.Logger;

public class PacketDelivery {
private Packet packet;
private boolean deliveryConfirmation = false;
private boolean needDeliveryConfirmation = false;
private CompletablePromise delivered = Workflow.newPromise();
private CancellationScope cancellationScope;

private Logger logger = Workflow.getLogger(this.getClass().getName());

private final PacketDeliveryActivities activities =
Workflow.newActivityStub(
PacketDeliveryActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setHeartbeatTimeout(Duration.ofSeconds(2))
.build());

private final PacketDeliveryActivities compensationActivities =
Workflow.newActivityStub(
PacketDeliveryActivities.class,
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build());

public PacketDelivery(Packet packet) {
this.packet = packet;
processDeliveryAsync();
}

public Promise<Void> getDelivered() {
return delivered;
}

public void processDeliveryAsync() {
delivered.completeFrom(Async.procedure(this::processDelivery));
}

public void processDelivery() {
cancellationScope =
Workflow.newCancellationScope(
() -> {
String deliveryConfirmationResult = "";
while (!deliveryConfirmationResult.equals(PacketUtils.COMPLETION_SUCCESS)) {
// Step 1 perform delivery
logger.info(
"** Performing delivery for packet: "
+ packet.getId()
+ " - "
+ packet.getContent());
activities.performDelivery(packet);
// Step 2 wait for delivery confirmation
logger.info(
"** Delivery for packet: "
+ packet.getId()
+ " - "
+ packet.getContent()
+ " awaiting delivery confirmation");
needDeliveryConfirmation = true;
Workflow.await(() -> deliveryConfirmation);
logger.info(
"** Delivery for packet: "
+ packet.getId()
+ " - "
+ packet.getContent()
+ " received confirmation");
// Step 3 complete delivery processing
logger.info(
"** Completing delivery for packet: "
+ packet.getId()
+ " - "
+ packet.getContent());
deliveryConfirmationResult = activities.completeDelivery(packet);
// Reset deliveryConfirmation and needDeliveryConfirmation
deliveryConfirmation = false;
needDeliveryConfirmation = false;
}
});

try {
cancellationScope.run();
} catch (Exception e) {
if (e instanceof ActivityFailure) {
ActivityFailure activityFailure = (ActivityFailure) e;
if (activityFailure.getCause() instanceof CanceledFailure) {
// Run compensation activity and complete
compensationActivities.compensateDelivery(packet);
}
}
// Just for show for example that cancel could come in while we are waiting on approval signal
// too
else if (e instanceof CanceledFailure) {
needDeliveryConfirmation = false;
// Run compensation activity and complete
compensationActivities.compensateDelivery(packet);
}
return;
}
}

public void confirmDelivery() {
this.deliveryConfirmation = true;
}

public void cancelDelivery(String reason) {
if (cancellationScope != null) {
cancellationScope.cancel(reason);
}
}

public boolean isNeedDeliveryConfirmation() {
return needDeliveryConfirmation;
}

public Packet getPacket() {
return packet;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.temporal.samples.packetdelivery;

import io.temporal.activity.ActivityInterface;
import java.util.List;

@ActivityInterface
public interface PacketDeliveryActivities {
List<Packet> generatePackets();

void performDelivery(Packet packet);

String completeDelivery(Packet packet);

String compensateDelivery(Packet packet);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package io.temporal.samples.packetdelivery;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.WorkflowClient;
import java.util.*;

public class PacketDeliveryActivitiesImpl implements PacketDeliveryActivities {
private List<Packet> packets =
Arrays.asList(
new Packet(1, "books"),
new Packet(2, "jewelry"),
new Packet(3, "furniture"),
new Packet(4, "food"),
new Packet(5, "electronics"));
private WorkflowClient client;

public PacketDeliveryActivitiesImpl(WorkflowClient client) {
this.client = client;
}

@Override
public List<Packet> generatePackets() {
return packets;
}

@Override
public void performDelivery(Packet packet) {
ActivityExecutionContext context = Activity.getExecutionContext();
System.out.println(
"** Activity - Performing delivery for packet: "
+ packet.getId()
+ " with content: "
+ packet.getContent());
for (int i = 0; i < 4; i++) {
try {
// Perform the heartbeat. Used to notify the workflow that activity execution is alive
context.heartbeat(i);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea here to have a "long-running" activity that can be cancelled? Activity execution latency in this example is between 5-7 ms and heartbeatTimeout is 2 seconds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its to handle cancelation in activity, similar to HelloCancellation sample

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understand, . Of all the times I’ve run the example, none have resulted in a cancelled activity. maybe 7 ms is not enough, but I will try again later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you can change the "percentages" of cancel / failure to increase change for cancelation locally

} catch (ActivityCompletionException e) {
System.out.println(
"** Activity - Canceling delivery activity for packet: "
+ packet.getId()
+ " with content: "
+ packet.getContent());
throw e;
}
}
}

@Override
public String completeDelivery(Packet packet) {
ActivityExecutionContext context = Activity.getExecutionContext();
System.out.println(
"** Activity - Completing delivery for package: "
+ packet.getId()
+ " with content: "
+ packet.getContent());
for (int i = 0; i < 4; i++) {
try {
// Perform the heartbeat. Used to notify the workflow that activity execution is alive
context.heartbeat(i);
} catch (ActivityCompletionException e) {
System.out.println(
"** Activity - Canceling complete delivery activity for packet: "
+ packet.getId()
+ " with content: "
+ packet.getContent());
throw e;
}
}
// For sample we just confirm
return randomCompletionDeliveryResult(packet);
}

@Override
public String compensateDelivery(Packet packet) {
System.out.println(
"** Activity - Compensating delivery for package: "
+ packet.getId()
+ " with content: "
+ packet.getContent());
sleep(1);
return PacketUtils.COMPENSATION_COMPLETED;
}

/**
* For this sample activity completion result can drive if 1. Delivery confirmation is completed,
* in which case we complete delivery 2. Delivery confirmation is failed, in which case we run the
* delivery again 3. Delivery confirmation is cancelled, in which case we want to cancel delivery
* and perform "cleanup activity" Note that any delivery can cancel itself OR another delivery, so
* for example Furniure delivery can cancel the Food delivery. For sample we have some specific
* rules Which delivery can cancel which
*/
private String randomCompletionDeliveryResult(Packet packet) {
Random random = new Random();
double randomValue = random.nextDouble();
if (randomValue < 0.10) { // 10% chance for delivery completion to be canceled
int toCancelDelivery = determineCancelRules(packet);
System.out.println(
"** Activity - Delivery completion result for package: "
+ packet.getId()
+ " with content: "
+ packet.getContent()
+ ": "
+ "Cancelling delivery: "
+ toCancelDelivery);

// send cancellation signal for packet to be canceled
PacketDeliveryWorkflow packetWorkflow =
client.newWorkflowStub(
PacketDeliveryWorkflow.class,
Activity.getExecutionContext().getInfo().getWorkflowId());
packetWorkflow.cancelDelivery(toCancelDelivery, "canceled from delivery " + packet.getId());

return PacketUtils.COMPLETION_CANCELLED;
}
if (randomValue < 0.20) { // 20% chance for delivery completion to fail
System.out.println(
"** Activity - Delivery completion result for package: "
+ packet.getId()
+ " with content: "
+ packet.getContent()
+ ": "
+ "Failed");
return PacketUtils.COMPLETION_FAILURE;
}

System.out.println(
"** Activity - Delivery completion result for package: "
+ packet.getId()
+ " with content: "
+ packet.getContent()
+ ": "
+ "Successful");
return PacketUtils.COMPLETION_SUCCESS;
}

private void sleep(int seconds) {
try {
Thread.sleep(seconds * 1000L);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}

/**
* Sample rules for canceling different deliveries We just rotate the list 1-5 (packet ids) by
* packet id and return first result
*/
private int determineCancelRules(Packet packet) {
List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
Collections.rotate(list, packet.getId());
System.out.println(
"** Activity - Package delivery : "
+ packet.getId()
+ " canceling package delivery: "
+ list.get(0));
return list.get(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.temporal.samples.packetdelivery;

import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.List;

@WorkflowInterface
public interface PacketDeliveryWorkflow {
@WorkflowMethod
String execute();

@SignalMethod
void confirmDelivery(int deliveryId);

@SignalMethod
void cancelDelivery(int deliveryId, String reason);

@QueryMethod
List<Packet> deliveryConfirmationPackets();
}
Loading
Loading