Skip to content

Commit 4137929

Browse files
committed
Updated setp functions tool
1 parent 62bc837 commit 4137929

File tree

5 files changed

+187
-91
lines changed

5 files changed

+187
-91
lines changed

lib/idp_common_pkg/idp_common/agents/error_analyzer/agent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515

1616
from ..common.strands_bedrock_model import create_strands_bedrock_model
1717
from .tools import (
18+
analyze_workflow_execution,
1819
fetch_document_record,
1920
fetch_recent_records,
2021
lambda_lookup,
2122
search_cloudwatch_logs,
22-
stepfunction_details,
2323
xray_performance_analysis,
2424
xray_trace,
2525
)
@@ -53,7 +53,7 @@ def create_error_analyzer_agent(
5353
fetch_document_record,
5454
fetch_recent_records,
5555
lambda_lookup,
56-
stepfunction_details,
56+
analyze_workflow_execution,
5757
xray_trace,
5858
xray_performance_analysis,
5959
]

lib/idp_common_pkg/idp_common/agents/error_analyzer/tools/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
fetch_recent_records,
2020
)
2121
from .lambda_tool import lambda_lookup
22-
from .stepfunction_tool import stepfunction_details
22+
from .stepfunction_tool import analyze_workflow_execution
2323
from .xray_tool import (
2424
xray_performance_analysis,
2525
xray_trace,
@@ -30,7 +30,7 @@
3030
"lambda_lookup",
3131
"fetch_document_record",
3232
"fetch_recent_records",
33-
"stepfunction_details",
33+
"analyze_workflow_execution",
3434
"xray_trace",
3535
"xray_performance_analysis",
3636
]

lib/idp_common_pkg/idp_common/agents/error_analyzer/tools/stepfunction_tool.py

Lines changed: 175 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,184 @@
1313

1414
from idp_common.config import get_config
1515

16-
from ..config import (
17-
create_error_response,
18-
create_response,
19-
)
20-
2116
logger = logging.getLogger(__name__)
2217

2318

