Skip to content

Commit 9d94446

Browse files
committed
fix until WorkflowDefinition
Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
1 parent cf54981 commit 9d94446

12 files changed

+167
-294
lines changed

cadence/_internal/workflow/decision_events_iterator.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
from cadence._internal.workflow.history_event_iterator import HistoryEventsIterator
1313
from cadence.api.v1.history_pb2 import HistoryEvent
14-
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
1514

1615

1716
@dataclass
@@ -44,10 +43,8 @@ class DecisionEventsIterator(Iterator[DecisionEvents]):
4443

4544
def __init__(
4645
self,
47-
decision_task: PollForDecisionTaskResponse,
4846
events: List[HistoryEvent],
4947
):
50-
self._decision_task = decision_task
5148
self._events: HistoryEventsIterator = HistoryEventsIterator(events)
5249
self._next_decision_event_id: Optional[int] = None
5350
self._replay_current_time_milliseconds: Optional[int] = None

cadence/_internal/workflow/workflow_engine.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
from dataclasses import dataclass
3+
from typing import List
34

45
from cadence._internal.workflow.context import Context
56
from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator
@@ -10,7 +11,10 @@
1011
CompleteWorkflowExecutionDecisionAttributes,
1112
Decision,
1213
)
13-
from cadence.api.v1.history_pb2 import WorkflowExecutionStartedEventAttributes
14+
from cadence.api.v1.history_pb2 import (
15+
HistoryEvent,
16+
WorkflowExecutionStartedEventAttributes,
17+
)
1418
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
1519
from cadence.workflow import WorkflowDefinition, WorkflowInfo
1620

@@ -33,7 +37,8 @@ def __init__(self, info: WorkflowInfo, workflow_definition: WorkflowDefinition):
3337
self._context = Context(info, self._decision_manager)
3438

3539
def process_decision(
36-
self, decision_task: PollForDecisionTaskResponse
40+
self,
41+
events: List[HistoryEvent],
3742
) -> DecisionResult:
3843
"""
3944
Process a decision task and generate decisions using DecisionEventsIterator.
@@ -57,18 +62,14 @@ def process_decision(
5762
"workflow_type": ctx.info().workflow_type,
5863
"workflow_id": ctx.info().workflow_id,
5964
"run_id": ctx.info().workflow_run_id,
60-
"started_event_id": decision_task.started_event_id,
61-
"attempt": decision_task.attempt,
6265
},
6366
)
6467

6568
# Create DecisionEventsIterator for structured event processing
66-
events_iterator = DecisionEventsIterator(
67-
decision_task, ctx.info().workflow_events
68-
)
69+
events_iterator = DecisionEventsIterator(events)
6970

7071
# Process decision events using iterator-driven approach
71-
self._process_decision_events(ctx, events_iterator, decision_task)
72+
self._process_decision_events(ctx, events_iterator)
7273

7374
# Collect all pending decisions from state machines
7475
decisions = self._decision_manager.collect_pending_decisions()
@@ -100,8 +101,6 @@ def process_decision(
100101
"workflow_type": ctx.info().workflow_type,
101102
"workflow_id": ctx.info().workflow_id,
102103
"run_id": ctx.info().workflow_run_id,
103-
"started_event_id": decision_task.started_event_id,
104-
"attempt": decision_task.attempt,
105104
"error_type": type(e).__name__,
106105
},
107106
exc_info=True,
@@ -116,7 +115,6 @@ def _process_decision_events(
116115
self,
117116
ctx: Context,
118117
events_iterator: DecisionEventsIterator,
119-
decision_task: PollForDecisionTaskResponse,
120118
) -> None:
121119
"""
122120
Process decision events using the iterator-driven approach similar to Java client.
@@ -139,15 +137,14 @@ def _process_decision_events(
139137
"Processing decision events batch",
140138
extra={
141139
"workflow_id": ctx.info().workflow_id,
142-
"events_count": len(decision_events.get_events()),
143-
"markers_count": len(decision_events.get_markers()),
144-
"replay_mode": decision_events.is_replay(),
140+
"markers_count": len(decision_events.markers),
141+
"replay_mode": decision_events.replay,
145142
"replay_time": decision_events.replay_current_time_milliseconds,
146143
},
147144
)
148145

