-
Notifications
You must be signed in to change notification settings - Fork 175
Async package delivery sample #746
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
2a51eab
[WIP] Async package delivery sample
tsurdilo 50f0981
update
tsurdilo 3be525a
adding cancelation and failure
tsurdilo 0267b4f
adding readme
tsurdilo 9f54fc4
update readme
tsurdilo 3f90abc
update
tsurdilo 819b3f9
update
tsurdilo 0febe57
update starter
tsurdilo 143857e
update readme
tsurdilo 126ec11
update readme
tsurdilo 0271265
add comment
tsurdilo dff2e9d
update readme
tsurdilo bfe9209
remove duplicate comment
tsurdilo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
21 changes: 21 additions & 0 deletions
21
core/src/main/java/io/temporal/samples/packetdelivery/Packet.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| package io.temporal.samples.packetdelivery; | ||
|
|
||
| public class Packet { | ||
| private int id; | ||
| private String content; | ||
|
|
||
| public Packet() {} | ||
|
|
||
| public Packet(int id, String content) { | ||
| this.id = id; | ||
| this.content = content; | ||
| } | ||
|
|
||
| public int getId() { | ||
| return id; | ||
| } | ||
|
|
||
| public String getContent() { | ||
| return content; | ||
| } | ||
| } |
124 changes: 124 additions & 0 deletions
124
core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| package io.temporal.samples.packetdelivery; | ||
|
|
||
| import io.temporal.activity.ActivityOptions; | ||
| import io.temporal.failure.ActivityFailure; | ||
| import io.temporal.failure.CanceledFailure; | ||
| import io.temporal.workflow.*; | ||
| import java.time.Duration; | ||
| import org.slf4j.Logger; | ||
|
|
||
| public class PacketDelivery { | ||
| private Packet packet; | ||
| private boolean deliveryConfirmation = false; | ||
| private boolean needDeliveryConfirmation = false; | ||
| private CompletablePromise delivered = Workflow.newPromise(); | ||
| private CancellationScope cancellationScope; | ||
|
|
||
| private Logger logger = Workflow.getLogger(this.getClass().getName()); | ||
|
|
||
| private final PacketDeliveryActivities activities = | ||
| Workflow.newActivityStub( | ||
| PacketDeliveryActivities.class, | ||
| ActivityOptions.newBuilder() | ||
| .setStartToCloseTimeout(Duration.ofSeconds(5)) | ||
| .setHeartbeatTimeout(Duration.ofSeconds(2)) | ||
| .build()); | ||
|
|
||
| private final PacketDeliveryActivities compensationActivities = | ||
| Workflow.newActivityStub( | ||
| PacketDeliveryActivities.class, | ||
| ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build()); | ||
|
|
||
| public PacketDelivery(Packet packet) { | ||
| this.packet = packet; | ||
| processDeliveryAsync(); | ||
| } | ||
|
|
||
| public Promise<Void> getDelivered() { | ||
| return delivered; | ||
| } | ||
|
|
||
| public void processDeliveryAsync() { | ||
| delivered.completeFrom(Async.procedure(this::processDelivery)); | ||
| } | ||
|
|
||
| public void processDelivery() { | ||
| cancellationScope = | ||
| Workflow.newCancellationScope( | ||
| () -> { | ||
| String deliveryConfirmationResult = ""; | ||
| while (!deliveryConfirmationResult.equals(PacketUtils.COMPLETION_SUCCESS)) { | ||
| // Step 1 perform delivery | ||
| logger.info( | ||
| "** Performing delivery for packet: " | ||
| + packet.getId() | ||
| + " - " | ||
| + packet.getContent()); | ||
| activities.performDelivery(packet); | ||
| // Step 2 wait for delivery confirmation | ||
| logger.info( | ||
| "** Delivery for packet: " | ||
| + packet.getId() | ||
| + " - " | ||
| + packet.getContent() | ||
| + " awaiting delivery confirmation"); | ||
| needDeliveryConfirmation = true; | ||
| Workflow.await(() -> deliveryConfirmation); | ||
| logger.info( | ||
| "** Delivery for packet: " | ||
| + packet.getId() | ||
| + " - " | ||
| + packet.getContent() | ||
| + " received confirmation"); | ||
| // Step 3 complete delivery processing | ||
| logger.info( | ||
| "** Completing delivery for packet: " | ||
| + packet.getId() | ||
| + " - " | ||
| + packet.getContent()); | ||
| deliveryConfirmationResult = activities.completeDelivery(packet); | ||
| // Reset deliveryConfirmation and needDeliveryConfirmation | ||
| deliveryConfirmation = false; | ||
| needDeliveryConfirmation = false; | ||
| } | ||
| }); | ||
|
|
||
| try { | ||
| cancellationScope.run(); | ||
| } catch (Exception e) { | ||
| if (e instanceof ActivityFailure) { | ||
| ActivityFailure activityFailure = (ActivityFailure) e; | ||
| if (activityFailure.getCause() instanceof CanceledFailure) { | ||
| // Run compensation activity and complete | ||
| compensationActivities.compensateDelivery(packet); | ||
| } | ||
| } | ||
| // Just for show for example that cancel could come in while we are waiting on approval signal | ||
| // too | ||
| else if (e instanceof CanceledFailure) { | ||
| needDeliveryConfirmation = false; | ||
| // Run compensation activity and complete | ||
| compensationActivities.compensateDelivery(packet); | ||
| } | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| public void confirmDelivery() { | ||
| this.deliveryConfirmation = true; | ||
| } | ||
|
|
||
| public void cancelDelivery(String reason) { | ||
| if (cancellationScope != null) { | ||
| cancellationScope.cancel(reason); | ||
| } | ||
| } | ||
|
|
||
| public boolean isNeedDeliveryConfirmation() { | ||
| return needDeliveryConfirmation; | ||
| } | ||
|
|
||
| public Packet getPacket() { | ||
| return packet; | ||
| } | ||
| } |
15 changes: 15 additions & 0 deletions
15
core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivities.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| package io.temporal.samples.packetdelivery; | ||
|
|
||
| import io.temporal.activity.ActivityInterface; | ||
| import java.util.List; | ||
|
|
||
| @ActivityInterface | ||
| public interface PacketDeliveryActivities { | ||
| List<Packet> generatePackets(); | ||
|
|
||
| void performDelivery(Packet packet); | ||
|
|
||
| String completeDelivery(Packet packet); | ||
|
|
||
| String compensateDelivery(Packet packet); | ||
| } |
161 changes: 161 additions & 0 deletions
161
core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivitiesImpl.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,161 @@ | ||
| package io.temporal.samples.packetdelivery; | ||
|
|
||
| import io.temporal.activity.Activity; | ||
| import io.temporal.activity.ActivityExecutionContext; | ||
| import io.temporal.client.ActivityCompletionException; | ||
| import io.temporal.client.WorkflowClient; | ||
| import java.util.*; | ||
|
|
||
| public class PacketDeliveryActivitiesImpl implements PacketDeliveryActivities { | ||
| private List<Packet> packets = | ||
| Arrays.asList( | ||
| new Packet(1, "books"), | ||
| new Packet(2, "jewelry"), | ||
| new Packet(3, "furniture"), | ||
| new Packet(4, "food"), | ||
| new Packet(5, "electronics")); | ||
| private WorkflowClient client; | ||
|
|
||
| public PacketDeliveryActivitiesImpl(WorkflowClient client) { | ||
| this.client = client; | ||
| } | ||
|
|
||
| @Override | ||
| public List<Packet> generatePackets() { | ||
| return packets; | ||
| } | ||
|
|
||
| @Override | ||
| public void performDelivery(Packet packet) { | ||
| ActivityExecutionContext context = Activity.getExecutionContext(); | ||
| System.out.println( | ||
| "** Activity - Performing delivery for packet: " | ||
| + packet.getId() | ||
| + " with content: " | ||
| + packet.getContent()); | ||
| for (int i = 0; i < 4; i++) { | ||
| try { | ||
| // Perform the heartbeat. Used to notify the workflow that activity execution is alive | ||
| context.heartbeat(i); | ||
| } catch (ActivityCompletionException e) { | ||
| System.out.println( | ||
| "** Activity - Canceling delivery activity for packet: " | ||
| + packet.getId() | ||
| + " with content: " | ||
| + packet.getContent()); | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public String completeDelivery(Packet packet) { | ||
| ActivityExecutionContext context = Activity.getExecutionContext(); | ||
| System.out.println( | ||
| "** Activity - Completing delivery for package: " | ||
| + packet.getId() | ||
| + " with content: " | ||
| + packet.getContent()); | ||
| for (int i = 0; i < 4; i++) { | ||
| try { | ||
| // Perform the heartbeat. Used to notify the workflow that activity execution is alive | ||
| context.heartbeat(i); | ||
| } catch (ActivityCompletionException e) { | ||
| System.out.println( | ||
| "** Activity - Canceling complete delivery activity for packet: " | ||
| + packet.getId() | ||
| + " with content: " | ||
| + packet.getContent()); | ||
| throw e; | ||
| } | ||
| } | ||
| // For sample we just confirm | ||
| return randomCompletionDeliveryResult(packet); | ||
| } | ||
|
|
||
| @Override | ||
| public String compensateDelivery(Packet packet) { | ||
| System.out.println( | ||
| "** Activity - Compensating delivery for package: " | ||
| + packet.getId() | ||
| + " with content: " | ||
| + packet.getContent()); | ||
| sleep(1); | ||
| return PacketUtils.COMPENSATION_COMPLETED; | ||
| } | ||
|
|
||
| /** | ||
| * For this sample activity completion result can drive if 1. Delivery confirmation is completed, | ||
| * in which case we complete delivery 2. Delivery confirmation is failed, in which case we run the | ||
| * delivery again 3. Delivery confirmation is cancelled, in which case we want to cancel delivery | ||
| * and perform "cleanup activity" Note that any delivery can cancel itself OR another delivery, so | ||
| * for example Furniure delivery can cancel the Food delivery. For sample we have some specific | ||
| * rules Which delivery can cancel which | ||
| */ | ||
| private String randomCompletionDeliveryResult(Packet packet) { | ||
| Random random = new Random(); | ||
| double randomValue = random.nextDouble(); | ||
| if (randomValue < 0.10) { // 10% chance for delivery completion to be canceled | ||
| int toCancelDelivery = determineCancelRules(packet); | ||
| System.out.println( | ||
| "** Activity - Delivery completion result for package: " | ||
| + packet.getId() | ||
| + " with content: " | ||
| + packet.getContent() | ||
| + ": " | ||
| + "Cancelling delivery: " | ||
| + toCancelDelivery); | ||
|
|
||
| // send cancellation signal for packet to be canceled | ||
| PacketDeliveryWorkflow packetWorkflow = | ||
| client.newWorkflowStub( | ||
| PacketDeliveryWorkflow.class, | ||
| Activity.getExecutionContext().getInfo().getWorkflowId()); | ||
| packetWorkflow.cancelDelivery(toCancelDelivery, "canceled from delivery " + packet.getId()); | ||
|
|
||
| return PacketUtils.COMPLETION_CANCELLED; | ||
| } | ||
| if (randomValue < 0.20) { // 20% chance for delivery completion to fail | ||
| System.out.println( | ||
| "** Activity - Delivery completion result for package: " | ||
| + packet.getId() | ||
| + " with content: " | ||
| + packet.getContent() | ||
| + ": " | ||
| + "Failed"); | ||
| return PacketUtils.COMPLETION_FAILURE; | ||
| } | ||
|
|
||
| System.out.println( | ||
| "** Activity - Delivery completion result for package: " | ||
| + packet.getId() | ||
| + " with content: " | ||
| + packet.getContent() | ||
| + ": " | ||
| + "Successful"); | ||
| return PacketUtils.COMPLETION_SUCCESS; | ||
| } | ||
|
|
||
| private void sleep(int seconds) { | ||
| try { | ||
| Thread.sleep(seconds * 1000L); | ||
| } catch (Exception e) { | ||
| System.out.println(e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Sample rules for canceling different deliveries We just rotate the list 1-5 (packet ids) by | ||
| * packet id and return first result | ||
| */ | ||
| private int determineCancelRules(Packet packet) { | ||
| List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5)); | ||
| Collections.rotate(list, packet.getId()); | ||
| System.out.println( | ||
| "** Activity - Package delivery : " | ||
| + packet.getId() | ||
| + " canceling package delivery: " | ||
| + list.get(0)); | ||
| return list.get(0); | ||
| } | ||
| } | ||
22 changes: 22 additions & 0 deletions
22
core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| package io.temporal.samples.packetdelivery; | ||
|
|
||
| import io.temporal.workflow.QueryMethod; | ||
| import io.temporal.workflow.SignalMethod; | ||
| import io.temporal.workflow.WorkflowInterface; | ||
| import io.temporal.workflow.WorkflowMethod; | ||
| import java.util.List; | ||
|
|
||
| @WorkflowInterface | ||
| public interface PacketDeliveryWorkflow { | ||
| @WorkflowMethod | ||
| String execute(); | ||
|
|
||
| @SignalMethod | ||
| void confirmDelivery(int deliveryId); | ||
|
|
||
| @SignalMethod | ||
| void cancelDelivery(int deliveryId, String reason); | ||
|
|
||
| @QueryMethod | ||
| List<Packet> deliveryConfirmationPackets(); | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the idea here to have a "long-running" activity that can be cancelled? Activity execution latency in this example is between 5-7 ms and heartbeatTimeout is 2 seconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its to handle cancelation in activity, similar to HelloCancellation sample
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Understand, . Of all the times I’ve run the example, none have resulted in a cancelled activity. maybe 7 ms is not enough, but I will try again later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah you can change the "percentages" of cancel / failure to increase change for cancelation locally