19+
@tool
20+
def analyze_workflow_execution(execution_arn: str) -> Dict[str, Any]:
21+
"""
22+
Analyze Step Function workflow execution to identify failures and state transitions.
23+
24+
Performs comprehensive analysis of document processing workflow executions by
25+
retrieving execution history, analyzing state transitions, identifying failure
26+
points, and providing actionable recommendations. Essential for troubleshooting
27+
document processing failures and understanding workflow behavior.
28+
29+
Use this tool when:
30+
- Document processing failed and you have a Step Function execution ARN
31+
- Need to understand where in the workflow a failure occurred
32+
- Investigating workflow performance or timeout issues
33+
- Analyzing state transitions and execution timeline
34+
- User reports document processing stuck or failed
35+
36+
Tool chaining: Get execution ARN from fetch_document_record, then use this tool
37+
for detailed workflow analysis. Follow up with search_cloudwatch_logs for
38+
specific Lambda function errors identified in the failure analysis.
39+
40+
Example usage:
41+
- "Analyze the workflow execution for this document"
42+
- "What went wrong in the Step Function execution?"
43+
- "Show me the workflow timeline and failure point"
44+
- "Why did the document processing workflow fail?"
45+
- "Trace the execution flow and identify issues"
46+
47+
Args:
48+
execution_arn: Step Function execution ARN (get from document record's WorkflowExecutionArn or ExecutionArn field)
49+
50+
Returns:
51+
Dict with keys:
52+
- execution_status (str): Overall execution status (SUCCEEDED, FAILED, TIMED_OUT, etc.)
53+
- duration_seconds (float): Total execution duration if completed
54+
- timeline_analysis (dict): Detailed timeline with state transitions and failure point
55+
- analysis_summary (str): Human-readable summary of execution and failure
56+
- recommendations (list): Actionable next steps for investigation
57+
"""
58+
try:
59+
if not execution_arn:
60+
return _build_response(
61+
execution_status="ERROR",
62+
analysis_summary="No execution ARN provided",
63+
recommendations=[
64+
"Use search_cloudwatch_logs for detailed error information"
65+
],
66+
)
67+
68+
# Get execution data from Step Functions
69+
execution_data = _get_execution_data(execution_arn)
70+
71+
# Analyze timeline and failures
72+
timeline_analysis = _analyze_execution_timeline(execution_data["events"])
73+
74+
# Extract execution metadata
75+
execution_metadata = _extract_execution_metadata(
76+
execution_data["execution_response"]
77+
)
78+
79+
# Build analysis summary
80+
analysis_summary = _build_analysis_summary(
81+
execution_metadata["status"], timeline_analysis
82+
)
83+
84+
# Generate recommendations
85+
recommendations = _generate_recommendations(timeline_analysis)
86+
87+
return _build_response(
88+
execution_status=execution_metadata["status"],
89+
duration_seconds=execution_metadata["duration_seconds"],
90+
timeline_analysis=timeline_analysis,
91+
analysis_summary=analysis_summary,
92+
recommendations=recommendations,
93+
)
94+
95+
except Exception as e:
96+
logger.error(f"Error analyzing Step Function execution {execution_arn}: {e}")
97+
return _build_response(
98+
execution_status="ERROR",
99+
analysis_summary=f"Failed to analyze workflow execution: {str(e)}",
100+
recommendations=[
101+
"Use search_cloudwatch_logs for detailed error information"
102+
],
103+
)
104+
105+
106+
def _get_execution_data(execution_arn: str) -> Dict[str, Any]:
107+
"""
108+
Retrieve execution details and history from Step Functions.
109+
"""
110+
stepfunctions_client = boto3.client("stepfunctions")
111+
112+
execution_response = stepfunctions_client.describe_execution(
113+
executionArn=execution_arn
114+
)
115+
116+
history_response = stepfunctions_client.get_execution_history(
117+
executionArn=execution_arn,
118+
maxResults=100,
119+
reverseOrder=True, # Most recent events first
120+
)
121+
122+
return {
123+
"execution_response": execution_response,
124+
"events": history_response.get("events", []),
125+
}
126+
127+
128+
def _extract_execution_metadata(execution_response: Dict[str, Any]) -> Dict[str, Any]:
129+
"""
130+
Extract execution metadata including status and duration.
131+
"""
132+
execution_status = execution_response.get("status", "UNKNOWN")
133+
start_date = execution_response.get("startDate")
134+
stop_date = execution_response.get("stopDate")
135+
136+
duration_seconds = None
137+
if start_date and stop_date:
138+
duration_seconds = (stop_date - start_date).total_seconds()
139+
140+
return {"status": execution_status, "duration_seconds": duration_seconds}
141+
142+
143+
def _build_analysis_summary(
144+
execution_status: str, timeline_analysis: Dict[str, Any]
145+
) -> str:
146+
"""
147+
Build human-readable analysis summary.
148+
"""
149+
analysis_summary = f"Step Function execution {execution_status}"
150+
151+
if timeline_analysis.get("failure_point"):
152+
failure_point = timeline_analysis["failure_point"]
153+
analysis_summary += f" at state '{failure_point.get('state', 'Unknown')}'"
154+
if failure_point.get("details", {}).get("error"):
155+
analysis_summary += f": {failure_point['details']['error']}"
156+
157+
return analysis_summary
158+
159+
160+
def _generate_recommendations(timeline_analysis: Dict[str, Any]) -> List[str]:
161+
"""
162+
Generate actionable recommendations based on analysis.
163+
"""
164+
return [
165+
"Check the failure point state for specific error details",
166+
"Review Lambda function logs if failure occurred in Lambda task",
167+
"Verify input data format if failure occurred early in workflow",
168+
"Consider timeout adjustments if execution timed out",
169+
]
170+
171+
172+
def _build_response(
173+
execution_status: str,
174+
duration_seconds: Optional[float] = None,
175+
timeline_analysis: Optional[Dict[str, Any]] = None,
176+
analysis_summary: str = "",
177+
recommendations: Optional[List[str]] = None,
178+
) -> Dict[str, Any]:
179+
"""
180+
Build unified workflow analysis response with logging.
181+
"""
182+
response = {
183+
"execution_status": execution_status,
184+
"duration_seconds": duration_seconds,
185+
"timeline_analysis": timeline_analysis or {},
186+
"analysis_summary": analysis_summary,
187+
"recommendations": recommendations or [],
188+
}
189+
190+
logger.info(f"Workflow analysis response: {response}")
191+
return response
192+
193+
24194
def _extract_failure_details(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
25195
"""
26196
Failure parser: Extracts detailed error information from Step Function events.
@@ -152,77 +322,3 @@ def _analyze_execution_timeline(events: List[Dict[str, Any]]) -> Dict[str, Any]:
152322
"failure_point": failure_point,
153323
"last_successful_state": last_successful_state,
154324
}
155-
156-
157-
@tool
158-
def stepfunction_details(execution_arn: str) -> Dict[str, Any]:
159-
"""
160-
Analyze Step Function execution to identify workflow failures and state transitions.
161-
Retrieves execution history and performs comprehensive analysis to identify failure points,
162-
state transitions, and execution patterns for document processing workflows.
163-
164-
Args:
165-
execution_arn: Step Function execution ARN from document context
166-
167-
Returns:
168-
Dict containing execution analysis, timeline, and failure details
169-
"""
170-
try:
171-
if not execution_arn:
172-
return create_error_response("No execution ARN provided")
173-
174-
stepfunctions_client = boto3.client("stepfunctions")
175-
176-
# Get execution details
177-
execution_response = stepfunctions_client.describe_execution(
178-
executionArn=execution_arn
179-
)
180-
181-
# Get execution history
182-
history_response = stepfunctions_client.get_execution_history(
183-
executionArn=execution_arn,
184-
maxResults=100,
185-
reverseOrder=True, # Most recent events first
186-
)
187-
188-
events = history_response.get("events", [])
189-
190-
# Analyze timeline and failures
191-
timeline_analysis = _analyze_execution_timeline(events)
192-
193-
# Extract execution metadata
194-
execution_status = execution_response.get("status", "UNKNOWN")
195-
start_date = execution_response.get("startDate")
196-
stop_date = execution_response.get("stopDate")
197-
198-
# Calculate execution duration
199-
duration_seconds = None
200-
if start_date and stop_date:
201-
duration_seconds = (stop_date - start_date).total_seconds()
202-
203-
# Build analysis summary
204-
analysis_summary = f"Step Function execution {execution_status}"
205-
if timeline_analysis.get("failure_point"):
206-
failure_point = timeline_analysis["failure_point"]
207-
analysis_summary += f" at state '{failure_point.get('state', 'Unknown')}'"
208-
if failure_point.get("details", {}).get("error"):
209-
analysis_summary += f": {failure_point['details']['error']}"
210-
211-
return create_response(
212-
{
213-
"execution_status": execution_status,
214-
"duration_seconds": duration_seconds,
215-
"timeline_analysis": timeline_analysis,
216-
"analysis_summary": analysis_summary,
217-
"recommendations": [
218-
"Check the failure point state for specific error details",
219-
"Review Lambda function logs if failure occurred in Lambda task",
220-
"Verify input data format if failure occurred early in workflow",
221-
"Consider timeout adjustments if execution timed out",
222-
],
223-
}
224-
)
225-
226-
except Exception as e:
227-
logger.error(f"Error analyzing Step Function execution {execution_arn}: {e}")
228-
return create_error_response(str(e))

lib/idp_common_pkg/tests/unit/agents/error_analyzer/test_agent.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,14 @@ def test_agent_system_prompt_format(
102102
def test_specific_tools_import(self):
103103
"""Test that specific tools can be imported correctly."""
104104
from idp_common.agents.error_analyzer.tools import (
105-
search_cloudwatch_logs,
105+
analyze_workflow_execution,
106106
fetch_document_record,
107-
xray_trace,
107+
search_cloudwatch_logs,
108108
)
109109

110110
assert search_cloudwatch_logs is not None
111111
assert callable(search_cloudwatch_logs)
112112
assert fetch_document_record is not None
113113
assert callable(fetch_document_record)
114-
assert xray_trace is not None
115-
assert callable(xray_trace)
114+
assert analyze_workflow_execution is not None
115+
assert callable(analyze_workflow_execution)

lib/idp_common_pkg/tests/unit/agents/error_analyzer/test_tools.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ def test_dynamodb_tools_import(self):
3636
def test_execution_context_tools_import(self):
3737
"""Test execution context tools can be imported."""
3838
from idp_common.agents.error_analyzer.tools import (
39+
analyze_workflow_execution,
3940
lambda_lookup,
40-
stepfunction_details,
4141
)
4242

4343
assert lambda_lookup is not None
4444
assert callable(lambda_lookup)
45-
assert stepfunction_details is not None
46-
assert callable(stepfunction_details)
45+
assert analyze_workflow_execution is not None
46+
assert callable(analyze_workflow_execution)
4747

4848
def test_xray_tools_import(self):
4949
"""Test X-Ray tools can be imported."""
@@ -66,7 +66,7 @@ def test_all_tools_available(self):
6666
"fetch_document_record",
6767
"fetch_recent_records",
6868
"lambda_lookup",
69-
"stepfunction_details",
69+
"analyze_workflow_execution",
7070
"xray_trace",
7171
"xray_performance_analysis",
7272
}

0 commit comments

Comments
 (0)