Skip to content

Commit 5c9614f

Browse files
committed
Merge branch 'feature/mcp-server-2' into 'develop'
Optimized AgentCore Analytics Processor See merge request genaiic-reusable-assets/engagement-artifacts/genaiic-idp-accelerator!454
2 parents f6bcb61 + 6ab5bdb commit 5c9614f

File tree

1 file changed

+47
-98
lines changed
  • src/lambda/agentcore_analytics_processor

1 file changed

+47
-98
lines changed

src/lambda/agentcore_analytics_processor/index.py

Lines changed: 47 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -8,130 +8,79 @@
88

99
import json
1010
import logging
11-
import os
11+
import time
1212
from typing import Any, Dict
1313

1414
import boto3
15+
from botocore.exceptions import ClientError, EventStreamError
1516

16-
from idp_common.agents.analytics import get_analytics_config
17+
from idp_common.agents.analytics.config import get_analytics_config
1718
from idp_common.agents.common.config import configure_logging
18-
from idp_common.agents.factory import agent_factory
19+
from idp_common.agents.analytics.agent import create_analytics_agent
1920

2021
# Configure logging for both application and Strands framework
2122
configure_logging()
2223

2324
# Get logger for this module
2425
logger = logging.getLogger(__name__)
2526

26-
# Track Lambda cold/warm starts for debugging
27-
_lambda_invocation_count = 0
27+
# Cache at module level
28+
_session = None
29+
_config = None
2830

2931

3032
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
31-
"""Process analytics queries through agent orchestrator."""
32-
global _lambda_invocation_count
33-
_lambda_invocation_count += 1
34-
is_cold_start = _lambda_invocation_count == 1
33+
"""Process analytics queries through agent."""
34+
global _session, _config
3535

36-
request_id = context.aws_request_id if context else 'unknown'
37-
logger.info(f"[{request_id}] Lambda invocation #{_lambda_invocation_count} ({'COLD START' if is_cold_start else 'WARM'})")
38-
logger.info(f"[{request_id}] Received agentcore analytics event: {json.dumps(event, default=str)}")
36+
query = event.get('query')
37+
if not query:
38+
return {
39+
'statusCode': 200,
40+
'body': json.dumps({'query': '', 'result': 'No query provided'})
41+
}
3942

4043
try:
41-
# Create fresh AWS session for each invocation
42-
session = boto3.Session()
43-
logger.info(f"[{request_id}] Created fresh boto3 session")
44-
45-
# Extract query from event
46-
query = event.get('query', 'Unknown')
47-
logger.info(f"[{request_id}] Received query from gateway: {query}")
48-
49-
if not query or query == 'Unknown':
50-
logger.warning(f"[{request_id}] No query provided in event")
51-
return {
52-
'statusCode': 200,
53-
'body': json.dumps({
54-
'query': query,
55-
'result': 'No query provided'
56-
})
57-
}
58-
59-
# Get analytics configuration
60-
config = get_analytics_config()
61-
logger.info(f"[{request_id}] Configuration loaded successfully")
44+
# Reuse session and config across warm starts
45+
if _session is None:
46+
_session = boto3.Session()
47+
if _config is None:
48+
_config = get_analytics_config()
6249

63-
# Create analytics agent using factory
64-
analytics_agents = agent_factory.list_available_agents()
65-
analytics_agent_ids = [agent["agent_id"] for agent in analytics_agents if "analytics" in agent["agent_id"].lower()]
66-
67-
if not analytics_agent_ids:
68-
logger.warning(f"[{request_id}] No analytics agents found")
69-
return {
70-
'statusCode': 200,
71-
'body': json.dumps({
72-
'query': query,
73-
'result': 'No analytics agents available'
74-
})
75-
}
76-
77-
logger.info(f"[{request_id}] Using analytics agents: {analytics_agent_ids}")
78-
79-
# Create orchestrator with analytics agents
80-
orchestrator = agent_factory.create_conversational_orchestrator(
81-
agent_ids=analytics_agent_ids,
82-
session_id=request_id,
83-
config=config,
84-
session=session
85-
)
86-
logger.info(f"[{request_id}] Analytics orchestrator created")
50+
# Create analytics agent directly
51+
agent = create_analytics_agent(config=_config, session=_session)
8752

8853
try:
89-
# Process query through orchestrator
90-
logger.info(f"[{request_id}] Processing query through orchestrator")
91-
92-
# Try different methods based on what's available
93-
if hasattr(orchestrator, 'invoke'):
94-
result = orchestrator.invoke(query)
95-
elif hasattr(orchestrator, '__call__'):
96-
result = orchestrator(query)
97-
else:
98-
# Log available methods for debugging
99-
available_methods = [method for method in dir(orchestrator) if not method.startswith('_') and callable(getattr(orchestrator, method))]
100-
logger.error(f"[{request_id}] No suitable method found. Available methods: {available_methods}")
101-
raise AttributeError(f"Orchestrator has no invoke method. Available: {available_methods}")
102-
103-
logger.info(f"[{request_id}] Query processing completed")
104-
105-
# Format response
106-
response = {
54+
start_time = time.time()
55+
logger.info(f"Query: {query}")
56+
result = agent(query)
57+
elapsed = time.time() - start_time
58+
logger.info(f"Query completed in {elapsed:.2f}s")
59+
return {
10760
'statusCode': 200,
108-
'body': json.dumps({
109-
'query': query,
110-
'result': str(result)
111-
})
61+
'body': json.dumps({'query': query, 'result': str(result)})
11262
}
113-
114-
logger.info(f"[{request_id}] Returning successful response")
115-
logger.debug(f"[{request_id}] Response: {response}")
116-
return response
117-
11863
finally:
119-
# Clean up orchestrator resources
120-
try:
121-
if hasattr(orchestrator, '__exit__'):
122-
orchestrator.__exit__(None, None, None)
123-
elif hasattr(orchestrator, 'close'):
124-
orchestrator.close()
125-
logger.info(f"[{request_id}] Orchestrator resources cleaned up")
126-
except Exception as e:
127-
logger.warning(f"[{request_id}] Error cleaning up orchestrator: {e}")
64+
if hasattr(agent, '__exit__'):
65+
agent.__exit__(None, None, None)
66+
67+
except (EventStreamError, ClientError) as e:
68+
error_str = str(e).lower()
69+
error_code = getattr(e, 'response', {}).get('Error', {}).get('Code', '')
70+
71+
if 'unavailable' in error_str or error_code in ['ThrottlingException', 'ServiceUnavailable']:
72+
message = 'Service temporarily unavailable due to high demand. Please retry in a moment.'
73+
else:
74+
logger.error(f"AWS error: {e}")
75+
message = f'Error: {str(e)}'
12876

77+
return {
78+
'statusCode': 200,
79+
'body': json.dumps({'query': query, 'result': message})
80+
}
12981
except Exception as e:
130-
logger.error(f"[{request_id}] Error in agentcore analytics processor: {str(e)}")
82+
logger.error(f"Error: {e}")
13183
return {
13284
'statusCode': 200,
133-
'body': json.dumps({
134-
'query': event.get('query', 'Unknown'),
135-
'result': f'Error: {str(e)}'
136-
})
85+
'body': json.dumps({'query': query, 'result': 'An error occurred processing your request. Please try again.'})
13786
}

0 commit comments

Comments
 (0)