From 2a51eab813cb5f3830a22e20876318743b53d202 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Wed, 9 Jul 2025 17:57:02 -0400 Subject: [PATCH 01/13] [WIP] Async package delivery sample Signed-off-by: Tihomir Surdilovic --- .../samples/packetdelivery/Packet.java | 21 ++++++ .../packetdelivery/PacketDelivery.java | 71 +++++++++++++++++++ .../PacketDeliveryActivities.java | 13 ++++ .../PacketDeliveryActivitiesImpl.java | 47 ++++++++++++ .../PacketDeliveryWorkflow.java | 14 ++++ .../PacketDeliveryWorkflowImpl.java | 44 ++++++++++++ .../samples/packetdelivery/Starter.java | 56 +++++++++++++++ 7 files changed, 266 insertions(+) create mode 100644 core/src/main/java/io/temporal/samples/packetdelivery/Packet.java create mode 100644 core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java create mode 100644 core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivities.java create mode 100644 core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivitiesImpl.java create mode 100644 core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/packetdelivery/Starter.java diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/Packet.java b/core/src/main/java/io/temporal/samples/packetdelivery/Packet.java new file mode 100644 index 000000000..2d7ee91d4 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/packetdelivery/Packet.java @@ -0,0 +1,21 @@ +package io.temporal.samples.packetdelivery; + +public class Packet { + private int id; + private String content; + + public Packet() {} + + public Packet(int id, String content) { + this.id = id; + this.content = content; + } + + public int getId() { + return id; + } + + public String getContent() { + return content; + } +} diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java new file mode 100644 index 000000000..db0f2abf9 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java @@ -0,0 +1,71 @@ +package io.temporal.samples.packetdelivery; + +import io.temporal.activity.ActivityOptions; +import io.temporal.workflow.Async; +import io.temporal.workflow.CompletablePromise; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import org.slf4j.Logger; + +public class PacketDelivery { + private Packet packet; + private boolean deliveryConfirmation = false; + private CompletablePromise delivered = Workflow.newPromise(); + private String deliveryConfirmationCode = ""; + + private Logger logger = Workflow.getLogger(this.getClass().getName()); + + private final PacketDeliveryActivities activities = + Workflow.newActivityStub( + PacketDeliveryActivities.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build()); + + public PacketDelivery(Packet packet) { + this.packet = packet; + processDeliveryAsync(); + } + + public Promise getDelivered() { + System.out.println("*************** CHECKING GET DELIVERED!!! : " + delivered); + return delivered; + } + + public void processDeliveryAsync() { + delivered.completeFrom(Async.procedure(this::processDelivery)); + System.out.println("*************** DONE WITH PROCESS DELIVERY ASYNC"); + } + + public void processDelivery() { + while (!deliveryConfirmationCode.equals("Confirmed")) { + // Step 1 perform delivery + logger.info( + "** Performing delivery for packet: " + packet.getId() + " - " + packet.getContent()); + activities.performDelivery(packet); + // Step 2 wait for delivery confirmation + logger.info( + "** Delivery for packet: " + + packet.getId() + + " - " + + packet.getContent() + + " awaiting delivery confirmation"); + Workflow.await(() -> deliveryConfirmation); + logger.info( + "** Delivery for packet: " + + packet.getId() + + " - " + + packet.getContent() + + " received confirmation"); + // Step 3 complete delivery processing + logger.info( + "** Completing delivery for packet: " + packet.getId() + " - " + packet.getContent()); + deliveryConfirmationCode = activities.completeDelivery(packet); + // Reset deliveryConfirmation + deliveryConfirmation = false; + } + } + + public void confirmDelivery() { + this.deliveryConfirmation = true; + } +} diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivities.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivities.java new file mode 100644 index 000000000..f338dbed9 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivities.java @@ -0,0 +1,13 @@ +package io.temporal.samples.packetdelivery; + +import io.temporal.activity.ActivityInterface; +import java.util.List; + +@ActivityInterface +public interface PacketDeliveryActivities { + List generatePackets(); + + void performDelivery(Packet packet); + + String completeDelivery(Packet packet); +} diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivitiesImpl.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivitiesImpl.java new file mode 100644 index 000000000..94f0c30e3 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivitiesImpl.java @@ -0,0 +1,47 @@ +package io.temporal.samples.packetdelivery; + +import java.util.ArrayList; +import java.util.List; + +public class PacketDeliveryActivitiesImpl implements PacketDeliveryActivities { + @Override + public List generatePackets() { + List result = new ArrayList<>(); + result.add(new Packet(1, "books")); + result.add(new Packet(2, "jewelry")); + result.add(new Packet(3, "furniture")); + result.add(new Packet(4, "food")); + result.add(new Packet(5, "electronics")); + return result; + } + + @Override + public void performDelivery(Packet packet) { + System.out.println( + "** Activity - Performing delivery for packet: " + + packet.getId() + + " with content: " + + packet.getContent()); + sleep(2); + } + + @Override + public String completeDelivery(Packet packet) { + System.out.println( + "** Activity - Completing delivery for package: " + + packet.getId() + + " with content: " + + packet.getContent()); + sleep(1); + // for sample we just confirm + return "Confirmed"; + } + + private void sleep(int seconds) { + try { + Thread.sleep(seconds * 1000L); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java new file mode 100644 index 000000000..66e85fd2f --- /dev/null +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java @@ -0,0 +1,14 @@ +package io.temporal.samples.packetdelivery; + +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface PacketDeliveryWorkflow { + @WorkflowMethod + String execute(); + + @SignalMethod + void confirmDelivery(int deliveryId); +} diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java new file mode 100644 index 000000000..6cc3704cc --- /dev/null +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java @@ -0,0 +1,44 @@ +package io.temporal.samples.packetdelivery; + +import io.temporal.activity.ActivityOptions; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class PacketDeliveryWorkflowImpl implements PacketDeliveryWorkflow { + + private final Map packetDeliveries = new HashMap<>(); + // private Logger logger = Workflow.getLogger(this.getClass().getName()); + + private final PacketDeliveryActivities activities = + Workflow.newActivityStub( + PacketDeliveryActivities.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build()); + + @Override + public String execute() { + List> packetsDelivered = new ArrayList<>(); + // Step 1 - upload initial packets to deliver + List initialPackets = activities.generatePackets(); + // Step 2 - set up delivery processing + for (Packet packet : initialPackets) { + PacketDelivery delivery = new PacketDelivery(packet); + packetDeliveries.put(packet.getId(), delivery); + packetsDelivered.add(delivery.getDelivered()); + } + + Promise.allOf(packetsDelivered).get(); + return "completed"; + } + + @Override + public void confirmDelivery(int deliveryId) { + if (packetDeliveries.containsKey(deliveryId)) { + packetDeliveries.get(deliveryId).confirmDelivery(); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java b/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java new file mode 100644 index 000000000..37ea053be --- /dev/null +++ b/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java @@ -0,0 +1,56 @@ +package io.temporal.samples.packetdelivery; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; + +public class Starter { + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = WorkflowClient.newInstance(service); + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker("packet-delivery-taskqueue"); + + worker.registerWorkflowImplementationTypes(PacketDeliveryWorkflowImpl.class); + worker.registerActivitiesImplementations(new PacketDeliveryActivitiesImpl()); + + factory.start(); + + PacketDeliveryWorkflow workflow = + client.newWorkflowStub( + PacketDeliveryWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId("packet-delivery-workflow") + .setTaskQueue("packet-delivery-taskqueue") + .build()); + + WorkflowClient.start(workflow::execute); + + // start completing package deliveries (send confirmations) + sleep(3); + workflow.confirmDelivery(3); // furniture + sleep(1); + workflow.confirmDelivery(5); // electronics + sleep(1); + workflow.confirmDelivery(1); // books + sleep(1); + workflow.confirmDelivery(2); // jewelry + sleep(1); + workflow.confirmDelivery(4); // food + + // wait for workflow to complete + String result = WorkflowStub.fromTyped(workflow).getResult(String.class); + System.out.println("** Workflow Result: " + result); + } + + private static void sleep(int seconds) { + try { + Thread.sleep(seconds * 1000L); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } +} From 50f098158a60147433b7af37edf91bb82c555958 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Wed, 9 Jul 2025 18:00:32 -0400 Subject: [PATCH 02/13] update Signed-off-by: Tihomir Surdilovic --- .../java/io/temporal/samples/packetdelivery/PacketDelivery.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java index db0f2abf9..c4ecac9bc 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java @@ -27,13 +27,11 @@ public PacketDelivery(Packet packet) { } public Promise getDelivered() { - System.out.println("*************** CHECKING GET DELIVERED!!! : " + delivered); return delivered; } public void processDeliveryAsync() { delivered.completeFrom(Async.procedure(this::processDelivery)); - System.out.println("*************** DONE WITH PROCESS DELIVERY ASYNC"); } public void processDelivery() { From 3be525a821e179c12e0ae2629e8d9d93a7fbfcc2 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Sat, 12 Jul 2025 12:31:05 -0400 Subject: [PATCH 03/13] adding cancelation and failure Signed-off-by: Tihomir Surdilovic --- .../packetdelivery/PacketDelivery.java | 105 +++++++++---- .../PacketDeliveryActivities.java | 2 + .../PacketDeliveryActivitiesImpl.java | 138 ++++++++++++++++-- .../PacketDeliveryWorkflow.java | 3 + .../PacketDeliveryWorkflowImpl.java | 13 +- .../samples/packetdelivery/PacketUtils.java | 8 + .../samples/packetdelivery/Starter.java | 2 +- 7 files changed, 226 insertions(+), 45 deletions(-) create mode 100644 core/src/main/java/io/temporal/samples/packetdelivery/PacketUtils.java diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java index c4ecac9bc..72ca76cf7 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java @@ -1,10 +1,9 @@ package io.temporal.samples.packetdelivery; import io.temporal.activity.ActivityOptions; -import io.temporal.workflow.Async; -import io.temporal.workflow.CompletablePromise; -import io.temporal.workflow.Promise; -import io.temporal.workflow.Workflow; +import io.temporal.failure.ActivityFailure; +import io.temporal.failure.CanceledFailure; +import io.temporal.workflow.*; import java.time.Duration; import org.slf4j.Logger; @@ -12,11 +11,19 @@ public class PacketDelivery { private Packet packet; private boolean deliveryConfirmation = false; private CompletablePromise delivered = Workflow.newPromise(); - private String deliveryConfirmationCode = ""; + private CancellationScope cancellationScope; private Logger logger = Workflow.getLogger(this.getClass().getName()); private final PacketDeliveryActivities activities = + Workflow.newActivityStub( + PacketDeliveryActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(5)) + .setHeartbeatTimeout(Duration.ofSeconds(2)) + .build()); + + private final PacketDeliveryActivities compensationActivities = Workflow.newActivityStub( PacketDeliveryActivities.class, ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build()); @@ -35,35 +42,73 @@ public void processDeliveryAsync() { } public void processDelivery() { - while (!deliveryConfirmationCode.equals("Confirmed")) { - // Step 1 perform delivery - logger.info( - "** Performing delivery for packet: " + packet.getId() + " - " + packet.getContent()); - activities.performDelivery(packet); - // Step 2 wait for delivery confirmation - logger.info( - "** Delivery for packet: " - + packet.getId() - + " - " - + packet.getContent() - + " awaiting delivery confirmation"); - Workflow.await(() -> deliveryConfirmation); - logger.info( - "** Delivery for packet: " - + packet.getId() - + " - " - + packet.getContent() - + " received confirmation"); - // Step 3 complete delivery processing - logger.info( - "** Completing delivery for packet: " + packet.getId() + " - " + packet.getContent()); - deliveryConfirmationCode = activities.completeDelivery(packet); - // Reset deliveryConfirmation - deliveryConfirmation = false; + cancellationScope = + Workflow.newCancellationScope( + () -> { + String deliveryConfirmationResult = ""; + while (!deliveryConfirmationResult.equals(PacketUtils.COMPLETION_SUCCESS)) { + // Step 1 perform delivery + logger.info( + "** Performing delivery for packet: " + + packet.getId() + + " - " + + packet.getContent()); + activities.performDelivery(packet); + // Step 2 wait for delivery confirmation + logger.info( + "** Delivery for packet: " + + packet.getId() + + " - " + + packet.getContent() + + " awaiting delivery confirmation"); + Workflow.await(() -> deliveryConfirmation); + logger.info( + "** Delivery for packet: " + + packet.getId() + + " - " + + packet.getContent() + + " received confirmation"); + // Step 3 complete delivery processing + logger.info( + "** Completing delivery for packet: " + + packet.getId() + + " - " + + packet.getContent()); + deliveryConfirmationResult = activities.completeDelivery(packet); + // Reset deliveryConfirmation + deliveryConfirmation = false; + } + }); + + try { + cancellationScope.run(); + } catch (Exception e) { + System.out.println("*************** E1: " + e.getClass().getName()); + if (e instanceof ActivityFailure) { + ActivityFailure activityFailure = (ActivityFailure) e; + if (activityFailure.getCause() instanceof CanceledFailure) { + // Run compensation activity and complete + System.out.println("*************** E11: " + e.getClass().getName()); + compensationActivities.compensateDelivery(packet); + } + } + // Just for show for example that cancel could come in while we are waiting on approval signal + // too + else if (e instanceof CanceledFailure) { + // Run compensation activity and complete + compensationActivities.compensateDelivery(packet); + } + return; } } public void confirmDelivery() { this.deliveryConfirmation = true; } + + public void cancelDelivery(String reason) { + if (cancellationScope != null) { + cancellationScope.cancel(reason); + } + } } diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivities.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivities.java index f338dbed9..b2e67dbab 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivities.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivities.java @@ -10,4 +10,6 @@ public interface PacketDeliveryActivities { void performDelivery(Packet packet); String completeDelivery(Packet packet); + + String compensateDelivery(Packet packet); } diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivitiesImpl.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivitiesImpl.java index 94f0c30e3..655223501 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivitiesImpl.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivitiesImpl.java @@ -1,40 +1,139 @@ package io.temporal.samples.packetdelivery; -import java.util.ArrayList; -import java.util.List; +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.client.ActivityCompletionException; +import io.temporal.client.WorkflowClient; +import java.util.*; public class PacketDeliveryActivitiesImpl implements PacketDeliveryActivities { + private List packets = + Arrays.asList( + new Packet(1, "books"), + new Packet(2, "jewelry"), + new Packet(3, "furniture"), + new Packet(4, "food"), + new Packet(5, "electronics")); + private WorkflowClient client; + + public PacketDeliveryActivitiesImpl(WorkflowClient client) { + this.client = client; + } + @Override public List generatePackets() { - List result = new ArrayList<>(); - result.add(new Packet(1, "books")); - result.add(new Packet(2, "jewelry")); - result.add(new Packet(3, "furniture")); - result.add(new Packet(4, "food")); - result.add(new Packet(5, "electronics")); - return result; + return packets; } @Override public void performDelivery(Packet packet) { + ActivityExecutionContext context = Activity.getExecutionContext(); System.out.println( "** Activity - Performing delivery for packet: " + packet.getId() + " with content: " + packet.getContent()); - sleep(2); + for (int i = 0; i < 4; i++) { + try { + // Perform the heartbeat. Used to notify the workflow that activity execution is alive + context.heartbeat(i); + } catch (ActivityCompletionException e) { + System.out.println( + "** Activity - Canceling delivery activity for packet: " + + packet.getId() + + " with content: " + + packet.getContent()); + throw e; + } + } } @Override public String completeDelivery(Packet packet) { + ActivityExecutionContext context = Activity.getExecutionContext(); System.out.println( "** Activity - Completing delivery for package: " + packet.getId() + " with content: " + packet.getContent()); + for (int i = 0; i < 4; i++) { + try { + // Perform the heartbeat. Used to notify the workflow that activity execution is alive + context.heartbeat(i); + } catch (ActivityCompletionException e) { + System.out.println( + "** Activity - Canceling complete delivery activity for packet: " + + packet.getId() + + " with content: " + + packet.getContent()); + throw e; + } + } + // For sample we just confirm + return randomCompletionDeliveryResult(packet); + } + + @Override + public String compensateDelivery(Packet packet) { + System.out.println( + "** Activity - Compensating delivery for package: " + + packet.getId() + + " with content: " + + packet.getContent()); sleep(1); - // for sample we just confirm - return "Confirmed"; + return PacketUtils.COMPENSATION_COMPLETED; + } + + /** + * For this sample activity completion result can drive if 1. Delivery confirmation is completed, + * in which case we complete delivery 2. Delivery confirmation is failed, in which case we run the + * delivery again 3. Delivery confirmation is cancelled, in which case we want to cancel delivery + * and perform "cleanup activity" Note that any delivery can cancel itself OR another delivery, so + * for example Furniure delivery can cancel the Food delivery. For sample we have some specific + * rules Which delivery can cancel which + */ + private String randomCompletionDeliveryResult(Packet packet) { + Random random = new Random(); + double randomValue = random.nextDouble(); + if (randomValue < 0.10) { // 10% chance for delivery completion to be canceled + int toCancelDelivery = determineCancelRules(packet); + System.out.println( + "** Activity - Delivery completion result for package: " + + packet.getId() + + " with content: " + + packet.getContent() + + ": " + + "Cancelling delivery: " + + toCancelDelivery); + + // send cancellation signal for packet to be canceled + PacketDeliveryWorkflow packetWorkflow = + client.newWorkflowStub( + PacketDeliveryWorkflow.class, + Activity.getExecutionContext().getInfo().getWorkflowId()); + packetWorkflow.cancelDelivery(toCancelDelivery, "canceled from delivery " + packet.getId()); + + return PacketUtils.COMPLETION_CANCELLED; + } + if (randomValue < 0.20) { // 20% chance for delivery completion to fail + System.out.println( + "** Activity - Delivery completion result for package: " + + packet.getId() + + " with content: " + + packet.getContent() + + ": " + + "Failed"); + return PacketUtils.COMPLETION_FAILURE; + } + + System.out.println( + "** Activity - Delivery completion result for package: " + + packet.getId() + + " with content: " + + packet.getContent() + + ": " + + "Successful"); + return PacketUtils.COMPLETION_SUCCESS; } private void sleep(int seconds) { @@ -44,4 +143,19 @@ private void sleep(int seconds) { System.out.println(e.getMessage()); } } + + /** + * Sample rules for canceling different deliveries We just rotate the list 1-5 (packet ids) by + * packet id and return first result + */ + private int determineCancelRules(Packet packet) { + List list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5)); + Collections.rotate(list, packet.getId()); + System.out.println( + "** Activity - Package delivery : " + + packet.getId() + + " canceling package delivery: " + + list.get(0)); + return list.get(0); + } } diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java index 66e85fd2f..cfe381fc9 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java @@ -11,4 +11,7 @@ public interface PacketDeliveryWorkflow { @SignalMethod void confirmDelivery(int deliveryId); + + @SignalMethod + void cancelDelivery(int deliveryId, String reason); } diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java index 6cc3704cc..bacb9d995 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java @@ -12,12 +12,14 @@ public class PacketDeliveryWorkflowImpl implements PacketDeliveryWorkflow { private final Map packetDeliveries = new HashMap<>(); - // private Logger logger = Workflow.getLogger(this.getClass().getName()); private final PacketDeliveryActivities activities = Workflow.newActivityStub( PacketDeliveryActivities.class, - ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build()); + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(5)) + .setHeartbeatTimeout(Duration.ofSeconds(2)) + .build()); @Override public String execute() { @@ -41,4 +43,11 @@ public void confirmDelivery(int deliveryId) { packetDeliveries.get(deliveryId).confirmDelivery(); } } + + @Override + public void cancelDelivery(int deliveryId, String reason) { + if (packetDeliveries.containsKey(deliveryId)) { + packetDeliveries.get(deliveryId).cancelDelivery(reason); + } + } } diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketUtils.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketUtils.java new file mode 100644 index 000000000..193a2f4be --- /dev/null +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketUtils.java @@ -0,0 +1,8 @@ +package io.temporal.samples.packetdelivery; + +public class PacketUtils { + public static String COMPLETION_SUCCESS = "Delivery Completion Successful"; + public static String COMPLETION_FAILURE = "Delivery Completion Failed"; + public static String COMPLETION_CANCELLED = "Delivery Completion Cancelled"; + public static String COMPENSATION_COMPLETED = "Delivery Compensation Completed"; +} diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java b/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java index 37ea053be..e3221ed7c 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java @@ -15,7 +15,7 @@ public static void main(String[] args) { Worker worker = factory.newWorker("packet-delivery-taskqueue"); worker.registerWorkflowImplementationTypes(PacketDeliveryWorkflowImpl.class); - worker.registerActivitiesImplementations(new PacketDeliveryActivitiesImpl()); + worker.registerActivitiesImplementations(new PacketDeliveryActivitiesImpl(client)); factory.start(); From 0267b4fa2cfb00b767beb265bdd4c98dc8f500e7 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Sat, 12 Jul 2025 12:41:31 -0400 Subject: [PATCH 04/13] adding readme Signed-off-by: Tihomir Surdilovic --- .../java/io/temporal/samples/packetdelivery/README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 core/src/main/java/io/temporal/samples/packetdelivery/README.md diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/README.md b/core/src/main/java/io/temporal/samples/packetdelivery/README.md new file mode 100644 index 000000000..5f187ccab --- /dev/null +++ b/core/src/main/java/io/temporal/samples/packetdelivery/README.md @@ -0,0 +1,10 @@ +# Async Package Delivery Sample + +This sample show how to run multiple "paths" of execution async within single workflow. +Sample starts deliveries of 5 items in parallel. Each item performs an activity +and then waits for a confirmation signal, then performs second activity. + +Workflow waits until all packets have been delivered. Each packet delivery path can choose to +also "cancel" delivery of another item. This is done via signal and cancellation of the +CancellationScope. + From 9f54fc4ad4108f4b65b8bba680cd290c5fab69b2 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Sat, 12 Jul 2025 12:43:27 -0400 Subject: [PATCH 05/13] update readme Signed-off-by: Tihomir Surdilovic --- README.md | 3 +++ .../main/java/io/temporal/samples/packetdelivery/README.md | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/README.md b/README.md index 0b8ff8b12..1cbf700c9 100644 --- a/README.md +++ b/README.md @@ -108,6 +108,9 @@ See the README.md file in each main sample directory for cut/paste Gradle comman - [**Custom Annotation**](/core/src/main/java/io/temporal/samples/customannotation): Demonstrates how to create a custom annotation using an interceptor. +- [**Asnyc Packet Delivery**](/core/src/main/java/io/temporal/samples/packetdelivery): Demonstrates running multiple execution paths async within single execution. + + #### API demonstrations - [**Async Untyped Child Workflow**](/core/src/main/java/io/temporal/samples/asyncuntypedchild): Demonstrates how to invoke an untyped child workflow async, that can complete after parent workflow is already completed. diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/README.md b/core/src/main/java/io/temporal/samples/packetdelivery/README.md index 5f187ccab..815c7461d 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/README.md +++ b/core/src/main/java/io/temporal/samples/packetdelivery/README.md @@ -8,3 +8,7 @@ Workflow waits until all packets have been delivered. Each packet delivery path also "cancel" delivery of another item. This is done via signal and cancellation of the CancellationScope. +2. Start the Sample: +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.packetdelivery.Starter +`` \ No newline at end of file From 3f90abc48d1261343b81fada567b8fd895196c06 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Sat, 12 Jul 2025 13:29:06 -0400 Subject: [PATCH 06/13] update Signed-off-by: Tihomir Surdilovic --- .../java/io/temporal/samples/packetdelivery/PacketDelivery.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java index 72ca76cf7..8d03a2dfe 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java @@ -83,12 +83,10 @@ public void processDelivery() { try { cancellationScope.run(); } catch (Exception e) { - System.out.println("*************** E1: " + e.getClass().getName()); if (e instanceof ActivityFailure) { ActivityFailure activityFailure = (ActivityFailure) e; if (activityFailure.getCause() instanceof CanceledFailure) { // Run compensation activity and complete - System.out.println("*************** E11: " + e.getClass().getName()); compensationActivities.compensateDelivery(packet); } } From 819b3f9564805ca413fb35e5679ab04e8ba2c3da Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Sat, 12 Jul 2025 14:26:36 -0400 Subject: [PATCH 07/13] update Signed-off-by: Tihomir Surdilovic --- .../packetdelivery/PacketDelivery.java | 14 ++++++++- .../PacketDeliveryWorkflow.java | 5 +++ .../PacketDeliveryWorkflowImpl.java | 29 +++++++++++++++-- .../samples/packetdelivery/Starter.java | 31 +++++++++++++------ 4 files changed, 66 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java index 8d03a2dfe..334d21bcb 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java @@ -10,6 +10,7 @@ public class PacketDelivery { private Packet packet; private boolean deliveryConfirmation = false; + private boolean needDeliveryConfirmation = false; private CompletablePromise delivered = Workflow.newPromise(); private CancellationScope cancellationScope; @@ -61,6 +62,7 @@ public void processDelivery() { + " - " + packet.getContent() + " awaiting delivery confirmation"); + needDeliveryConfirmation = true; Workflow.await(() -> deliveryConfirmation); logger.info( "** Delivery for packet: " @@ -75,8 +77,9 @@ public void processDelivery() { + " - " + packet.getContent()); deliveryConfirmationResult = activities.completeDelivery(packet); - // Reset deliveryConfirmation + // Reset deliveryConfirmation and needDeliveryConfirmation deliveryConfirmation = false; + needDeliveryConfirmation = false; } }); @@ -93,6 +96,7 @@ public void processDelivery() { // Just for show for example that cancel could come in while we are waiting on approval signal // too else if (e instanceof CanceledFailure) { + needDeliveryConfirmation = false; // Run compensation activity and complete compensationActivities.compensateDelivery(packet); } @@ -109,4 +113,12 @@ public void cancelDelivery(String reason) { cancellationScope.cancel(reason); } } + + public boolean isNeedDeliveryConfirmation() { + return needDeliveryConfirmation; + } + + public Packet getPacket() { + return packet; + } } diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java index cfe381fc9..1f48fac08 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java @@ -1,8 +1,10 @@ package io.temporal.samples.packetdelivery; +import io.temporal.workflow.QueryMethod; import io.temporal.workflow.SignalMethod; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; +import java.util.List; @WorkflowInterface public interface PacketDeliveryWorkflow { @@ -14,4 +16,7 @@ public interface PacketDeliveryWorkflow { @SignalMethod void cancelDelivery(int deliveryId, String reason); + + @QueryMethod + List deliveryConfirmationPackets(); } diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java index bacb9d995..f03b8f18a 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java @@ -8,10 +8,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.slf4j.Logger; public class PacketDeliveryWorkflowImpl implements PacketDeliveryWorkflow { - private final Map packetDeliveries = new HashMap<>(); + private final Logger logger = Workflow.getLogger(this.getClass().getName()); private final PacketDeliveryActivities activities = Workflow.newActivityStub( @@ -47,7 +48,31 @@ public void confirmDelivery(int deliveryId) { @Override public void cancelDelivery(int deliveryId, String reason) { if (packetDeliveries.containsKey(deliveryId)) { - packetDeliveries.get(deliveryId).cancelDelivery(reason); + // Only makes sense to cancel if delivery is not done yet + if (!packetDeliveries.get(deliveryId).getDelivered().isCompleted()) { + logger.info("Sending cancellation for delivery : " + deliveryId + " and reason: " + reason); + packetDeliveries.get(deliveryId).cancelDelivery(reason); + } + logger.info( + "Bypassing sending cancellation for delivery : " + + deliveryId + + " and reason: " + + reason + + " because delivery already completed"); } } + + @Override + public List deliveryConfirmationPackets() { + List confirmationPackets = new ArrayList<>(); + packetDeliveries + .values() + .forEach( + p -> { + if (p.isNeedDeliveryConfirmation()) { + confirmationPackets.add(p.getPacket()); + } + }); + return confirmationPackets; + } } diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java b/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java index e3221ed7c..2902d97ba 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java @@ -1,11 +1,13 @@ package io.temporal.samples.packetdelivery; import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowNotFoundException; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; +import java.util.List; public class Starter { public static void main(String[] args) { @@ -30,16 +32,25 @@ public static void main(String[] args) { WorkflowClient.start(workflow::execute); // start completing package deliveries (send confirmations) - sleep(3); - workflow.confirmDelivery(3); // furniture - sleep(1); - workflow.confirmDelivery(5); // electronics - sleep(1); - workflow.confirmDelivery(1); // books - sleep(1); - workflow.confirmDelivery(2); // jewelry - sleep(1); - workflow.confirmDelivery(4); // food + // Query workflow for packets that need confirmation, confirm until none need confirmation any + // more + while (true) { + sleep(3); + List packets = workflow.deliveryConfirmationPackets(); + if (packets.isEmpty()) { + break; + } + + for (Packet p : packets) { + try { + workflow.confirmDelivery(p.getId()); + } catch (WorkflowNotFoundException e) { + // In some cases with cancellations happening, workflow could be completed by now + // We just ignore and exit out of loop + break; + } + } + } // wait for workflow to complete String result = WorkflowStub.fromTyped(workflow).getResult(String.class); From 0febe573bd1c33b2b59491d69bd172fdeefa6e7d Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Sat, 12 Jul 2025 14:31:55 -0400 Subject: [PATCH 08/13] update starter Signed-off-by: Tihomir Surdilovic --- .../main/java/io/temporal/samples/packetdelivery/Starter.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java b/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java index 2902d97ba..56513033f 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java @@ -7,6 +7,7 @@ import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; +import java.util.Collections; import java.util.List; public class Starter { @@ -36,10 +37,13 @@ public static void main(String[] args) { // more while (true) { sleep(3); + // for "fun", reverse the list we get from delivery confirmation list List packets = workflow.deliveryConfirmationPackets(); if (packets.isEmpty()) { break; } + // for "fun", reverse the list we get from delivery confirmation list + Collections.reverse(packets); for (Packet p : packets) { try { From 143857e0fff533306f16147bc71a942274026b59 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Sat, 12 Jul 2025 14:35:24 -0400 Subject: [PATCH 09/13] update readme Signed-off-by: Tihomir Surdilovic --- .../main/java/io/temporal/samples/packetdelivery/README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/README.md b/core/src/main/java/io/temporal/samples/packetdelivery/README.md index 815c7461d..a935fc995 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/README.md +++ b/core/src/main/java/io/temporal/samples/packetdelivery/README.md @@ -8,7 +8,10 @@ Workflow waits until all packets have been delivered. Each packet delivery path also "cancel" delivery of another item. This is done via signal and cancellation of the CancellationScope. -2. Start the Sample: +## Start the Sample: ```bash ./gradlew -q execute -PmainClass=io.temporal.samples.packetdelivery.Starter -`` \ No newline at end of file +`` + +Run sample multiple times to see different scenarios (delivery failure and retry and delivery cancelation) +There is a 10% chance delivery is going to be canceled and 20% chane it will fail. \ No newline at end of file From 126ec11b49e913096c54aedf9594be85a8821ba2 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Sat, 12 Jul 2025 14:35:52 -0400 Subject: [PATCH 10/13] update readme Signed-off-by: Tihomir Surdilovic --- core/src/main/java/io/temporal/samples/packetdelivery/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/README.md b/core/src/main/java/io/temporal/samples/packetdelivery/README.md index a935fc995..6751df1fe 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/README.md +++ b/core/src/main/java/io/temporal/samples/packetdelivery/README.md @@ -11,7 +11,7 @@ CancellationScope. ## Start the Sample: ```bash ./gradlew -q execute -PmainClass=io.temporal.samples.packetdelivery.Starter -`` +``` Run sample multiple times to see different scenarios (delivery failure and retry and delivery cancelation) There is a 10% chance delivery is going to be canceled and 20% chane it will fail. \ No newline at end of file From 02712658876be11ae4c0ed5d40eb52b5aa01b5a5 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Sat, 12 Jul 2025 14:38:26 -0400 Subject: [PATCH 11/13] add comment Signed-off-by: Tihomir Surdilovic --- .../samples/packetdelivery/PacketDeliveryWorkflowImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java index f03b8f18a..6236458d3 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java @@ -34,6 +34,7 @@ public String execute() { packetsDelivered.add(delivery.getDelivered()); } + // Wait for all packet deliveries to complete Promise.allOf(packetsDelivered).get(); return "completed"; } From dff2e9d8885ce99656dbcffd98d720c78d514fb7 Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Fri, 18 Jul 2025 09:56:01 -0400 Subject: [PATCH 12/13] update readme Signed-off-by: Tihomir Surdilovic --- .../main/java/io/temporal/samples/packetdelivery/README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/README.md b/core/src/main/java/io/temporal/samples/packetdelivery/README.md index 6751df1fe..d43309990 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/README.md +++ b/core/src/main/java/io/temporal/samples/packetdelivery/README.md @@ -8,6 +8,13 @@ Workflow waits until all packets have been delivered. Each packet delivery path also "cancel" delivery of another item. This is done via signal and cancellation of the CancellationScope. +## Notes +1. In this sample we do not handle event history count and size partitioning via ContinueAsNew. It is assumed +that the total number of paths and path lengths (in terms of activity executions) would not exceed it. +For your use case you might need to add ContinueAsNew checks to deal with this situation. +2. Use this sample as all other ones as reference for your implementation. It was not tested on high scale +so using it as-is without load testing is not recommended. + ## Start the Sample: ```bash ./gradlew -q execute -PmainClass=io.temporal.samples.packetdelivery.Starter From bfe92096bd1922918cd5a99f5d47a21fe7fc7ffa Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Fri, 18 Jul 2025 09:57:11 -0400 Subject: [PATCH 13/13] remove duplicate comment Signed-off-by: Tihomir Surdilovic --- .../main/java/io/temporal/samples/packetdelivery/Starter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java b/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java index 56513033f..e1d97e621 100644 --- a/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java +++ b/core/src/main/java/io/temporal/samples/packetdelivery/Starter.java @@ -37,7 +37,6 @@ public static void main(String[] args) { // more while (true) { sleep(3); - // for "fun", reverse the list we get from delivery confirmation list List packets = workflow.deliveryConfirmationPackets(); if (packets.isEmpty()) { break;