Skip to content

Commit 1282662

Browse files
Enhance Workflow Model with Helper Methods (#335)
1 parent 58b9b60 commit 1282662

File tree

9 files changed

+1143
-25
lines changed

9 files changed

+1143
-25
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Conductor OSS Python SDK
1+
<# Conductor OSS Python SDK
22
[![CI Status](https://github.com/conductor-oss/python-sdk/actions/workflows/pull_request.yml/badge.svg)](https://github.com/conductor-oss/python-sdk/actions/workflows/pull_request.yml)
33
[![codecov](https://codecov.io/gh/conductor-oss/python-sdk/branch/main/graph/badge.svg?token=K10D161X4R)](https://codecov.io/gh/conductor-oss/python-sdk)
44

Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
# Workflow Helper Methods Usage Examples
2+
3+
This document provides comprehensive usage examples for the helper methods available in the `Workflow` class.
4+
5+
## Overview
6+
7+
The `Workflow` class provides several helper methods to make it easier to work with workflow instances:
8+
9+
- **Status checking methods**: `is_completed()`, `is_successful()`, `is_running()`, `is_failed()`
10+
- **Task retrieval methods**: `current_task`, `get_in_progress_tasks()`, `get_task_by_reference_name()`
11+
12+
## Status Checking Methods
13+
14+
### `is_completed()`
15+
16+
Checks if the workflow has completed (regardless of success or failure).
17+
18+
```python
19+
from conductor.client.http.models.workflow import Workflow
20+
21+
# Example workflow instances
22+
workflow_completed = Workflow(status="COMPLETED")
23+
workflow_failed = Workflow(status="FAILED")
24+
workflow_terminated = Workflow(status="TERMINATED")
25+
workflow_running = Workflow(status="RUNNING")
26+
27+
# Check completion status
28+
print(workflow_completed.is_completed()) # True
29+
print(workflow_failed.is_completed()) # True
30+
print(workflow_terminated.is_completed()) # True
31+
print(workflow_running.is_completed()) # False
32+
```
33+
34+
### `is_successful()`
35+
36+
Checks if the workflow completed successfully.
37+
38+
```python
39+
# Check success status
40+
print(workflow_completed.is_successful()) # True
41+
print(workflow_failed.is_successful()) # False
42+
print(workflow_terminated.is_successful()) # False
43+
print(workflow_running.is_successful()) # False
44+
```
45+
46+
### `is_running()`
47+
48+
Checks if the workflow is currently running.
49+
50+
```python
51+
workflow_paused = Workflow(status="PAUSED")
52+
53+
# Check running status
54+
print(workflow_running.is_running()) # True
55+
print(workflow_paused.is_running()) # True
56+
print(workflow_completed.is_running()) # False
57+
```
58+
59+
### `is_failed()`
60+
61+
Checks if the workflow has failed.
62+
63+
```python
64+
workflow_timed_out = Workflow(status="TIMED_OUT")
65+
66+
# Check failure status
67+
print(workflow_failed.is_failed()) # True
68+
print(workflow_terminated.is_failed()) # True
69+
print(workflow_timed_out.is_failed()) # True
70+
print(workflow_completed.is_failed()) # False
71+
```
72+
73+
## Task Retrieval Methods
74+
75+
### `current_task` Property
76+
77+
Gets the currently in-progress task (SCHEDULED or IN_PROGRESS).
78+
79+
```python
80+
from conductor.client.http.models.task import Task
81+
from conductor.client.http.models.workflow_task import WorkflowTask
82+
83+
# Create mock tasks
84+
task_completed = TaskA(status="COMPLETED", task_def_name="task1")
85+
task_in_progress = Task(status="IN_PROGRESS", task_def_name="task2")
86+
task_scheduled = Task(status="SCHEDULED", task_def_name="task3")
87+
88+
# Set up workflow with tasks
89+
workflow = Workflow(
90+
status="RUNNING",
91+
tasks=[task_completed, task_in_progress, task_scheduled]
92+
)
93+
94+
# Get current task
95+
current = workflow.current_task
96+
print(current.task_def_name) # "task2" (first IN_PROGRESS task)
97+
98+
# If no in-progress tasks
99+
workflow_no_progress = Workflow(
100+
status="RUNNING",
101+
tasks=[task_completed]
102+
)
103+
print(workflow_no_progress.current_task) # None
104+
```
105+
106+
### `get_in_progress_tasks()`
107+
108+
Gets all currently in-progress tasks.
109+
110+
```python
111+
# Get all in-progress tasks
112+
in_progress_tasks = workflow.get_in_progress_tasks()
113+
print(len(in_progress_tasks)) # 2 (task_in_progress and task_scheduled)
114+
115+
# Check specific tasks
116+
for task in in_progress_tasks:
117+
print(f"Task: {task.task_def_name}, Status: {task.status}")
118+
```
119+
120+
### `get_task_by_reference_name()`
121+
122+
Gets a task by its reference name.
123+
124+
```python
125+
# Create tasks with workflow task references
126+
workflow_task1 = WorkflowTaskAdapter(task_reference_name="ref_task_1")
127+
workflow_task2 = WorkflowTaskAdapter(task_reference_name="ref_task_2")
128+
129+
task1 = TaskAdapter(
130+
status="COMPLETED",
131+
task_def_name="task1",
132+
workflow_task=workflow_task1
133+
)
134+
task2 = TaskAdapter(
135+
status="IN_PROGRESS",
136+
task_def_name="task2",
137+
workflow_task=workflow_task2
138+
)
139+
140+
workflow_with_refs = Workflow(
141+
status="RUNNING",
142+
tasks=[task1, task2]
143+
)
144+
145+
# Get task by reference name
146+
found_task = workflow_with_refs.get_task_by_reference_name("ref_task_2")
147+
if found_task:
148+
print(f"Found task: {found_task.task_def_name}") # "task2"
149+
150+
# Task not found
151+
not_found = workflow_with_refs.get_task_by_reference_name("nonexistent_ref")
152+
print(not_found) # None
153+
```
154+
155+
## Real-World Usage Examples
156+
157+
### Example 1: Workflow Status Monitoring
158+
159+
```python
160+
def monitor_workflow_status(workflow: Workflow):
161+
"""Monitor and report workflow status"""
162+
163+
if workflow.is_completed():
164+
if workflow.is_successful():
165+
print("Workflow completed successfully!")
166+
return "SUCCESS"
167+
else:
168+
print("Workflow failed or was terminated")
169+
return "FAILED"
170+
elif workflow.is_running():
171+
print("Workflow is still running...")
172+
173+
# Check current task
174+
current = workflow.current_task
175+
if current:
176+
print(f"Current task: {current.task_def_name} ({current.status})")
177+
178+
# Get all in-progress tasks
179+
in_progress = workflow.get_in_progress_tasks()
180+
print(f"Total in-progress tasks: {len(in_progress)}")
181+
182+
return "RUNNING"
183+
else:
184+
print("Unknown workflow status")
185+
return "UNKNOWN"
186+
187+
# Usage
188+
workflow = Workflow(status="RUNNING", tasks=[...])
189+
status = monitor_workflow_status(workflow)
190+
```
191+
192+
### Example 2: Task Progress Tracking
193+
194+
```python
195+
def track_task_progress(workflow: Workflow):
196+
"""Track progress of specific tasks in a workflow"""
197+
198+
# Get all in-progress tasks
199+
in_progress_tasks = workflow.get_in_progress_tasks()
200+
201+
print(f"Workflow Status: {workflow.status}")
202+
print(f"Total in-progress tasks: {len(in_progress_tasks)}")
203+
204+
for task in in_progress_tasks:
205+
print(f"- {task.task_def_name}: {task.status}")
206+
207+
# If task has a reference name, show it
208+
if hasattr(task, 'workflow_task') and task.workflow_task:
209+
ref_name = getattr(task.workflow_task, 'task_reference_name', 'N/A')
210+
print(f" Reference: {ref_name}")
211+
212+
# Usage
213+
workflow = Workflow(status="RUNNING", tasks=[...])
214+
track_task_progress(workflow)
215+
```
216+
217+
### Example 3: Workflow Result Processing
218+
219+
```python
220+
def process_workflow_result(workflow: Workflow):
221+
"""Process workflow results based on completion status"""
222+
223+
if not workflow.is_completed():
224+
print("Workflow is not yet completed")
225+
return None
226+
227+
if workflow.is_successful():
228+
print("Processing successful workflow result...")
229+
230+
# Get workflow output
231+
if workflow.output:
232+
print(f"Workflow output: {workflow.output}")
233+
234+
# Find specific tasks by reference name
235+
result_task = workflow.get_task_by_reference_name("process_result")
236+
if result_task:
237+
print(f"Result task status: {result_task.status}")
238+
if hasattr(result_task, 'output_data') and result_task.output_data:
239+
print(f"Task output: {result_task.output_data}")
240+
241+
return workflow.output
242+
243+
else:
244+
print("Processing failed workflow...")
245+
246+
# Get failed tasks
247+
failed_tasks = [task for task in workflow.tasks if task.status == "FAILED"]
248+
print(f"Number of failed tasks: {len(failed_tasks)}")
249+
250+
for task in failed_tasks:
251+
print(f"Failed task: {task.task_def_name}")
252+
if hasattr(task, 'reason_for_incompletion'):
253+
print(f"Reason: {task.reason_for_incompletion}")
254+
255+
return None
256+
257+
# Usage
258+
workflow = Workflow(status="COMPLETED", output={"result": "success"})
259+
result = process_workflow_result(workflow)
260+
```
261+
262+
### Example 4: Workflow Health Check
263+
264+
```python
265+
def health_check_workflow(workflow: Workflow) -> dict:
266+
"""Perform a comprehensive health check on a workflow"""
267+
268+
health_status = {
269+
"workflow_id": getattr(workflow, 'workflow_id', 'unknown'),
270+
"status": workflow.status,
271+
"is_completed": workflow.is_completed(),
272+
"is_successful": workflow.is_successful(),
273+
"is_running": workflow.is_running(),
274+
"is_failed": workflow.is_failed(),
275+
"current_task": None,
276+
"in_progress_count": 0,
277+
"total_tasks": 0
278+
}
279+
280+
# Task information
281+
if workflow.tasks:
282+
health_status["total_tasks"] = len(workflow.tasks)
283+
health_status["in_progress_count"] = len(workflow.get_in_progress_tasks())
284+
285+
current = workflow.current_task
286+
if current:
287+
health_status["current_task"] = {
288+
"name": current.task_def_name,
289+
"status": current.status
290+
}
291+
292+
# Overall health assessment
293+
if workflow.is_successful():
294+
health_status["health"] = "HEALTHY"
295+
elif workflow.is_failed():
296+
health_status["health"] = "UNHEALTHY"
297+
elif workflow.is_running():
298+
health_status["health"] = "IN_PROGRESS"
299+
else:
300+
health_status["health"] = "UNKNOWN"
301+
302+
return health_status
303+
304+
# Usage
305+
workflow = Workflow(status="RUNNING", tasks=[...])
306+
health = health_check_workflow(workflow)
307+
print(f"Workflow health: {health['health']}")
308+
```
309+
310+
## Async Client Usage
311+
312+
The async client (`conductor.asyncio_client.adapters.models.workflow_adapter.WorkflowAdapter`) provides the same helper methods:
313+
314+
```python
315+
from conductor.asyncio_client.adapters.models.workflow_adapter import WorkflowAdapter
316+
317+
# All the same methods are available
318+
workflow = WorkflowAdapter(status="RUNNING")
319+
print(workflow.is_running()) # True
320+
print(workflow.current_task) # None or current task
321+
```
322+
323+
## Best Practices
324+
325+
1. **Always check for None**: When using `current_task` or `get_task_by_reference_name()`, always check if the result is None.
326+
327+
2. **Use appropriate status methods**: Use `is_completed()` for general completion, `is_successful()` for success checking, and `is_failed()` for failure detection.
328+
329+
3. **Handle missing tasks gracefully**: Always check if `workflow.tasks` is not None before calling task-related methods.
330+
331+
4. **Use reference names for task identification**: When possible, use `get_task_by_reference_name()` instead of iterating through tasks manually.
332+
333+
5. **Combine methods for comprehensive checks**: Use multiple helper methods together to get a complete picture of workflow state.
334+
335+
```python
336+
def comprehensive_workflow_check(workflow: WorkflowAdapter):
337+
"""Comprehensive workflow state checking"""
338+
339+
if workflow.is_completed():
340+
if workflow.is_successful():
341+
return "SUCCESS"
342+
elif workflow.is_failed():
343+
return "FAILED"
344+
else:
345+
return "COMPLETED_UNKNOWN"
346+
elif workflow.is_running():
347+
current = workflow.current_task
348+
if current:
349+
return f"RUNNING_CURRENT_TASK_{current.task_def_name}"
350+
else:
351+
return "RUNNING_NO_CURRENT_TASK"
352+
else:
353+
return "UNKNOWN_STATUS"
354+
```

0 commit comments

Comments
 (0)