149146
# Update context with replay information
150-
ctx.set_replay_mode(decision_events.is_replay())
147+
ctx.set_replay_mode(decision_events.replay)
151148
if decision_events.replay_current_time_milliseconds:
152149
ctx.set_replay_current_time_milliseconds(
153150
decision_events.replay_current_time_milliseconds
@@ -161,7 +158,7 @@ def _process_decision_events(
161158
"workflow_id": ctx.info().workflow_id,
162159
"marker_name": getattr(marker_event, "marker_name", "unknown"),
163160
"event_id": getattr(marker_event, "event_id", None),
164-
"replay_mode": ctx.is_replay_mode(),
161+
"replay_mode": decision_events.replay,
165162
},
166163
)
167164
# Process through state machines (DecisionsHelper now delegates to DecisionManager)
@@ -175,18 +172,24 @@ def _process_decision_events(
175172
"workflow_id": ctx.info().workflow_id,
176173
"event_type": getattr(event, "event_type", "unknown"),
177174
"event_id": getattr(event, "event_id", None),
178-
"replay_mode": ctx.is_replay_mode(),
175+
"replay_mode": decision_events.replay,
179176
},
180177
)
178+
# start workflow on workflow started event
179+
if (
180+
event.WhichOneof("attributes")
181+
== "workflow_execution_started_event_attributes"
182+
):
183+
started_attrs: WorkflowExecutionStartedEventAttributes = (
184+
event.workflow_execution_started_event_attributes
185+
)
186+
if started_attrs and hasattr(started_attrs, "input"):
187+
self._workflow_instance.start(started_attrs.input)
188+
181189
# Process through state machines (DecisionsHelper now delegates to DecisionManager)
182190
self._decision_manager.handle_history_event(event)
183191

184192
# Phase 3: Execute workflow logic
185-
if not self._workflow_instance.is_started():
186-
self._workflow_instance.start(
187-
self._extract_workflow_input(decision_task)
188-
)
189-
190193
self._workflow_instance.run_once()
191194

192195
# Phase 4: update state machine with output events

cadence/_internal/workflow/workflow_intance.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,14 @@ def __init__(
1212
):
1313
self._definition = workflow_definition
1414
self._data_converter = data_converter
15-
self._instance = (
16-
workflow_definition.cls().__init__()
17-
) # construct a new workflow object
15+
self._instance = workflow_definition.cls() # construct a new workflow object
1816
self._loop = DeterministicEventLoop()
1917
self._task: Optional[Task] = None
2018

2119
def start(self, input: Payload):
2220
if self._task is None:
2321
run_method = self._definition.get_run_method(self._instance)
24-
workflow_input = self._data_converter.from_data(input, [None])
22+
workflow_input = self._data_converter.from_data(input, [])
2523
self._task = self._loop.create_task(run_method(*workflow_input))
2624

2725
def is_started(self) -> bool:

cadence/worker/_decision_task_handler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ async def _handle_task_implementation(
121121
workflow_run_id=run_id,
122122
workflow_task_list=self.task_list,
123123
data_converter=self._client.data_converter,
124-
workflow_events=workflow_events,
125124
)
126125

