diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchStarter.java b/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchStarter.java index c89fc590c..dd5157c46 100644 --- a/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchStarter.java +++ b/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchStarter.java @@ -2,10 +2,10 @@ import static io.temporal.samples.batch.iterator.IteratorBatchWorker.TASK_QUEUE; -import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.List; /** Starts a single execution of IteratorBatchWorkflow. */ public class IteratorBatchStarter { @@ -16,12 +16,8 @@ public static void main(String[] args) { WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build(); IteratorBatchWorkflow batchWorkflow = workflowClient.newWorkflowStub(IteratorBatchWorkflow.class, options); - WorkflowExecution execution = WorkflowClient.start(batchWorkflow::processBatch, 5, 0); - System.out.println( - "Started batch workflow. WorkflowId=" - + execution.getWorkflowId() - + ", RunId=" - + execution.getRunId()); + List responses = batchWorkflow.processBatch(100, 0); + System.out.println("Responses=" + responses.toString()); System.exit(0); } } diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorker.java b/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorker.java index 88b8eacb7..54fe952fa 100644 --- a/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorker.java +++ b/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorker.java @@ -21,7 +21,9 @@ public static void main(String[] args) { Worker worker = factory.newWorker(TASK_QUEUE); worker.registerWorkflowImplementationTypes( - IteratorBatchWorkflowImpl.class, RecordProcessorWorkflowImpl.class); + IteratorBatchWorkflowImpl.class, + SingleListingMigrationWorkflowImpl.class, + ListingMigrationWorkflowImpl.class); worker.registerActivitiesImplementations(new RecordLoaderImpl()); factory.start(); diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorkflow.java b/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorkflow.java index 991cdd257..6cb235174 100644 --- a/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorkflow.java +++ b/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorkflow.java @@ -2,6 +2,7 @@ import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; +import java.util.List; @WorkflowInterface public interface IteratorBatchWorkflow { @@ -14,5 +15,5 @@ public interface IteratorBatchWorkflow { * @return total number of processed records. */ @WorkflowMethod - int processBatch(int pageSize, int offset); + List processBatch(int pageSize, int offset); } diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorkflowImpl.java b/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorkflowImpl.java index fa294ec70..bfc77ae8f 100644 --- a/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/batch/iterator/IteratorBatchWorkflowImpl.java @@ -31,19 +31,19 @@ public final class IteratorBatchWorkflowImpl implements IteratorBatchWorkflow { Workflow.newContinueAsNewStub(IteratorBatchWorkflow.class); @Override - public int processBatch(int pageSize, int offset) { + public List processBatch(int pageSize, int offset) { // Loads a page of records List records = recordLoader.getRecords(pageSize, offset); // Starts a child per record asynchrnously. - List> results = new ArrayList<>(records.size()); + List> results = new ArrayList<>(records.size()); for (SingleRecord record : records) { // Uses human friendly child id. String childId = Workflow.getInfo().getWorkflowId() + "/" + record.getId(); - RecordProcessorWorkflow processor = + SingleListingMigrationWorkflow processor = Workflow.newChildWorkflowStub( - RecordProcessorWorkflow.class, + SingleListingMigrationWorkflow.class, ChildWorkflowOptions.newBuilder().setWorkflowId(childId).build()); - Promise result = Async.procedure(processor::processRecord, record); + Promise result = Async.function(processor::processRecord, record); results.add(result); } // Waits for all children to complete. @@ -54,7 +54,7 @@ public int processBatch(int pageSize, int offset) { // No more records in the dataset. Completes the workflow. if (records.isEmpty()) { - return offset; + return List.of(); } // Continues as new with the increased offset. diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/ListingMigrationStarter.java b/core/src/main/java/io/temporal/samples/batch/iterator/ListingMigrationStarter.java new file mode 100644 index 000000000..0298a90fb --- /dev/null +++ b/core/src/main/java/io/temporal/samples/batch/iterator/ListingMigrationStarter.java @@ -0,0 +1,20 @@ +package io.temporal.samples.batch.iterator; + +import static io.temporal.samples.batch.iterator.IteratorBatchWorker.TASK_QUEUE; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; + +public class ListingMigrationStarter { + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient workflowClient = WorkflowClient.newInstance(service); + WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build(); + ListingMigrationWorkflow migrationWorkflow = + workflowClient.newWorkflowStub(ListingMigrationWorkflow.class, options); + migrationWorkflow.execute(); + System.exit(0); + } +} diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/ListingMigrationWorkflow.java b/core/src/main/java/io/temporal/samples/batch/iterator/ListingMigrationWorkflow.java new file mode 100644 index 000000000..d7e40d7af --- /dev/null +++ b/core/src/main/java/io/temporal/samples/batch/iterator/ListingMigrationWorkflow.java @@ -0,0 +1,11 @@ +package io.temporal.samples.batch.iterator; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface ListingMigrationWorkflow { + + @WorkflowMethod + void execute(); +} diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/ListingMigrationWorkflowImpl.java b/core/src/main/java/io/temporal/samples/batch/iterator/ListingMigrationWorkflowImpl.java new file mode 100644 index 000000000..6a65a1818 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/batch/iterator/ListingMigrationWorkflowImpl.java @@ -0,0 +1,11 @@ +package io.temporal.samples.batch.iterator; + +import io.temporal.workflow.Workflow; + +public class ListingMigrationWorkflowImpl implements ListingMigrationWorkflow { + @Override + public void execute() { + var iteratorBatchWorkflow = Workflow.newChildWorkflowStub(IteratorBatchWorkflow.class); + iteratorBatchWorkflow.processBatch(100, 0); + } +} diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/RecordLoaderImpl.java b/core/src/main/java/io/temporal/samples/batch/iterator/RecordLoaderImpl.java index b7102ce24..853708035 100644 --- a/core/src/main/java/io/temporal/samples/batch/iterator/RecordLoaderImpl.java +++ b/core/src/main/java/io/temporal/samples/batch/iterator/RecordLoaderImpl.java @@ -8,12 +8,12 @@ public final class RecordLoaderImpl implements RecordLoader { // The sample always returns 5 pages. // The real application would iterate over an existing dataset or file. - public static final int PAGE_COUNT = 5; + public static final int MAX_COUNT = 1000; @Override public List getRecords(int pageSize, int offset) { List records = new ArrayList<>(pageSize); - if (offset < pageSize * PAGE_COUNT) { + if (offset < MAX_COUNT) { for (int i = 0; i < pageSize; i++) { records.add(new SingleRecord(offset + i)); } diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/RecordProcessorWorkflowImpl.java b/core/src/main/java/io/temporal/samples/batch/iterator/RecordProcessorWorkflowImpl.java deleted file mode 100644 index 78e7eeb88..000000000 --- a/core/src/main/java/io/temporal/samples/batch/iterator/RecordProcessorWorkflowImpl.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.temporal.samples.batch.iterator; - -import io.temporal.workflow.Workflow; -import java.time.Duration; -import java.util.Random; -import org.slf4j.Logger; - -/** Fake RecordProcessorWorkflow implementation. */ -public class RecordProcessorWorkflowImpl implements RecordProcessorWorkflow { - public static final Logger log = Workflow.getLogger(RecordProcessorWorkflowImpl.class); - private final Random random = Workflow.newRandom(); - - @Override - public void processRecord(SingleRecord r) { - // Simulate some processing - Workflow.sleep(Duration.ofSeconds(random.nextInt(30))); - log.info("Processed " + r); - } -} diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/RecordProcessorWorkflow.java b/core/src/main/java/io/temporal/samples/batch/iterator/SingleListingMigrationWorkflow.java similarity index 73% rename from core/src/main/java/io/temporal/samples/batch/iterator/RecordProcessorWorkflow.java rename to core/src/main/java/io/temporal/samples/batch/iterator/SingleListingMigrationWorkflow.java index 498da8567..5227efb72 100644 --- a/core/src/main/java/io/temporal/samples/batch/iterator/RecordProcessorWorkflow.java +++ b/core/src/main/java/io/temporal/samples/batch/iterator/SingleListingMigrationWorkflow.java @@ -5,9 +5,9 @@ /** Workflow that implements processing of a single record. */ @WorkflowInterface -public interface RecordProcessorWorkflow { +public interface SingleListingMigrationWorkflow { /** Processes a single record */ @WorkflowMethod - void processRecord(SingleRecord r); + SingleResponse processRecord(SingleRecord r); } diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/SingleListingMigrationWorkflowImpl.java b/core/src/main/java/io/temporal/samples/batch/iterator/SingleListingMigrationWorkflowImpl.java new file mode 100644 index 000000000..436d2cc49 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/batch/iterator/SingleListingMigrationWorkflowImpl.java @@ -0,0 +1,21 @@ +package io.temporal.samples.batch.iterator; + +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.Random; +import org.slf4j.Logger; + +/** Fake RecordProcessorWorkflow implementation. */ +public class SingleListingMigrationWorkflowImpl implements SingleListingMigrationWorkflow { + public static final Logger log = Workflow.getLogger(SingleListingMigrationWorkflowImpl.class); + private final Random random = Workflow.newRandom(); + + @Override + public SingleResponse processRecord(SingleRecord r) { + // Simulate some processing + int result = random.nextInt(30) + r.getId(); + Workflow.sleep(Duration.ofSeconds(random.nextInt(5))); + log.info("Processed {}, result={}", r, result); + return new SingleResponse(r.getId(), result); + } +} diff --git a/core/src/main/java/io/temporal/samples/batch/iterator/SingleResponse.java b/core/src/main/java/io/temporal/samples/batch/iterator/SingleResponse.java new file mode 100644 index 000000000..ffacb60ee --- /dev/null +++ b/core/src/main/java/io/temporal/samples/batch/iterator/SingleResponse.java @@ -0,0 +1,27 @@ +package io.temporal.samples.batch.iterator; + +public class SingleResponse { + + private int input; + private int result; + + public SingleResponse() {} + + public SingleResponse(int input, int result) { + this.input = input; + this.result = result; + } + + public int getInput() { + return input; + } + + public int getResult() { + return result; + } + + @Override + public String toString() { + return "SingleResponse{" + "input=" + input + ", result=" + result + '}'; + } +} diff --git a/core/src/main/java/io/temporal/samples/workerversioning/Starter.java b/core/src/main/java/io/temporal/samples/workerversioning/Starter.java index a48902557..baf8b5295 100644 --- a/core/src/main/java/io/temporal/samples/workerversioning/Starter.java +++ b/core/src/main/java/io/temporal/samples/workerversioning/Starter.java @@ -146,6 +146,7 @@ private static void waitForWorkerAndMakeCurrent( break; } } catch (Exception ignored) { + System.out.println("Worker deployment not found yet, retrying..."); } Thread.sleep(1000); } diff --git a/core/src/test/java/io/temporal/samples/batch/iterator/IteratorIteratorBatchWorkflowTest.java b/core/src/test/java/io/temporal/samples/batch/iterator/IteratorIteratorBatchWorkflowTest.java index 12ff30b3b..1aca0f603 100644 --- a/core/src/test/java/io/temporal/samples/batch/iterator/IteratorIteratorBatchWorkflowTest.java +++ b/core/src/test/java/io/temporal/samples/batch/iterator/IteratorIteratorBatchWorkflowTest.java @@ -15,7 +15,8 @@ public class IteratorIteratorBatchWorkflowTest { /** The sample RecordLoaderImpl always returns the fixed number pages. */ private static boolean[] processedRecords = new boolean[PAGE_SIZE * PAGE_COUNT]; - public static class TestRecordProcessorWorkflowImpl implements RecordProcessorWorkflow { + public static class TestSingleListingMigrationWorkflowImpl + implements SingleListingMigrationWorkflow { @Override public void processRecord(SingleRecord r) { @@ -27,7 +28,8 @@ public void processRecord(SingleRecord r) { @Rule public TestWorkflowRule testWorkflowRule = TestWorkflowRule.newBuilder() - .setWorkflowTypes(IteratorBatchWorkflowImpl.class, TestRecordProcessorWorkflowImpl.class) + .setWorkflowTypes( + IteratorBatchWorkflowImpl.class, TestSingleListingMigrationWorkflowImpl.class) .setActivityImplementations(new RecordLoaderImpl()) .build();