From eb3042b50a6a7540498bd1382995093a698c6ba1 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Mon, 5 Jan 2026 11:01:58 -0500 Subject: [PATCH 1/4] made task trait associate an error type --- README.md | 50 ++++++++++++++++++++++++++- src/error.rs | 54 +++++++++++++++++++++++++++-- src/lib.rs | 11 ++++++ src/task.rs | 30 +++++++++++++++-- tests/common/tasks.rs | 75 ++++++++++++++++++++++++++++++----------- tests/execution_test.rs | 13 +++++++ 6 files changed, 207 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 35a4044..fb8639d 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,15 @@ struct ResearchResult { sources: Vec, } +// Define a typed error for your task +#[derive(Debug, Serialize, thiserror::Error)] +enum ResearchError { + #[error("No sources found for query")] + NoSources, + #[error("Analysis failed: {0}")] + AnalysisFailed(String), +} + // Implement the Task trait struct ResearchTask; @@ -64,6 +73,7 @@ impl Task for ResearchTask { fn name() -> Cow<'static, str> { Cow::Borrowed("research") } type Params = ResearchParams; type Output = ResearchResult; + type Error = ResearchError; async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult { // Phase 1: Find relevant sources (checkpointed) @@ -140,6 +150,7 @@ impl Task for MyTask { fn name() -> Cow<'static, str> { Cow::Borrowed("my-task") } // Unique identifier type Params = MyParams; // Input (JSON-serializable) type Output = MyOutput; // Output (JSON-serializable) + type Error = MyError; // Error type (see below) async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult { // Your task logic here @@ -147,6 +158,41 @@ impl Task for MyTask { } ``` +### Error Types + +Tasks declare an error type via `type Error`. This enables structured error reporting: + +```rust +#[derive(Debug, Serialize, thiserror::Error)] +enum MyError { + #[error("Invalid input: {0}")] + InvalidInput(String), + #[error("External service failed")] + ServiceError, +} + +impl Task for MyTask { + type Error = MyError; + // ... + + async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult { + // Return typed errors using TaskError::user() + if params.value < 0 { + return Err(TaskError::user(MyError::InvalidInput("negative value".into()))); + } + + // Or simple string errors + if something_wrong { + return Err(TaskError::user_message("Something went wrong")); + } + + Ok(result) + } +} +``` + +The `TaskError::user()` helper serializes your error type to JSON for storage in the database, enabling structured error analysis and debugging. + ### TaskContext The [`TaskContext`] provides methods for durable execution: @@ -287,7 +333,9 @@ This is useful when you need to guarantee that a task is only enqueued if relate | [`Task`] | Trait for defining task types | | [`TaskContext`] | Context passed to task execution | | [`TaskResult`] | Result type alias for task returns | -| [`TaskError`] | Error type with control flow signals | +| [`TaskError`] | Error type with control flow signals and user errors | +| [`TaskError::user()`] | Helper to create typed user errors | +| [`TaskError::user_message()`] | Helper to create string user errors | | [`TaskHandle`] | Handle to a spawned subtask (returned by `ctx.spawn()`) | ### Configuration diff --git a/src/error.rs b/src/error.rs index e7d7df4..e81399e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -105,10 +105,22 @@ pub enum TaskError { message: String, }, - /// An error from user task code. + /// A typed error from user task code. /// - /// This is the catch-all variant for errors returned by task implementations. - /// Use `anyhow::anyhow!()` or `?` on any error type to create this variant. + /// This variant stores a serialized user error for persistence and retrieval. + /// Created automatically when a task returns its custom `Error` type. + #[error("{message}")] + User { + /// The error message (from Display impl) + message: String, + /// Serialized error data for storage/retrieval + error_data: JsonValue, + }, + + /// An untyped error from user task code. + /// + /// This is the catch-all variant for errors returned by task implementations + /// that use `anyhow::Error` as their error type. #[error(transparent)] TaskInternal(#[from] anyhow::Error), } @@ -118,6 +130,35 @@ pub enum TaskError { /// Use this as the return type for [`Task::run`](crate::Task::run) implementations. pub type TaskResult = Result; +impl TaskError { + /// Create a user error with a message and optional structured data. + /// + /// Use this to return typed errors from your task: + /// + /// ```ignore + /// #[derive(Debug, Serialize)] + /// struct MyError { code: i32, details: String } + /// + /// // In your task: + /// Err(TaskError::user(MyError { code: 404, details: "Not found".into() })) + /// ``` + pub fn user(err: E) -> Self { + let message = err.to_string(); + let error_data = serde_json::to_value(&err) + .unwrap_or_else(|_| serde_json::Value::String(message.clone())); + TaskError::User { message, error_data } + } + + /// Create a user error from just a message string. + pub fn user_message(message: impl Into) -> Self { + let message = message.into(); + TaskError::User { + error_data: serde_json::Value::String(message.clone()), + message, + } + } +} + impl From for TaskError { fn from(err: serde_json::Error) -> Self { TaskError::Serialization(err) @@ -192,6 +233,13 @@ pub fn serialize_task_error(err: &TaskError) -> JsonValue { "message": message, }) } + TaskError::User { message, error_data } => { + serde_json::json!({ + "name": "User", + "message": message, + "error_data": error_data, + }) + } TaskError::TaskInternal(e) => { serde_json::json!({ "name": "TaskInternal", diff --git a/src/lib.rs b/src/lib.rs index 1c5d404..1891c4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,11 @@ //! #[derive(Serialize, Deserialize)] //! struct MyOutput { result: i32 } //! +//! // Simple error type for the task +//! #[derive(Debug, Serialize, thiserror::Error)] +//! #[error("{0}")] +//! struct MyError(String); +//! //! struct MyTask; //! //! #[async_trait] @@ -26,6 +31,7 @@ //! fn name() -> Cow<'static, str> { Cow::Borrowed("my-task") } //! type Params = MyParams; //! type Output = MyOutput; +//! type Error = MyError; //! //! async fn run(params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult { //! let doubled = ctx.step("double", || async { @@ -61,6 +67,10 @@ //! http_client: reqwest::Client, //! } //! +//! #[derive(Debug, Serialize, thiserror::Error)] +//! #[error("{0}")] +//! struct FetchError(String); +//! //! struct FetchTask; //! //! #[async_trait] @@ -68,6 +78,7 @@ //! fn name() -> Cow<'static, str> { Cow::Borrowed("fetch") } //! type Params = String; //! type Output = String; +//! type Error = FetchError; //! //! async fn run(url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult { //! ctx.step("fetch", || async { diff --git a/src/task.rs b/src/task.rs index 6d5126e..5537838 100644 --- a/src/task.rs +++ b/src/task.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use serde::{Serialize, de::DeserializeOwned}; use serde_json::Value as JsonValue; use std::borrow::Cow; +use std::error::Error; use std::marker::PhantomData; use crate::context::TaskContext; @@ -28,6 +29,7 @@ use crate::error::{TaskError, TaskResult}; /// fn name() -> Cow<'static, str> { Cow::Borrowed("send-email") } /// type Params = SendEmailParams; /// type Output = SendEmailResult; +/// type Error = anyhow::Error; // For documentation; actual return is TaskResult /// /// async fn run(params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult { /// let result = ctx.step("send", || async { @@ -38,12 +40,20 @@ use crate::error::{TaskError, TaskResult}; /// } /// } /// -/// // With application state: +/// // With application state and typed errors: /// #[derive(Clone)] /// struct AppState { /// http_client: reqwest::Client, /// } /// +/// #[derive(Debug, Clone, Serialize, thiserror::Error)] +/// pub enum FetchError { +/// #[error("HTTP error: {0}")] +/// Http(String), +/// #[error("Invalid URL: {0}")] +/// InvalidUrl(String), +/// } +/// /// struct FetchUrlTask; /// /// #[async_trait] @@ -51,11 +61,14 @@ use crate::error::{TaskError, TaskResult}; /// fn name() -> Cow<'static, str> { Cow::Borrowed("fetch-url") } /// type Params = String; /// type Output = String; +/// type Error = FetchError; /// /// async fn run(url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult { /// let body = ctx.step("fetch", || async { -/// state.http_client.get(&url).send().await?.text().await -/// .map_err(|e| anyhow::anyhow!(e)) +/// state.http_client.get(&url).send().await +/// .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))? +/// .text().await +/// .map_err(|e| anyhow::anyhow!("HTTP error: {}", e)) /// }).await?; /// Ok(body) /// } @@ -76,12 +89,23 @@ where /// Output type (must be JSON-serializable) type Output: Serialize + DeserializeOwned + Send; + /// Error type for this task. + /// + /// Use a custom enum with `#[derive(Serialize, thiserror::Error)]` for typed errors + /// that will be stored as structured JSON in the database. + /// + /// Note: Tasks still return `TaskResult` to preserve control flow + /// (suspend, cancel). Use `TaskError::user(your_error)` to wrap your typed errors. + type Error: Error + Send + 'static; + /// Execute the task logic. /// /// Return `Ok(output)` on success, or `Err(TaskError)` on failure. /// Use `?` freely - errors will propagate and the task will be retried /// according to its [`RetryStrategy`](crate::RetryStrategy). /// + /// To return a typed error, use `TaskError::user(your_error)`. + /// /// The [`TaskContext`] provides methods for checkpointing, sleeping, /// and waiting for events. See [`TaskContext`] for details. /// diff --git a/tests/common/tasks.rs b/tests/common/tasks.rs index 91319f7..f2bde0a 100644 --- a/tests/common/tasks.rs +++ b/tests/common/tasks.rs @@ -1,9 +1,26 @@ use durable::{SpawnOptions, Task, TaskContext, TaskError, TaskHandle, TaskResult, async_trait}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; +use std::fmt; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +// ============================================================================ +// Simple error type for tests +// ============================================================================ + +/// Simple string-based error for test tasks +#[derive(Debug, Clone, Serialize)] +pub struct TestError(pub String); + +impl fmt::Display for TestError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for TestError {} + // ============================================================================ // ResearchTask - Example from README demonstrating multi-step checkpointing // ============================================================================ @@ -31,6 +48,7 @@ impl Task<()> for ResearchTask { } type Params = ResearchParams; type Output = ResearchResult; + type Error = TestError; async fn run( params: Self::Params, @@ -92,6 +110,7 @@ impl Task<()> for EchoTask { } type Params = EchoParams; type Output = String; + type Error = TestError; async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(params.message) @@ -118,12 +137,10 @@ impl Task<()> for FailingTask { } type Params = FailingParams; type Output = (); + type Error = TestError; async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { - Err(TaskError::TaskInternal(anyhow::anyhow!( - "{}", - params.error_message - ))) + Err(TaskError::user_message(format!("{}", params.error_message))) } } @@ -149,6 +166,7 @@ impl Task<()> for MultiStepTask { } type Params = (); type Output = MultiStepOutput; + type Error = TestError; async fn run( _params: Self::Params, @@ -186,6 +204,7 @@ impl Task<()> for SleepingTask { } type Params = SleepParams; type Output = String; + type Error = TestError; async fn run( params: Self::Params, @@ -219,6 +238,7 @@ impl Task<()> for EventWaitingTask { } type Params = EventWaitParams; type Output = serde_json::Value; + type Error = TestError; async fn run( params: Self::Params, @@ -270,6 +290,7 @@ impl Task<()> for StepCountingTask { } type Params = StepCountingParams; type Output = StepCountingOutput; + type Error = TestError; async fn run( params: Self::Params, @@ -290,9 +311,7 @@ impl Task<()> for StepCountingTask { .await?; if params.fail_after_step2 { - return Err(TaskError::TaskInternal(anyhow::anyhow!( - "Intentional failure after step2" - ))); + return Err(TaskError::user_message(format!("Intentional failure after step2"))); } let step3_value: String = ctx @@ -323,6 +342,7 @@ impl Task<()> for EmptyParamsTask { } type Params = (); type Output = String; + type Error = TestError; async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok("completed".to_string()) @@ -349,6 +369,7 @@ impl Task<()> for HeartbeatTask { } type Params = HeartbeatParams; type Output = u32; + type Error = TestError; async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { for _i in 0..params.iterations { @@ -383,6 +404,7 @@ impl Task<()> for ConvenienceMethodsTask { } type Params = (); type Output = ConvenienceMethodsOutput; + type Error = TestError; async fn run( _params: Self::Params, @@ -424,6 +446,7 @@ impl Task<()> for MultipleConvenienceCallsTask { } type Params = (); type Output = MultipleCallsOutput; + type Error = TestError; async fn run( _params: Self::Params, @@ -458,6 +481,7 @@ impl Task<()> for ReservedPrefixTask { } type Params = (); type Output = (); + type Error = TestError; async fn run( _params: Self::Params, @@ -493,6 +517,7 @@ impl Task<()> for DoubleTask { } type Params = DoubleParams; type Output = i32; + type Error = TestError; async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(params.value * 2) @@ -510,11 +535,10 @@ impl Task<()> for FailingChildTask { } type Params = (); type Output = (); + type Error = TestError; async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { - Err(TaskError::TaskInternal(anyhow::anyhow!( - "Child task failed intentionally" - ))) + Err(TaskError::user_message(format!("Child task failed intentionally"))) } } @@ -545,6 +569,7 @@ impl Task<()> for SingleSpawnTask { } type Params = SingleSpawnParams; type Output = SingleSpawnOutput; + type Error = TestError; async fn run( params: Self::Params, @@ -592,6 +617,7 @@ impl Task<()> for MultiSpawnTask { } type Params = MultiSpawnParams; type Output = MultiSpawnOutput; + type Error = TestError; async fn run( params: Self::Params, @@ -633,6 +659,7 @@ impl Task<()> for SpawnFailingChildTask { } type Params = (); type Output = (); + type Error = TestError; async fn run( _params: Self::Params, @@ -679,6 +706,7 @@ impl Task<()> for LongRunningHeartbeatTask { } type Params = LongRunningHeartbeatParams; type Output = String; + type Error = TestError; async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { let start = std::time::Instant::now(); @@ -711,6 +739,7 @@ impl Task<()> for SlowChildTask { } type Params = SlowChildParams; type Output = String; + type Error = TestError; async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { tokio::time::sleep(std::time::Duration::from_millis(params.sleep_ms)).await; @@ -735,6 +764,7 @@ impl Task<()> for SpawnSlowChildTask { } type Params = SpawnSlowChildParams; type Output = String; + type Error = TestError; async fn run( params: Self::Params, @@ -779,6 +809,7 @@ impl Task<()> for EventEmitterTask { } type Params = EventEmitterParams; type Output = String; + type Error = TestError; async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { ctx.emit_event(¶ms.event_name, ¶ms.payload).await?; @@ -806,6 +837,7 @@ impl Task<()> for ManyStepsTask { } type Params = ManyStepsParams; type Output = u32; + type Error = TestError; async fn run( params: Self::Params, @@ -842,6 +874,7 @@ impl Task<()> for LargePayloadTask { } type Params = LargePayloadParams; type Output = String; + type Error = TestError; async fn run( params: Self::Params, @@ -879,6 +912,7 @@ impl Task<()> for CpuBoundTask { } type Params = CpuBoundParams; type Output = String; + type Error = TestError; async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { let start = std::time::Instant::now(); @@ -915,6 +949,7 @@ impl Task<()> for SlowNoHeartbeatTask { } type Params = SlowNoHeartbeatParams; type Output = String; + type Error = TestError; async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { // Just sleep - no heartbeat calls @@ -1008,6 +1043,7 @@ impl Task<()> for DeterministicReplayTask { } type Params = DeterministicReplayParams; type Output = DeterministicReplayOutput; + type Error = TestError; async fn run( params: Self::Params, @@ -1031,9 +1067,7 @@ impl Task<()> for DeterministicReplayTask { }); if should_fail { - return Err(TaskError::TaskInternal(anyhow::anyhow!( - "First attempt failure" - ))); + return Err(TaskError::user_message(format!("First attempt failure"))); } } @@ -1074,6 +1108,7 @@ impl Task<()> for EventThenFailTask { } type Params = EventThenFailParams; type Output = serde_json::Value; + type Error = TestError; async fn run( params: Self::Params, @@ -1095,9 +1130,7 @@ impl Task<()> for EventThenFailTask { }); if should_fail { - return Err(TaskError::TaskInternal(anyhow::anyhow!( - "First attempt failure after event" - ))); + return Err(TaskError::user_message(format!("First attempt failure after event"))); } // Second attempt succeeds with the same payload (from checkpoint) @@ -1126,6 +1159,7 @@ impl Task<()> for EventThenDelayTask { } type Params = EventThenDelayParams; type Output = serde_json::Value; + type Error = TestError; async fn run( params: Self::Params, @@ -1165,6 +1199,7 @@ impl Task<()> for MultiEventTask { } type Params = MultiEventParams; type Output = serde_json::Value; + type Error = TestError; async fn run( params: Self::Params, @@ -1210,6 +1245,7 @@ impl Task<()> for SpawnThenFailTask { } type Params = SpawnThenFailParams; type Output = serde_json::Value; + type Error = TestError; async fn run( params: Self::Params, @@ -1241,9 +1277,7 @@ impl Task<()> for SpawnThenFailTask { }); if should_fail { - return Err(TaskError::TaskInternal(anyhow::anyhow!( - "First attempt failure after spawn" - ))); + return Err(TaskError::user_message(format!("First attempt failure after spawn"))); } // Second attempt - join child @@ -1282,6 +1316,7 @@ impl Task<()> for SpawnByNameTask { } type Params = SpawnByNameParams; type Output = SpawnByNameOutput; + type Error = TestError; async fn run( params: Self::Params, @@ -1325,6 +1360,7 @@ impl Task<()> for JoinCancelledChildTask { } type Params = JoinCancelledChildParams; type Output = String; + type Error = TestError; async fn run( params: Self::Params, @@ -1371,6 +1407,7 @@ impl Task<()> for VerySlowChildTask { } type Params = VerySlowChildParams; type Output = String; + type Error = TestError; async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { // Heartbeat regularly to keep this task alive diff --git a/tests/execution_test.rs b/tests/execution_test.rs index 876b2e5..ab7980b 100644 --- a/tests/execution_test.rs +++ b/tests/execution_test.rs @@ -730,6 +730,18 @@ struct WriteToDbParams { value: String, } +/// Simple error type for WriteToDbTask +#[derive(Debug, serde::Serialize)] +struct WriteToDbError(String); + +impl std::fmt::Display for WriteToDbError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for WriteToDbError {} + #[durable::async_trait] impl durable::Task for WriteToDbTask { fn name() -> Cow<'static, str> { @@ -737,6 +749,7 @@ impl durable::Task for WriteToDbTask { } type Params = WriteToDbParams; type Output = i64; + type Error = WriteToDbError; async fn run( params: Self::Params, From 1e1c81e9566add9ca2e81263c80215f50edd0dfd Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Mon, 5 Jan 2026 11:32:48 -0500 Subject: [PATCH 2/4] fixed clippies? --- benches/common/tasks.rs | 4 ++++ src/error.rs | 10 ++++++++-- tests/common/tasks.rs | 21 +++++++++++++++------ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/benches/common/tasks.rs b/benches/common/tasks.rs index ae668ec..bfaf7a3 100644 --- a/benches/common/tasks.rs +++ b/benches/common/tasks.rs @@ -16,6 +16,7 @@ impl Task<()> for NoOpTask { } type Params = (); type Output = (); + type Error = std::convert::Infallible; async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(()) @@ -42,6 +43,7 @@ impl Task<()> for QuickTask { } type Params = QuickParams; type Output = u32; + type Error = std::convert::Infallible; async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(params.task_num) @@ -68,6 +70,7 @@ impl Task<()> for MultiStepBenchTask { } type Params = MultiStepParams; type Output = u32; + type Error = std::convert::Infallible; async fn run( params: Self::Params, @@ -103,6 +106,7 @@ impl Task<()> for LargePayloadBenchTask { } type Params = LargePayloadParams; type Output = usize; + type Error = std::convert::Infallible; async fn run( params: Self::Params, diff --git a/src/error.rs b/src/error.rs index e81399e..1fee849 100644 --- a/src/error.rs +++ b/src/error.rs @@ -146,7 +146,10 @@ impl TaskError { let message = err.to_string(); let error_data = serde_json::to_value(&err) .unwrap_or_else(|_| serde_json::Value::String(message.clone())); - TaskError::User { message, error_data } + TaskError::User { + message, + error_data, + } } /// Create a user error from just a message string. @@ -233,7 +236,10 @@ pub fn serialize_task_error(err: &TaskError) -> JsonValue { "message": message, }) } - TaskError::User { message, error_data } => { + TaskError::User { + message, + error_data, + } => { serde_json::json!({ "name": "User", "message": message, diff --git a/tests/common/tasks.rs b/tests/common/tasks.rs index f2bde0a..7a28b32 100644 --- a/tests/common/tasks.rs +++ b/tests/common/tasks.rs @@ -10,6 +10,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; // ============================================================================ /// Simple string-based error for test tasks +#[allow(dead_code)] #[derive(Debug, Clone, Serialize)] pub struct TestError(pub String); @@ -140,7 +141,7 @@ impl Task<()> for FailingTask { type Error = TestError; async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { - Err(TaskError::user_message(format!("{}", params.error_message))) + Err(TaskError::user_message(params.error_message.to_string())) } } @@ -311,7 +312,9 @@ impl Task<()> for StepCountingTask { .await?; if params.fail_after_step2 { - return Err(TaskError::user_message(format!("Intentional failure after step2"))); + return Err(TaskError::user_message( + "Intentional failure after step2".to_string(), + )); } let step3_value: String = ctx @@ -538,7 +541,9 @@ impl Task<()> for FailingChildTask { type Error = TestError; async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { - Err(TaskError::user_message(format!("Child task failed intentionally"))) + Err(TaskError::user_message( + "Child task failed intentionally".to_string(), + )) } } @@ -1067,7 +1072,7 @@ impl Task<()> for DeterministicReplayTask { }); if should_fail { - return Err(TaskError::user_message(format!("First attempt failure"))); + return Err(TaskError::user_message("First attempt failure".to_string())); } } @@ -1130,7 +1135,9 @@ impl Task<()> for EventThenFailTask { }); if should_fail { - return Err(TaskError::user_message(format!("First attempt failure after event"))); + return Err(TaskError::user_message( + "First attempt failure after event".to_string(), + )); } // Second attempt succeeds with the same payload (from checkpoint) @@ -1277,7 +1284,9 @@ impl Task<()> for SpawnThenFailTask { }); if should_fail { - return Err(TaskError::user_message(format!("First attempt failure after spawn"))); + return Err(TaskError::user_message( + "First attempt failure after spawn".to_string(), + )); } // Second attempt - join child From 959157951f8f57a61a7fb7c8573302a84d1cecd2 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Mon, 5 Jan 2026 14:24:26 -0500 Subject: [PATCH 3/4] removed associated error type --- README.md | 49 +++++------------------- benches/common/tasks.rs | 8 ++-- src/error.rs | 37 ++++++++++-------- src/lib.rs | 11 ------ src/task.rs | 26 ++----------- tests/common/tasks.rs | 84 ++++++++++++++++------------------------- tests/execution_test.rs | 13 ------- 7 files changed, 72 insertions(+), 156 deletions(-) diff --git a/README.md b/README.md index fb8639d..2d6e5e6 100644 --- a/README.md +++ b/README.md @@ -56,15 +56,6 @@ struct ResearchResult { sources: Vec, } -// Define a typed error for your task -#[derive(Debug, Serialize, thiserror::Error)] -enum ResearchError { - #[error("No sources found for query")] - NoSources, - #[error("Analysis failed: {0}")] - AnalysisFailed(String), -} - // Implement the Task trait struct ResearchTask; @@ -73,7 +64,6 @@ impl Task for ResearchTask { fn name() -> Cow<'static, str> { Cow::Borrowed("research") } type Params = ResearchParams; type Output = ResearchResult; - type Error = ResearchError; async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult { // Phase 1: Find relevant sources (checkpointed) @@ -150,7 +140,6 @@ impl Task for MyTask { fn name() -> Cow<'static, str> { Cow::Borrowed("my-task") } // Unique identifier type Params = MyParams; // Input (JSON-serializable) type Output = MyOutput; // Output (JSON-serializable) - type Error = MyError; // Error type (see below) async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult { // Your task logic here @@ -158,40 +147,22 @@ impl Task for MyTask { } ``` -### Error Types +### User Errors -Tasks declare an error type via `type Error`. This enables structured error reporting: +Return user errors with structured data using `TaskError::user()`: ```rust -#[derive(Debug, Serialize, thiserror::Error)] -enum MyError { - #[error("Invalid input: {0}")] - InvalidInput(String), - #[error("External service failed")] - ServiceError, -} +// With structured data (message extracted from "message" field if present) +Err(TaskError::user(json!({"message": "Not found", "code": 404}))) -impl Task for MyTask { - type Error = MyError; - // ... +// With any serializable type +Err(TaskError::user(MyError { code: 404, details: "..." })) - async fn run(params: Self::Params, mut ctx: TaskContext) -> TaskResult { - // Return typed errors using TaskError::user() - if params.value < 0 { - return Err(TaskError::user(MyError::InvalidInput("negative value".into()))); - } - - // Or simple string errors - if something_wrong { - return Err(TaskError::user_message("Something went wrong")); - } - - Ok(result) - } -} +// Simple string message +Err(TaskError::user_message("Something went wrong")) ``` -The `TaskError::user()` helper serializes your error type to JSON for storage in the database, enabling structured error analysis and debugging. +The error data is serialized to JSON and stored in the database for debugging and analysis. ### TaskContext @@ -334,7 +305,7 @@ This is useful when you need to guarantee that a task is only enqueued if relate | [`TaskContext`] | Context passed to task execution | | [`TaskResult`] | Result type alias for task returns | | [`TaskError`] | Error type with control flow signals and user errors | -| [`TaskError::user()`] | Helper to create typed user errors | +| [`TaskError::user()`] | Helper to create user errors with JSON data | | [`TaskError::user_message()`] | Helper to create string user errors | | [`TaskHandle`] | Handle to a spawned subtask (returned by `ctx.spawn()`) | diff --git a/benches/common/tasks.rs b/benches/common/tasks.rs index bfaf7a3..8448528 100644 --- a/benches/common/tasks.rs +++ b/benches/common/tasks.rs @@ -16,7 +16,7 @@ impl Task<()> for NoOpTask { } type Params = (); type Output = (); - type Error = std::convert::Infallible; + async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(()) @@ -43,7 +43,7 @@ impl Task<()> for QuickTask { } type Params = QuickParams; type Output = u32; - type Error = std::convert::Infallible; + async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(params.task_num) @@ -70,7 +70,7 @@ impl Task<()> for MultiStepBenchTask { } type Params = MultiStepParams; type Output = u32; - type Error = std::convert::Infallible; + async fn run( params: Self::Params, @@ -106,7 +106,7 @@ impl Task<()> for LargePayloadBenchTask { } type Params = LargePayloadParams; type Output = usize; - type Error = std::convert::Infallible; + async fn run( params: Self::Params, diff --git a/src/error.rs b/src/error.rs index 1fee849..37a996d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -105,22 +105,22 @@ pub enum TaskError { message: String, }, - /// A typed error from user task code. + /// A user error from task code. /// /// This variant stores a serialized user error for persistence and retrieval. - /// Created automatically when a task returns its custom `Error` type. + /// Created via [`TaskError::user()`] or [`TaskError::user_message()`]. #[error("{message}")] User { - /// The error message (from Display impl) + /// The error message (extracted from "message" field or stringified data) message: String, /// Serialized error data for storage/retrieval error_data: JsonValue, }, - /// An untyped error from user task code. + /// An internal error from user task code. /// - /// This is the catch-all variant for errors returned by task implementations - /// that use `anyhow::Error` as their error type. + /// This is the catch-all variant for errors propagated via `?` on anyhow errors. + /// For structured user errors, prefer using [`TaskError::user()`]. #[error(transparent)] TaskInternal(#[from] anyhow::Error), } @@ -131,21 +131,26 @@ pub enum TaskError { pub type TaskResult = Result; impl TaskError { - /// Create a user error with a message and optional structured data. + /// Create a user error from arbitrary JSON data. /// - /// Use this to return typed errors from your task: + /// If the JSON is an object with a "message" field, that's used for display. + /// Otherwise, the JSON is stringified for the display message. /// /// ```ignore - /// #[derive(Debug, Serialize)] - /// struct MyError { code: i32, details: String } + /// // With structured data + /// Err(TaskError::user(json!({"message": "Not found", "code": 404}))) /// - /// // In your task: - /// Err(TaskError::user(MyError { code: 404, details: "Not found".into() })) + /// // With any serializable type + /// Err(TaskError::user(MyError { code: 404, details: "..." })) /// ``` - pub fn user(err: E) -> Self { - let message = err.to_string(); - let error_data = serde_json::to_value(&err) - .unwrap_or_else(|_| serde_json::Value::String(message.clone())); + pub fn user(error_data: impl serde::Serialize) -> Self { + let error_data = + serde_json::to_value(&error_data).unwrap_or(serde_json::Value::Null); + let message = error_data + .get("message") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_else(|| error_data.to_string()); TaskError::User { message, error_data, diff --git a/src/lib.rs b/src/lib.rs index 1891c4d..1c5d404 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,11 +19,6 @@ //! #[derive(Serialize, Deserialize)] //! struct MyOutput { result: i32 } //! -//! // Simple error type for the task -//! #[derive(Debug, Serialize, thiserror::Error)] -//! #[error("{0}")] -//! struct MyError(String); -//! //! struct MyTask; //! //! #[async_trait] @@ -31,7 +26,6 @@ //! fn name() -> Cow<'static, str> { Cow::Borrowed("my-task") } //! type Params = MyParams; //! type Output = MyOutput; -//! type Error = MyError; //! //! async fn run(params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult { //! let doubled = ctx.step("double", || async { @@ -67,10 +61,6 @@ //! http_client: reqwest::Client, //! } //! -//! #[derive(Debug, Serialize, thiserror::Error)] -//! #[error("{0}")] -//! struct FetchError(String); -//! //! struct FetchTask; //! //! #[async_trait] @@ -78,7 +68,6 @@ //! fn name() -> Cow<'static, str> { Cow::Borrowed("fetch") } //! type Params = String; //! type Output = String; -//! type Error = FetchError; //! //! async fn run(url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult { //! ctx.step("fetch", || async { diff --git a/src/task.rs b/src/task.rs index 5537838..54f6019 100644 --- a/src/task.rs +++ b/src/task.rs @@ -2,7 +2,6 @@ use async_trait::async_trait; use serde::{Serialize, de::DeserializeOwned}; use serde_json::Value as JsonValue; use std::borrow::Cow; -use std::error::Error; use std::marker::PhantomData; use crate::context::TaskContext; @@ -29,7 +28,6 @@ use crate::error::{TaskError, TaskResult}; /// fn name() -> Cow<'static, str> { Cow::Borrowed("send-email") } /// type Params = SendEmailParams; /// type Output = SendEmailResult; -/// type Error = anyhow::Error; // For documentation; actual return is TaskResult /// /// async fn run(params: Self::Params, mut ctx: TaskContext, _state: ()) -> TaskResult { /// let result = ctx.step("send", || async { @@ -40,20 +38,12 @@ use crate::error::{TaskError, TaskResult}; /// } /// } /// -/// // With application state and typed errors: +/// // With application state: /// #[derive(Clone)] /// struct AppState { /// http_client: reqwest::Client, /// } /// -/// #[derive(Debug, Clone, Serialize, thiserror::Error)] -/// pub enum FetchError { -/// #[error("HTTP error: {0}")] -/// Http(String), -/// #[error("Invalid URL: {0}")] -/// InvalidUrl(String), -/// } -/// /// struct FetchUrlTask; /// /// #[async_trait] @@ -61,7 +51,6 @@ use crate::error::{TaskError, TaskResult}; /// fn name() -> Cow<'static, str> { Cow::Borrowed("fetch-url") } /// type Params = String; /// type Output = String; -/// type Error = FetchError; /// /// async fn run(url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult { /// let body = ctx.step("fetch", || async { @@ -89,22 +78,15 @@ where /// Output type (must be JSON-serializable) type Output: Serialize + DeserializeOwned + Send; - /// Error type for this task. - /// - /// Use a custom enum with `#[derive(Serialize, thiserror::Error)]` for typed errors - /// that will be stored as structured JSON in the database. - /// - /// Note: Tasks still return `TaskResult` to preserve control flow - /// (suspend, cancel). Use `TaskError::user(your_error)` to wrap your typed errors. - type Error: Error + Send + 'static; - /// Execute the task logic. /// /// Return `Ok(output)` on success, or `Err(TaskError)` on failure. /// Use `?` freely - errors will propagate and the task will be retried /// according to its [`RetryStrategy`](crate::RetryStrategy). /// - /// To return a typed error, use `TaskError::user(your_error)`. + /// For user errors with structured data, use `TaskError::user(data)` where + /// data is any serializable value. For simple message errors, use + /// `TaskError::user_message("message")`. /// /// The [`TaskContext`] provides methods for checkpointing, sleeping, /// and waiting for events. See [`TaskContext`] for details. diff --git a/tests/common/tasks.rs b/tests/common/tasks.rs index 7a28b32..9e80e82 100644 --- a/tests/common/tasks.rs +++ b/tests/common/tasks.rs @@ -1,27 +1,9 @@ use durable::{SpawnOptions, Task, TaskContext, TaskError, TaskHandle, TaskResult, async_trait}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; -use std::fmt; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; -// ============================================================================ -// Simple error type for tests -// ============================================================================ - -/// Simple string-based error for test tasks -#[allow(dead_code)] -#[derive(Debug, Clone, Serialize)] -pub struct TestError(pub String); - -impl fmt::Display for TestError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl std::error::Error for TestError {} - // ============================================================================ // ResearchTask - Example from README demonstrating multi-step checkpointing // ============================================================================ @@ -49,7 +31,7 @@ impl Task<()> for ResearchTask { } type Params = ResearchParams; type Output = ResearchResult; - type Error = TestError; + async fn run( params: Self::Params, @@ -111,7 +93,7 @@ impl Task<()> for EchoTask { } type Params = EchoParams; type Output = String; - type Error = TestError; + async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(params.message) @@ -138,7 +120,7 @@ impl Task<()> for FailingTask { } type Params = FailingParams; type Output = (); - type Error = TestError; + async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Err(TaskError::user_message(params.error_message.to_string())) @@ -167,7 +149,7 @@ impl Task<()> for MultiStepTask { } type Params = (); type Output = MultiStepOutput; - type Error = TestError; + async fn run( _params: Self::Params, @@ -205,7 +187,7 @@ impl Task<()> for SleepingTask { } type Params = SleepParams; type Output = String; - type Error = TestError; + async fn run( params: Self::Params, @@ -239,7 +221,7 @@ impl Task<()> for EventWaitingTask { } type Params = EventWaitParams; type Output = serde_json::Value; - type Error = TestError; + async fn run( params: Self::Params, @@ -291,7 +273,7 @@ impl Task<()> for StepCountingTask { } type Params = StepCountingParams; type Output = StepCountingOutput; - type Error = TestError; + async fn run( params: Self::Params, @@ -345,7 +327,7 @@ impl Task<()> for EmptyParamsTask { } type Params = (); type Output = String; - type Error = TestError; + async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok("completed".to_string()) @@ -372,7 +354,7 @@ impl Task<()> for HeartbeatTask { } type Params = HeartbeatParams; type Output = u32; - type Error = TestError; + async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { for _i in 0..params.iterations { @@ -407,7 +389,7 @@ impl Task<()> for ConvenienceMethodsTask { } type Params = (); type Output = ConvenienceMethodsOutput; - type Error = TestError; + async fn run( _params: Self::Params, @@ -449,7 +431,7 @@ impl Task<()> for MultipleConvenienceCallsTask { } type Params = (); type Output = MultipleCallsOutput; - type Error = TestError; + async fn run( _params: Self::Params, @@ -484,7 +466,7 @@ impl Task<()> for ReservedPrefixTask { } type Params = (); type Output = (); - type Error = TestError; + async fn run( _params: Self::Params, @@ -520,7 +502,7 @@ impl Task<()> for DoubleTask { } type Params = DoubleParams; type Output = i32; - type Error = TestError; + async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(params.value * 2) @@ -538,7 +520,7 @@ impl Task<()> for FailingChildTask { } type Params = (); type Output = (); - type Error = TestError; + async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Err(TaskError::user_message( @@ -574,7 +556,7 @@ impl Task<()> for SingleSpawnTask { } type Params = SingleSpawnParams; type Output = SingleSpawnOutput; - type Error = TestError; + async fn run( params: Self::Params, @@ -622,7 +604,7 @@ impl Task<()> for MultiSpawnTask { } type Params = MultiSpawnParams; type Output = MultiSpawnOutput; - type Error = TestError; + async fn run( params: Self::Params, @@ -664,7 +646,7 @@ impl Task<()> for SpawnFailingChildTask { } type Params = (); type Output = (); - type Error = TestError; + async fn run( _params: Self::Params, @@ -711,7 +693,7 @@ impl Task<()> for LongRunningHeartbeatTask { } type Params = LongRunningHeartbeatParams; type Output = String; - type Error = TestError; + async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { let start = std::time::Instant::now(); @@ -744,7 +726,7 @@ impl Task<()> for SlowChildTask { } type Params = SlowChildParams; type Output = String; - type Error = TestError; + async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { tokio::time::sleep(std::time::Duration::from_millis(params.sleep_ms)).await; @@ -769,7 +751,7 @@ impl Task<()> for SpawnSlowChildTask { } type Params = SpawnSlowChildParams; type Output = String; - type Error = TestError; + async fn run( params: Self::Params, @@ -814,7 +796,7 @@ impl Task<()> for EventEmitterTask { } type Params = EventEmitterParams; type Output = String; - type Error = TestError; + async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { ctx.emit_event(¶ms.event_name, ¶ms.payload).await?; @@ -842,7 +824,7 @@ impl Task<()> for ManyStepsTask { } type Params = ManyStepsParams; type Output = u32; - type Error = TestError; + async fn run( params: Self::Params, @@ -879,7 +861,7 @@ impl Task<()> for LargePayloadTask { } type Params = LargePayloadParams; type Output = String; - type Error = TestError; + async fn run( params: Self::Params, @@ -917,7 +899,7 @@ impl Task<()> for CpuBoundTask { } type Params = CpuBoundParams; type Output = String; - type Error = TestError; + async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { let start = std::time::Instant::now(); @@ -954,7 +936,7 @@ impl Task<()> for SlowNoHeartbeatTask { } type Params = SlowNoHeartbeatParams; type Output = String; - type Error = TestError; + async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { // Just sleep - no heartbeat calls @@ -1048,7 +1030,7 @@ impl Task<()> for DeterministicReplayTask { } type Params = DeterministicReplayParams; type Output = DeterministicReplayOutput; - type Error = TestError; + async fn run( params: Self::Params, @@ -1113,7 +1095,7 @@ impl Task<()> for EventThenFailTask { } type Params = EventThenFailParams; type Output = serde_json::Value; - type Error = TestError; + async fn run( params: Self::Params, @@ -1166,7 +1148,7 @@ impl Task<()> for EventThenDelayTask { } type Params = EventThenDelayParams; type Output = serde_json::Value; - type Error = TestError; + async fn run( params: Self::Params, @@ -1206,7 +1188,7 @@ impl Task<()> for MultiEventTask { } type Params = MultiEventParams; type Output = serde_json::Value; - type Error = TestError; + async fn run( params: Self::Params, @@ -1252,7 +1234,7 @@ impl Task<()> for SpawnThenFailTask { } type Params = SpawnThenFailParams; type Output = serde_json::Value; - type Error = TestError; + async fn run( params: Self::Params, @@ -1325,7 +1307,7 @@ impl Task<()> for SpawnByNameTask { } type Params = SpawnByNameParams; type Output = SpawnByNameOutput; - type Error = TestError; + async fn run( params: Self::Params, @@ -1369,7 +1351,7 @@ impl Task<()> for JoinCancelledChildTask { } type Params = JoinCancelledChildParams; type Output = String; - type Error = TestError; + async fn run( params: Self::Params, @@ -1416,7 +1398,7 @@ impl Task<()> for VerySlowChildTask { } type Params = VerySlowChildParams; type Output = String; - type Error = TestError; + async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { // Heartbeat regularly to keep this task alive diff --git a/tests/execution_test.rs b/tests/execution_test.rs index ab7980b..876b2e5 100644 --- a/tests/execution_test.rs +++ b/tests/execution_test.rs @@ -730,18 +730,6 @@ struct WriteToDbParams { value: String, } -/// Simple error type for WriteToDbTask -#[derive(Debug, serde::Serialize)] -struct WriteToDbError(String); - -impl std::fmt::Display for WriteToDbError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl std::error::Error for WriteToDbError {} - #[durable::async_trait] impl durable::Task for WriteToDbTask { fn name() -> Cow<'static, str> { @@ -749,7 +737,6 @@ impl durable::Task for WriteToDbTask { } type Params = WriteToDbParams; type Output = i64; - type Error = WriteToDbError; async fn run( params: Self::Params, From 82495c7baf2060da71229faa9f0d939602ddf716 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Mon, 5 Jan 2026 14:26:38 -0500 Subject: [PATCH 4/4] fixed fmt --- benches/common/tasks.rs | 4 ---- src/error.rs | 3 +-- tests/common/tasks.rs | 33 --------------------------------- 3 files changed, 1 insertion(+), 39 deletions(-) diff --git a/benches/common/tasks.rs b/benches/common/tasks.rs index 8448528..ae668ec 100644 --- a/benches/common/tasks.rs +++ b/benches/common/tasks.rs @@ -17,7 +17,6 @@ impl Task<()> for NoOpTask { type Params = (); type Output = (); - async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(()) } @@ -44,7 +43,6 @@ impl Task<()> for QuickTask { type Params = QuickParams; type Output = u32; - async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(params.task_num) } @@ -71,7 +69,6 @@ impl Task<()> for MultiStepBenchTask { type Params = MultiStepParams; type Output = u32; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -107,7 +104,6 @@ impl Task<()> for LargePayloadBenchTask { type Params = LargePayloadParams; type Output = usize; - async fn run( params: Self::Params, mut ctx: TaskContext, diff --git a/src/error.rs b/src/error.rs index 37a996d..f55fcfc 100644 --- a/src/error.rs +++ b/src/error.rs @@ -144,8 +144,7 @@ impl TaskError { /// Err(TaskError::user(MyError { code: 404, details: "..." })) /// ``` pub fn user(error_data: impl serde::Serialize) -> Self { - let error_data = - serde_json::to_value(&error_data).unwrap_or(serde_json::Value::Null); + let error_data = serde_json::to_value(&error_data).unwrap_or(serde_json::Value::Null); let message = error_data .get("message") .and_then(|v| v.as_str()) diff --git a/tests/common/tasks.rs b/tests/common/tasks.rs index 9e80e82..1a938be 100644 --- a/tests/common/tasks.rs +++ b/tests/common/tasks.rs @@ -32,7 +32,6 @@ impl Task<()> for ResearchTask { type Params = ResearchParams; type Output = ResearchResult; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -94,7 +93,6 @@ impl Task<()> for EchoTask { type Params = EchoParams; type Output = String; - async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(params.message) } @@ -121,7 +119,6 @@ impl Task<()> for FailingTask { type Params = FailingParams; type Output = (); - async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Err(TaskError::user_message(params.error_message.to_string())) } @@ -150,7 +147,6 @@ impl Task<()> for MultiStepTask { type Params = (); type Output = MultiStepOutput; - async fn run( _params: Self::Params, mut ctx: TaskContext, @@ -188,7 +184,6 @@ impl Task<()> for SleepingTask { type Params = SleepParams; type Output = String; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -222,7 +217,6 @@ impl Task<()> for EventWaitingTask { type Params = EventWaitParams; type Output = serde_json::Value; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -274,7 +268,6 @@ impl Task<()> for StepCountingTask { type Params = StepCountingParams; type Output = StepCountingOutput; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -328,7 +321,6 @@ impl Task<()> for EmptyParamsTask { type Params = (); type Output = String; - async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok("completed".to_string()) } @@ -355,7 +347,6 @@ impl Task<()> for HeartbeatTask { type Params = HeartbeatParams; type Output = u32; - async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { for _i in 0..params.iterations { // Simulate work @@ -390,7 +381,6 @@ impl Task<()> for ConvenienceMethodsTask { type Params = (); type Output = ConvenienceMethodsOutput; - async fn run( _params: Self::Params, mut ctx: TaskContext, @@ -432,7 +422,6 @@ impl Task<()> for MultipleConvenienceCallsTask { type Params = (); type Output = MultipleCallsOutput; - async fn run( _params: Self::Params, mut ctx: TaskContext, @@ -467,7 +456,6 @@ impl Task<()> for ReservedPrefixTask { type Params = (); type Output = (); - async fn run( _params: Self::Params, mut ctx: TaskContext, @@ -503,7 +491,6 @@ impl Task<()> for DoubleTask { type Params = DoubleParams; type Output = i32; - async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Ok(params.value * 2) } @@ -521,7 +508,6 @@ impl Task<()> for FailingChildTask { type Params = (); type Output = (); - async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { Err(TaskError::user_message( "Child task failed intentionally".to_string(), @@ -557,7 +543,6 @@ impl Task<()> for SingleSpawnTask { type Params = SingleSpawnParams; type Output = SingleSpawnOutput; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -605,7 +590,6 @@ impl Task<()> for MultiSpawnTask { type Params = MultiSpawnParams; type Output = MultiSpawnOutput; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -647,7 +631,6 @@ impl Task<()> for SpawnFailingChildTask { type Params = (); type Output = (); - async fn run( _params: Self::Params, mut ctx: TaskContext, @@ -694,7 +677,6 @@ impl Task<()> for LongRunningHeartbeatTask { type Params = LongRunningHeartbeatParams; type Output = String; - async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { let start = std::time::Instant::now(); let total_duration = std::time::Duration::from_millis(params.total_duration_ms); @@ -727,7 +709,6 @@ impl Task<()> for SlowChildTask { type Params = SlowChildParams; type Output = String; - async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { tokio::time::sleep(std::time::Duration::from_millis(params.sleep_ms)).await; Ok("done".to_string()) @@ -752,7 +733,6 @@ impl Task<()> for SpawnSlowChildTask { type Params = SpawnSlowChildParams; type Output = String; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -797,7 +777,6 @@ impl Task<()> for EventEmitterTask { type Params = EventEmitterParams; type Output = String; - async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { ctx.emit_event(¶ms.event_name, ¶ms.payload).await?; Ok("emitted".to_string()) @@ -825,7 +804,6 @@ impl Task<()> for ManyStepsTask { type Params = ManyStepsParams; type Output = u32; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -862,7 +840,6 @@ impl Task<()> for LargePayloadTask { type Params = LargePayloadParams; type Output = String; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -900,7 +877,6 @@ impl Task<()> for CpuBoundTask { type Params = CpuBoundParams; type Output = String; - async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { let start = std::time::Instant::now(); let duration = std::time::Duration::from_millis(params.duration_ms); @@ -937,7 +913,6 @@ impl Task<()> for SlowNoHeartbeatTask { type Params = SlowNoHeartbeatParams; type Output = String; - async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult { // Just sleep - no heartbeat calls tokio::time::sleep(std::time::Duration::from_millis(params.sleep_ms)).await; @@ -1031,7 +1006,6 @@ impl Task<()> for DeterministicReplayTask { type Params = DeterministicReplayParams; type Output = DeterministicReplayOutput; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -1096,7 +1070,6 @@ impl Task<()> for EventThenFailTask { type Params = EventThenFailParams; type Output = serde_json::Value; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -1149,7 +1122,6 @@ impl Task<()> for EventThenDelayTask { type Params = EventThenDelayParams; type Output = serde_json::Value; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -1189,7 +1161,6 @@ impl Task<()> for MultiEventTask { type Params = MultiEventParams; type Output = serde_json::Value; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -1235,7 +1206,6 @@ impl Task<()> for SpawnThenFailTask { type Params = SpawnThenFailParams; type Output = serde_json::Value; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -1308,7 +1278,6 @@ impl Task<()> for SpawnByNameTask { type Params = SpawnByNameParams; type Output = SpawnByNameOutput; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -1352,7 +1321,6 @@ impl Task<()> for JoinCancelledChildTask { type Params = JoinCancelledChildParams; type Output = String; - async fn run( params: Self::Params, mut ctx: TaskContext, @@ -1399,7 +1367,6 @@ impl Task<()> for VerySlowChildTask { type Params = VerySlowChildParams; type Output = String; - async fn run(params: Self::Params, ctx: TaskContext, _state: ()) -> TaskResult { // Heartbeat regularly to keep this task alive let interval_ms = 500;