127126
# Use thread-safe cache to get or create workflow engine
@@ -136,7 +135,7 @@ async def _handle_task_implementation(
136135
self._workflow_engines[cache_key] = workflow_engine
137136

138137
decision_result = await asyncio.get_running_loop().run_in_executor(
139-
self._executor, workflow_engine.process_decision, task
138+
self._executor, workflow_engine.process_decision, workflow_events
140139
)
141140

142141
# Clean up completed workflows from cache to prevent memory leaks

cadence/workflow.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from datetime import timedelta
66
from typing import (
77
Callable,
8-
List,
98
cast,
109
Optional,
1110
Union,
@@ -15,10 +14,10 @@
1514
Type,
1615
Unpack,
1716
Any,
17+
Generic,
1818
)
1919
import inspect
2020

21-
from cadence.api.v1.history_pb2 import HistoryEvent
2221
from cadence.data_converter import DataConverter
2322

2423
ResultType = TypeVar("ResultType")
@@ -44,6 +43,7 @@ async def execute_activity(
4443

4544

4645
T = TypeVar("T", bound=Callable[..., Any])
46+
C = TypeVar("C")
4747

4848

4949
class WorkflowDefinitionOptions(TypedDict, total=False):
@@ -52,16 +52,16 @@ class WorkflowDefinitionOptions(TypedDict, total=False):
5252
name: str
5353

5454

55-
class WorkflowDefinition:
55+
class WorkflowDefinition(Generic[C]):
5656
"""
5757
Definition of a workflow class with metadata.
5858
5959
Similar to ActivityDefinition but for workflow classes.
6060
Provides type safety and metadata for workflow classes.
6161
"""
6262

63-
def __init__(self, cls: Type, name: str, run_method_name: str):
64-
self._cls = cls
63+
def __init__(self, cls: Type[C], name: str, run_method_name: str):
64+
self._cls: Type[C] = cls
6565
self._name = name
6666
self._run_method_name = run_method_name
6767

@@ -71,7 +71,7 @@ def name(self) -> str:
7171
return self._name
7272

7373
@property
74-
def cls(self) -> Type:
74+
def cls(self) -> Type[C]:
7575
"""Get the workflow class."""
7676
return self._cls
7777

@@ -151,7 +151,7 @@ def decorator(f: T) -> T:
151151
raise ValueError(f"Workflow run method '{f.__name__}' must be async")
152152

153153
# Attach metadata to the function
154-
f._workflow_run = True # type: ignore
154+
setattr(f, "_workflow_run", None)
155155
return f
156156

157157
# Support both @workflow.run and @workflow.run()
@@ -163,14 +163,13 @@ def decorator(f: T) -> T:
163163
return decorator(func)
164164

165165

166-
@dataclass
166+
@dataclass(frozen=True)
167167
class WorkflowInfo:
168168
workflow_type: str
169169
workflow_domain: str
170170
workflow_id: str
171171
workflow_run_id: str
172172
workflow_task_list: str
173-
workflow_events: List[HistoryEvent]
174173
data_converter: DataConverter
175174

176175

tests/cadence/_internal/workflow/test_decision_events_iterator.py

Lines changed: 2 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,12 @@
44
"""
55

66
import pytest
7-
from typing import List
87

9-
from cadence.api.v1.history_pb2 import HistoryEvent, History
10-
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
11-
from cadence.api.v1.common_pb2 import WorkflowExecution
128

139
from cadence._internal.workflow.decision_events_iterator import (
1410
DecisionEventsIterator,
1511
)
12+
from tests.cadence._internal.workflow.utils import create_mock_history_event
1613

1714

1815
class TestDecisionEventsIterator:
@@ -95,8 +92,7 @@ class TestDecisionEventsIterator:
9592
)
9693
def test_successful_cases(self, name, event_types, expected):
9794
events = create_mock_history_event(event_types)
98-
decision_task = create_mock_decision_task(events)
99-
iterator = DecisionEventsIterator(decision_task, events)
95+
iterator = DecisionEventsIterator(events)
10096

10197
batches = [decision_events for decision_events in iterator]
10298
assert len(expected) == len(batches)
@@ -108,84 +104,3 @@ def test_successful_cases(self, name, event_types, expected):
108104
assert batch.replay == expect["replay"]
109105
assert batch.replay_current_time_milliseconds == expect["replay_time"]
110106
assert batch.next_decision_event_id == expect["next_decision_event_id"]
111-
112-
113-
def create_mock_history_event(event_types: List[str]) -> List[HistoryEvent]:
114-
events = []
115-
for i, event_type in enumerate(event_types):
116-
event = HistoryEvent()
117-
event.event_id = i + 1
118-
event.event_time.FromMilliseconds((i + 1) * 1000)
119-
120-
# Set the appropriate attribute based on event type
121-
if event_type == "decision_task_started":
122-
event.decision_task_started_event_attributes.SetInParent()
123-
elif event_type == "decision_task_completed":
124-
event.decision_task_completed_event_attributes.SetInParent()
125-
elif event_type == "decision_task_failed":
126-
event.decision_task_failed_event_attributes.SetInParent()
127-
elif event_type == "decision_task_timed_out":
128-
event.decision_task_timed_out_event_attributes.SetInParent()
129-
elif event_type == "marker_recorded":
130-
event.marker_recorded_event_attributes.SetInParent()
131-
elif event_type == "activity_scheduled":
132-
event.activity_task_scheduled_event_attributes.SetInParent()
133-
elif event_type == "activity_started":
134-
event.activity_task_started_event_attributes.SetInParent()
135-
elif event_type == "activity_completed":
136-
event.activity_task_completed_event_attributes.SetInParent()
137-
elif event_type == "activity_failed":
138-
event.activity_task_failed_event_attributes.SetInParent()
139-
elif event_type == "activity_timed_out":
140-
event.activity_task_timed_out_event_attributes.SetInParent()
141-
elif event_type == "activity_cancel_requested":
142-
event.activity_task_cancel_requested_event_attributes.SetInParent()
143-
elif event_type == "request_cancel_activity_task_failed":
144-
event.request_cancel_activity_task_failed_event_attributes.SetInParent()
145-
elif event_type == "activity_canceled":
146-
event.activity_task_canceled_event_attributes.SetInParent()
147-
elif event_type == "timer_started":
148-
event.timer_started_event_attributes.SetInParent()
149-
elif event_type == "timer_fired":
150-
event.timer_fired_event_attributes.SetInParent()
151-
elif event_type == "timer_canceled":
152-
event.timer_canceled_event_attributes.SetInParent()
153-
elif event_type == "cancel_timer_failed":
154-
event.cancel_timer_failed_event_attributes.SetInParent()
155-
elif event_type == "request_cancel_external_workflow_execution_initiated":
156-
event.request_cancel_external_workflow_execution_initiated_event_attributes.SetInParent()
157-
elif event_type == "request_cancel_external_workflow_execution_failed":
158-
event.request_cancel_external_workflow_execution_failed_event_attributes.SetInParent()
159-
elif event_type == "external_workflow_execution_cancel_requested":
160-
event.external_workflow_execution_cancel_requested_event_attributes.SetInParent()
161-
elif event_type == "workflow_execution_started":
162-
event.workflow_execution_started_event_attributes.SetInParent()
163-
elif event_type == "workflow_execution_completed":
164-
event.workflow_execution_completed_event_attributes.SetInParent()
165-
166-
events.append(event)
167-
168-
return events
169-
170-
171-
def create_mock_decision_task(
172-
events: List[HistoryEvent], next_page_token: bytes = None
173-
) -> PollForDecisionTaskResponse:
174-
"""Create a mock decision task for testing."""
175-
task = PollForDecisionTaskResponse()
176-
177-
# Mock history
178-
history = History()
179-
history.events.extend(events)
180-
task.history.CopyFrom(history)
181-
182-
# Mock workflow execution
183-
workflow_execution = WorkflowExecution()
184-
workflow_execution.workflow_id = "test-workflow"
185-
workflow_execution.run_id = "test-run"
186-
task.workflow_execution.CopyFrom(workflow_execution)
187-
188-
if next_page_token:
189-
task.next_page_token = next_page_token
190-
191-
return task

0 commit comments

Comments
 (0)