Skip to content

Commit 9180cf5

Browse files
committed
Add DecisionEventsIterator implementation
Signed-off-by: Tim Li <ltim@uber.com>
1 parent 884c64f commit 9180cf5

File tree

2 files changed

+713
-0
lines changed

2 files changed

+713
-0
lines changed
Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Decision Events Iterator for Cadence workflow orchestration.
4+
5+
This module provides functionality to iterate through workflow history events,
6+
particularly focusing on decision-related events for replay and execution.
7+
"""
8+
9+
from dataclasses import dataclass, field
10+
from typing import List, Optional, AsyncIterator
11+
12+
from cadence.api.v1.history_pb2 import HistoryEvent
13+
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
14+
from cadence.client import Client
15+
from cadence._internal.workflow.history_event_iterator import iterate_history_events
16+
17+
18+
@dataclass
19+
class DecisionEvents:
20+
"""
21+
Represents events for a single decision iteration.
22+
"""
23+
events: List[HistoryEvent] = field(default_factory=list)
24+
decision_events: List[HistoryEvent] = field(default_factory=list)
25+
markers: List[HistoryEvent] = field(default_factory=list)
26+
replay: bool = False
27+
replay_current_time_milliseconds: Optional[int] = None
28+
next_decision_event_id: Optional[int] = None
29+
30+
def get_events(self) -> List[HistoryEvent]:
31+
"""Return all events in this decision iteration."""
32+
return self.events
33+
34+
def get_decision_events(self) -> List[HistoryEvent]:
35+
"""Return decision-related events."""
36+
return self.decision_events
37+
38+
def get_markers(self) -> List[HistoryEvent]:
39+
"""Return marker events."""
40+
return self.markers
41+
42+
def is_replay(self) -> bool:
43+
"""Check if this decision is in replay mode."""
44+
return self.replay
45+
46+
def get_optional_decision_event(self, event_id: int) -> Optional[HistoryEvent]:
47+
"""Retrieve a specific decision event by ID."""
48+
for event in self.decision_events:
49+
if hasattr(event, 'event_id') and event.event_id == event_id:
50+
return event
51+
return None
52+
53+
class DecisionEventsIterator:
54+
"""
55+
Iterator for processing decision events from workflow history.
56+
57+
This is the main class that processes workflow history events and groups them
58+
into decision iterations for proper workflow replay and execution.
59+
"""
60+
61+
def __init__(self, decision_task: PollForDecisionTaskResponse, client: Client):
62+
self._client = client
63+
self._decision_task = decision_task
64+
self._events: List[HistoryEvent] = []
65+
self._event_index = 0
66+
self._decision_task_started_event: Optional[HistoryEvent] = None
67+
self._next_decision_event_id = 1
68+
self._replay = True
69+
self._replay_current_time_milliseconds: Optional[int] = None
70+
self._initialized = False
71+
72+
@staticmethod
73+
def _is_decision_task_started(event: HistoryEvent) -> bool:
74+
"""Check if event is DecisionTaskStarted."""
75+
return (hasattr(event, 'decision_task_started_event_attributes') and
76+
event.HasField('decision_task_started_event_attributes'))
77+
78+
@staticmethod
79+
def _is_decision_task_completed(event: HistoryEvent) -> bool:
80+
"""Check if event is DecisionTaskCompleted."""
81+
return (hasattr(event, 'decision_task_completed_event_attributes') and
82+
event.HasField('decision_task_completed_event_attributes'))
83+
84+
@staticmethod
85+
def _is_decision_task_failed(event: HistoryEvent) -> bool:
86+
"""Check if event is DecisionTaskFailed."""
87+
return (hasattr(event, 'decision_task_failed_event_attributes') and
88+
event.HasField('decision_task_failed_event_attributes'))
89+
90+
@staticmethod
91+
def _is_decision_task_timed_out(event: HistoryEvent) -> bool:
92+
"""Check if event is DecisionTaskTimedOut."""
93+
return (hasattr(event, 'decision_task_timed_out_event_attributes') and
94+
event.HasField('decision_task_timed_out_event_attributes'))
95+
96+
@staticmethod
97+
def _is_marker_recorded(event: HistoryEvent) -> bool:
98+
"""Check if event is MarkerRecorded."""
99+
return (hasattr(event, 'marker_recorded_event_attributes') and
100+
event.HasField('marker_recorded_event_attributes'))
101+
102+
@staticmethod
103+
def _is_decision_task_completion(event: HistoryEvent) -> bool:
104+
"""Check if event is any kind of decision task completion."""
105+
return (DecisionEventsIterator._is_decision_task_completed(event) or
106+
DecisionEventsIterator._is_decision_task_failed(event) or
107+
DecisionEventsIterator._is_decision_task_timed_out(event))
108+
109+
async def _ensure_initialized(self):
110+
"""Initialize events list using the existing iterate_history_events."""
111+
if not self._initialized:
112+
# Use existing iterate_history_events function
113+
events_iterator = iterate_history_events(self._decision_task, self._client)
114+
self._events = [event async for event in events_iterator]
115+
self._initialized = True
116+
117+
# Find first decision task started event
118+
for i, event in enumerate(self._events):
119+
if self._is_decision_task_started(event):
120+
self._event_index = i
121+
break
122+
123+
async def has_next_decision_events(self) -> bool:
124+
"""Check if there are more decision events to process."""
125+
await self._ensure_initialized()
126+
127+
# Look for the next DecisionTaskStarted event from current position
128+
for i in range(self._event_index, len(self._events)):
129+
if self._is_decision_task_started(self._events[i]):
130+
return True
131+
132+
return False
133+
134+
async def next_decision_events(self) -> DecisionEvents:
135+
"""
136+
Get the next set of decision events.
137+
138+
This method processes events starting from a DecisionTaskStarted event
139+
until the corresponding DecisionTaskCompleted/Failed/TimedOut event.
140+
"""
141+
await self._ensure_initialized()
142+
143+
# Find next DecisionTaskStarted event
144+
start_index = None
145+
for i in range(self._event_index, len(self._events)):
146+
if self._is_decision_task_started(self._events[i]):
147+
start_index = i
148+
break
149+
150+
if start_index is None:
151+
raise StopIteration("No more decision events")
152+
153+
decision_events = DecisionEvents()
154+
decision_events.replay = self._replay
155+
decision_events.replay_current_time_milliseconds = self._replay_current_time_milliseconds
156+
decision_events.next_decision_event_id = self._next_decision_event_id
157+
158+
# Process DecisionTaskStarted event
159+
decision_task_started = self._events[start_index]
160+
self._decision_task_started_event = decision_task_started
161+
decision_events.events.append(decision_task_started)
162+
163+
# Update replay time if available
164+
if hasattr(decision_task_started, 'event_time') and decision_task_started.event_time:
165+
self._replay_current_time_milliseconds = getattr(
166+
decision_task_started.event_time, 'seconds', 0
167+
) * 1000
168+
decision_events.replay_current_time_milliseconds = self._replay_current_time_milliseconds
169+
170+
# Process subsequent events until we find the corresponding DecisionTask completion
171+
current_index = start_index + 1
172+
while current_index < len(self._events):
173+
event = self._events[current_index]
174+
decision_events.events.append(event)
175+
176+
# Categorize the event
177+
if self._is_marker_recorded(event):
178+
decision_events.markers.append(event)
179+
elif self._is_decision_task_completion(event):
180+
# This marks the end of this decision iteration
181+
self._process_decision_completion_event(event, decision_events)
182+
current_index += 1 # Move past this event
183+
break
184+
else:
185+
# Other events that are part of this decision
186+
decision_events.decision_events.append(event)
187+
188+
current_index += 1
189+
190+
# Update the event index for next iteration
191+
self._event_index = current_index
192+
193+
# Update the next decision event ID
194+
if decision_events.events:
195+
last_event = decision_events.events[-1]
196+
if hasattr(last_event, 'event_id'):
197+
self._next_decision_event_id = last_event.event_id + 1
198+
199+
# Check if this is the last decision events
200+
# Set replay to false only if there are no more decision events after this one
201+
# Check directly without calling has_next_decision_events to avoid recursion
202+
has_more = False
203+
for i in range(self._event_index, len(self._events)):
204+
if self._is_decision_task_started(self._events[i]):
205+
has_more = True
206+
break
207+
208+
if not has_more:
209+
self._replay = False
210+
decision_events.replay = False
211+
212+
return decision_events
213+
214+
def _process_decision_completion_event(self, event: HistoryEvent, decision_events: DecisionEvents):
215+
"""Process the decision completion event and update state."""
216+
if self._is_decision_task_completed(event):
217+
# Extract decisions from the completed event if available
218+
if hasattr(event, 'decision_task_completed_event_attributes'):
219+
completed_attrs = event.decision_task_completed_event_attributes
220+
if hasattr(completed_attrs, 'decisions'):
221+
# Process decisions - they represent what the workflow decided to do
222+
for decision in completed_attrs.decisions:
223+
decision_events.decision_events.append(decision)
224+
225+
# Check if we're still in replay mode
226+
# This is determined by comparing event IDs with the current decision task's started event ID
227+
if (self._decision_task_started_event and
228+
hasattr(self._decision_task_started_event, 'event_id') and
229+
hasattr(event, 'event_id')):
230+
231+
# If this completion event ID is >= the current decision task's started event ID,
232+
# we're no longer in replay mode
233+
current_task_started_id = getattr(
234+
self._decision_task.started_event_id, 'value', 0
235+
) if hasattr(self._decision_task, 'started_event_id') else 0
236+
237+
if event.event_id >= current_task_started_id:
238+
self._replay = False
239+
decision_events.replay = False
240+
241+
def get_replay_current_time_milliseconds(self) -> Optional[int]:
242+
"""Get the current replay time in milliseconds."""
243+
return self._replay_current_time_milliseconds
244+
245+
def is_replay_mode(self) -> bool:
246+
"""Check if the iterator is currently in replay mode."""
247+
return self._replay
248+
249+
def __aiter__(self):
250+
return self
251+
252+
async def __anext__(self) -> DecisionEvents:
253+
if not await self.has_next_decision_events():
254+
raise StopAsyncIteration
255+
return await self.next_decision_events()
256+
257+
258+
class HistoryHelper:
259+
"""
260+
Main helper class for processing workflow history events.
261+
262+
Provides the primary interface for iterating through decision events
263+
and managing workflow history processing.
264+
"""
265+
266+
def __init__(self, decision_task: PollForDecisionTaskResponse, client: Client):
267+
self._decision_task = decision_task
268+
self._client = client
269+
self._decision_events_iterator: Optional[DecisionEventsIterator] = None
270+
271+
async def get_decision_events_iterator(self) -> DecisionEventsIterator:
272+
"""Get the decision events iterator for this history."""
273+
if self._decision_events_iterator is None:
274+
self._decision_events_iterator = DecisionEventsIterator(
275+
self._decision_task, self._client
276+
)
277+
await self._decision_events_iterator._ensure_initialized()
278+
279+
return self._decision_events_iterator
280+
281+
async def get_all_decision_events(self) -> List[DecisionEvents]:
282+
"""Get all decision events as a list."""
283+
iterator = await self.get_decision_events_iterator()
284+
all_decision_events = []
285+
286+
while await iterator.has_next_decision_events():
287+
decision_events = await iterator.next_decision_events()
288+
all_decision_events.append(decision_events)
289+
290+
return all_decision_events
291+
292+
def get_workflow_execution(self):
293+
"""Get the workflow execution from the decision task."""
294+
return self._decision_task.workflow_execution
295+
296+
def get_workflow_type(self):
297+
"""Get the workflow type from the decision task."""
298+
return self._decision_task.workflow_type
299+
300+
301+
# Factory function for easy creation
302+
async def create_history_helper(
303+
decision_task: PollForDecisionTaskResponse,
304+
client: Client
305+
) -> HistoryHelper:
306+
"""Create a HistoryHelper instance."""
307+
return HistoryHelper(decision_task, client)
308+
309+
310+
# Utility functions
311+
def is_decision_event(event: HistoryEvent) -> bool:
312+
"""Check if an event is a decision-related event."""
313+
return (DecisionEventsIterator._is_decision_task_started(event) or
314+
DecisionEventsIterator._is_decision_task_completed(event) or
315+
DecisionEventsIterator._is_decision_task_failed(event) or
316+
DecisionEventsIterator._is_decision_task_timed_out(event))
317+
318+
319+
def is_marker_event(event: HistoryEvent) -> bool:
320+
"""Check if an event is a marker event."""
321+
return DecisionEventsIterator._is_marker_recorded(event)
322+
323+
324+
def extract_event_timestamp_millis(event: HistoryEvent) -> Optional[int]:
325+
"""Extract timestamp from an event in milliseconds."""
326+
if hasattr(event, 'event_time') and event.HasField('event_time'):
327+
seconds = getattr(event.event_time, 'seconds', 0)
328+
return seconds * 1000 if seconds > 0 else None
329+
return None

0 commit comments

Comments
 (0)