diff --git a/examples/demos/procurement_agent/evals/README.md b/examples/demos/procurement_agent/evals/README.md
new file mode 100644
index 000000000..ddb96573c
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/README.md
@@ -0,0 +1,63 @@
+# Procurement Agent Evals
+
+Integration tests for the procurement agent that verify tool calls and database state.
+
+## Prerequisites
+
+1. AgentEx backend running (`make dev` from scale-agentex)
+2. Procurement agent running:
+ ```bash
+ cd examples/demos/procurement_agent
+ export ENVIRONMENT=development
+ uv run agentex agents run --manifest manifest.yaml
+ ```
+
+## Running Tests
+
+From the `procurement_agent` directory:
+
+```bash
+# Run all tests
+cd evals && uv run pytest
+
+# Run specific test file
+cd evals && uv run pytest tasks/test_shipment_departed.py -v
+
+# Run single test
+cd evals && uv run pytest tasks/test_shipment_departed.py::test_departed_01_no_flag_5_days_early -v
+```
+
+## Test Structure
+
+| File | Event Type | Focus |
+|------|------------|-------|
+| `test_submittal_approved.py` | Submittal_Approved | PO issued, DB entry |
+| `test_shipment_departed.py` | Shipment_Departed | **False positive detection** |
+| `test_shipment_arrived.py` | Shipment_Arrived | Team notification, inspection |
+| `test_inspection_failed.py` | Inspection_Failed | Human-in-the-loop |
+| `test_inspection_passed.py` | Inspection_Passed | Status update |
+
+## Test Cases Summary
+
+| Event | Tests | Key Assertions |
+|-------|-------|----------------|
+| Submittal_Approved | 2 | `issue_purchase_order` called, DB item created |
+| Shipment_Departed | 6 | Forbidden: `flag_potential_issue` when ETA < required_by |
+| Shipment_Arrived | 2 | `notify_team_shipment_arrived`, `schedule_inspection` called |
+| Inspection_Failed | 3 | Human-in-loop: approve, approve+extra, reject+delete |
+| Inspection_Passed | 2 | Forbidden: `wait_for_human`, `flag_potential_issue` |
+
+## Graders
+
+- **tool_calls.py**: Verifies required and forbidden tool calls in transcripts
+- **database.py**: Verifies database state changes
+
+## False Positive Detection
+
+The `test_shipment_departed.py` tests are specifically designed to catch the false positive issue where the agent incorrectly flags conflicts.
+
+**Conflict logic:**
+- **Flag if** ETA >= required_by (zero/negative buffer)
+- **Don't flag if** ETA < required_by (has buffer remaining)
+
+The tests use `assert_forbidden_tools(["flag_potential_issue"])` to catch cases where the agent incorrectly escalates.
diff --git a/examples/demos/procurement_agent/evals/__init__.py b/examples/demos/procurement_agent/evals/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/demos/procurement_agent/evals/conftest.py b/examples/demos/procurement_agent/evals/conftest.py
new file mode 100644
index 000000000..28e299b45
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/conftest.py
@@ -0,0 +1,227 @@
+"""
+Pytest fixtures for procurement agent evals.
+
+Provides workflow setup, transcript extraction, and human input simulation.
+"""
+from __future__ import annotations
+
+import os
+import uuid
+import asyncio
+from typing import Any, AsyncGenerator
+from datetime import datetime as dt
+
+import pytest
+import pytest_asyncio
+from temporalio.client import Client, WorkflowHandle
+
+from agentex.types.task import Task
+from agentex.types.agent import Agent
+from agentex.lib.types.acp import CreateTaskParams
+
+# Local-development defaults. setdefault never overwrites, so any value that
+# is already exported in the environment takes precedence over these.
+for _env_name, _env_default in (
+    ("AGENT_NAME", "procurement-agent"),
+    ("ACP_URL", "http://localhost:8000"),
+    ("WORKFLOW_NAME", "procurement-agent"),
+    ("WORKFLOW_TASK_QUEUE", "procurement_agent_queue"),
+    ("TEMPORAL_ADDRESS", "localhost:7233"),
+):
+    os.environ.setdefault(_env_name, _env_default)
+
+
+@pytest.fixture(scope="session")
+def event_loop():
+    """
+    Provide a single event loop for the whole test session.
+
+    The fixture is session-scoped, presumably so that session-scoped async
+    fixtures in this file can share one loop across tests.
+
+    NOTE(review): recent pytest-asyncio releases deprecate overriding the
+    ``event_loop`` fixture - confirm against the pinned plugin version.
+
+    Yields:
+        The newly created event loop; it is closed at teardown.
+    """
+    # asyncio.new_event_loop() obtains the loop from the current policy, so it
+    # is equivalent to the previous get_event_loop_policy().new_event_loop()
+    # call without touching the policy API (deprecated as of Python 3.14).
+    loop = asyncio.new_event_loop()
+    try:
+        yield loop
+    finally:
+        # Close the loop even if the generator is torn down abnormally.
+        loop.close()
+
+
+@pytest_asyncio.fixture(scope="session")
+async def temporal_client() -> AsyncGenerator[Client, None]:
+    """
+    Connect to Temporal once and share the client for the test session.
+
+    The server address is read from ``TEMPORAL_ADDRESS`` and falls back to
+    ``localhost:7233``.
+
+    Yields:
+        The connected Temporal client.
+    """
+    address = os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")
+    connected = await Client.connect(address)
+    # No teardown step: the client is not explicitly closed.
+    yield connected
+
+
+@pytest_asyncio.fixture
+async def workflow_handle(temporal_client: Client) -> AsyncGenerator[WorkflowHandle, None]:
+    """
+    Launch a dedicated procurement-agent workflow for a single test.
+
+    A fresh ``eval-<uuid4>`` workflow ID is generated, the workflow is started
+    on the configured task queue, and its handle is yielded so the test can
+    send signals and read history. The workflow is terminated on teardown.
+
+    Yields:
+        Handle of the workflow started for this test.
+    """
+    run_id = f"eval-{uuid.uuid4()}"
+    queue_name = os.environ.get("WORKFLOW_TASK_QUEUE", "procurement_agent_queue")
+    workflow_type = os.environ.get("WORKFLOW_NAME", "procurement-agent")
+
+    # One timestamp is reused for both created_at and updated_at.
+    timestamp = dt.now()
+    start_params = CreateTaskParams(
+        agent=Agent(
+            id="procurement-agent",
+            name="procurement-agent",
+            acp_type="agentic",
+            description="Procurement agent for construction delivery management",
+            created_at=timestamp,
+            updated_at=timestamp,
+        ),
+        task=Task(id=run_id),
+        params=None,
+    )
+
+    started = await temporal_client.start_workflow(
+        workflow_type,
+        start_params,
+        id=run_id,
+        task_queue=queue_name,
+    )
+
+    # Fixed pause so the workflow can initialize before the test uses it.
+    await asyncio.sleep(2)
+
+    yield started
+
+    # Best-effort cleanup: the workflow may already have completed, in which
+    # case terminate fails and the error is deliberately ignored.
+    try:
+        await started.terminate("Test completed")
+    except Exception:
+        pass
+
+
+async def send_event(handle: WorkflowHandle, event: Any) -> None:
+    """
+    Deliver an event to the workflow through its ``send_event`` signal.
+
+    Args:
+        handle: The workflow handle
+        event: A Pydantic event model; it is serialized with
+            ``model_dump_json()`` before being sent
+    """
+    await handle.signal("send_event", event.model_dump_json())
+
+
+async def send_human_response(handle: WorkflowHandle, response: str) -> None:
+    """
+    Deliver a human's reply to the workflow through its ``receive_event`` signal.
+
+    Simulates a user answering a wait_for_human escalation in the UI by
+    wrapping the text in a user-authored event addressed to this workflow.
+
+    Args:
+        handle: The workflow handle
+        response: The human's text response
+    """
+    # Imported locally to avoid circular imports
+    from agentex.types.task import Task
+    from agentex.types.agent import Agent
+    from agentex.types.event import Event
+    from agentex.lib.types.acp import SendEventParams
+    from agentex.types.text_content import TextContent
+
+    stamp = dt.now()
+    signal_payload = SendEventParams(
+        agent=Agent(
+            id="procurement-agent",
+            name="procurement-agent",
+            acp_type="agentic",
+            description="Procurement agent for construction delivery management",
+            created_at=stamp,
+            updated_at=stamp,
+        ),
+        # The task ID is the workflow ID.
+        task=Task(id=handle.id),
+        event=Event(
+            id=str(uuid.uuid4()),
+            agent_id="procurement-agent",
+            task_id=handle.id,
+            sequence_id=1,
+            content=TextContent(author="user", content=response),
+        ),
+    )
+
+    await handle.signal("receive_event", signal_payload)
+
+
+async def wait_for_processing(_handle: WorkflowHandle, timeout_seconds: float = 60) -> None:
+    """
+    Pause to give the workflow time to process the last event.
+
+    This does NOT poll the workflow. It always sleeps for the full
+    ``timeout_seconds``, whether or not the agent finished earlier, so the
+    value is a fixed delay rather than an upper bound.
+
+    Args:
+        _handle: The workflow handle (unused, reserved for future polling)
+        timeout_seconds: How long to sleep, in seconds
+    """
+    # Fixed delay only; polling workflow state would let tests finish sooner.
+    await asyncio.sleep(timeout_seconds)
+
+
+async def get_workflow_transcript(handle: WorkflowHandle) -> list[dict[str, Any]]:
+    """
+    Build a transcript of activity events from the workflow's history.
+
+    Walks the history in order and records one entry per activity event:
+
+    - ``{"type": "function_call", "name": <activity type name>}`` for each
+      scheduled activity
+    - ``{"type": "activity_completed", "result": <str or None>}`` for each
+      completed activity
+
+    Only what appears in the Temporal history is captured; tools that do not
+    run as activities are presumably invisible here.
+
+    Args:
+        handle: The workflow handle
+
+    Returns:
+        List of transcript entries in history order
+    """
+    transcript: list[dict[str, Any]] = []
+
+    async for event in handle.fetch_history_events():
+        # History events are protobuf messages: every *_event_attributes field
+        # exists on every event, so hasattr() is always true and an unset
+        # sub-message still evaluates as truthy. HasField() reports which
+        # attribute is actually populated, so unrelated events add no entries.
+        if event.HasField("activity_task_completed_event_attributes"):
+            completed = event.activity_task_completed_event_attributes
+            transcript.append({
+                "type": "activity_completed",
+                "result": str(completed.result) if completed.HasField("result") else None,
+            })
+        elif event.HasField("activity_task_scheduled_event_attributes"):
+            scheduled = event.activity_task_scheduled_event_attributes
+            # The scheduled event carries the activity (tool) name.
+            activity_name = (
+                scheduled.activity_type.name
+                if scheduled.HasField("activity_type")
+                else None
+            )
+            transcript.append({
+                "type": "function_call",
+                "name": activity_name,
+            })
+
+    return transcript
+
+
+async def get_transcript_event_count(handle: WorkflowHandle) -> int:
+    """Return how many entries the workflow transcript currently holds."""
+    return len(await get_workflow_transcript(handle))
+
+
+def get_new_tool_calls(
+    full_transcript: list[dict[str, Any]],
+    previous_count: int
+) -> list[dict[str, Any]]:
+    """
+    Return the transcript entries added after an earlier checkpoint.
+
+    Args:
+        full_transcript: The complete transcript from get_workflow_transcript
+        previous_count: The transcript length recorded before the event was sent
+
+    Returns:
+        The entries from index ``previous_count`` onward
+    """
+    since_checkpoint = slice(previous_count, None)
+    return full_transcript[since_checkpoint]
+
+
+def get_workflow_id(handle: WorkflowHandle) -> str:
+    """Return the ``id`` of the given workflow handle."""
+    workflow_id = handle.id
+    return workflow_id
diff --git a/examples/demos/procurement_agent/evals/fixtures/__init__.py b/examples/demos/procurement_agent/evals/fixtures/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/demos/procurement_agent/evals/fixtures/events.py b/examples/demos/procurement_agent/evals/fixtures/events.py
new file mode 100644
index 000000000..31f1c919b
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/fixtures/events.py
@@ -0,0 +1,108 @@
+"""
+Event fixtures for eval test cases.
+
+Provides factory functions to create events with configurable parameters.
+"""
+from typing import Optional
+from datetime import datetime, timedelta
+
+from project.models.events import (
+ EventType,
+ InspectionFailedEvent,
+ InspectionPassedEvent,
+ SubmitalApprovalEvent,
+ ShipmentArrivedSiteEvent,
+ ShipmentDepartedFactoryEvent,
+)
+
+
+def create_submittal_approved(item: str) -> SubmitalApprovalEvent:
+    """Build a Submittal_Approved event whose document name and URL derive from ``item``."""
+    # URL slug: lower-cased item name with spaces turned into underscores.
+    slug = item.lower().replace(' ', '_')
+    return SubmitalApprovalEvent(
+        event_type=EventType.SUBMITTAL_APPROVED,
+        item=item,
+        document_name=f"{item} Submittal.pdf",
+        document_url=f"/submittals/{slug}.pdf",
+    )
+
+
+def create_shipment_departed(
+    item: str,
+    eta: datetime,
+    date_departed: Optional[datetime] = None,
+) -> ShipmentDepartedFactoryEvent:
+    """
+    Build a Shipment_Departed_Factory event.
+
+    Args:
+        item: The item name
+        eta: Estimated time of arrival (the value compared against required_by)
+        date_departed: When the shipment left the factory; when omitted it is
+            set to 7 days before ``eta``
+    """
+    departed = date_departed if date_departed is not None else eta - timedelta(days=7)
+
+    return ShipmentDepartedFactoryEvent(
+        event_type=EventType.SHIPMENT_DEPARTED_FACTORY,
+        item=item,
+        eta=eta,
+        date_departed=departed,
+        location_address="218 W 18th St, New York, NY 10011",
+    )
+
+
+def create_shipment_arrived(
+    item: str,
+    date_arrived: datetime,
+) -> ShipmentArrivedSiteEvent:
+    """Build a Shipment_Arrived_Site event for ``item`` arriving at ``date_arrived``."""
+    fields = {
+        "event_type": EventType.SHIPMENT_ARRIVED_SITE,
+        "item": item,
+        "date_arrived": date_arrived,
+        "location_address": "650 Townsend St, San Francisco, CA 94103",
+    }
+    return ShipmentArrivedSiteEvent(**fields)
+
+
+def create_inspection_failed(
+    item: str,
+    inspection_date: Optional[datetime] = None,
+) -> InspectionFailedEvent:
+    """
+    Build an Inspection_Failed event.
+
+    Args:
+        item: The item name
+        inspection_date: When the inspection happened; defaults to the current
+            local time when omitted
+    """
+    inspected_at = datetime.now() if inspection_date is None else inspection_date
+    slug = item.lower().replace(' ', '_')
+
+    return InspectionFailedEvent(
+        event_type=EventType.INSPECTION_FAILED,
+        item=item,
+        inspection_date=inspected_at,
+        document_name=f"{item} Inspection Report.pdf",
+        document_url=f"/inspections/{slug}_failed.pdf",
+    )
+
+
+def create_inspection_passed(
+    item: str,
+    inspection_date: Optional[datetime] = None,
+) -> InspectionPassedEvent:
+    """
+    Build an Inspection_Passed event.
+
+    Args:
+        item: The item name
+        inspection_date: When the inspection happened; defaults to the current
+            local time when omitted
+    """
+    inspected_at = datetime.now() if inspection_date is None else inspection_date
+    slug = item.lower().replace(' ', '_')
+
+    return InspectionPassedEvent(
+        event_type=EventType.INSPECTION_PASSED,
+        item=item,
+        inspection_date=inspected_at,
+        document_name=f"{item} Inspection Report.pdf",
+        document_url=f"/inspections/{slug}_passed.pdf",
+    )
+
+
+# Default schedule reference: item name -> required-by date (ISO date string)
+# and buffer in days. Intended to match DEFAULT_SCHEDULE in database.py.
+# NOTE(review): that constant is not visible from this file - confirm the two
+# stay in sync whenever either one changes.
+SCHEDULE_REFERENCE = {
+ "Steel Beams": {"required_by": "2026-02-15", "buffer_days": 5},
+ "HVAC Units": {"required_by": "2026-03-01", "buffer_days": 7},
+ "Windows": {"required_by": "2026-03-15", "buffer_days": 10},
+ "Flooring Materials": {"required_by": "2026-04-01", "buffer_days": 3},
+ "Electrical Panels": {"required_by": "2026-04-15", "buffer_days": 5},
+}
diff --git a/examples/demos/procurement_agent/evals/graders/__init__.py b/examples/demos/procurement_agent/evals/graders/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/demos/procurement_agent/evals/graders/database.py b/examples/demos/procurement_agent/evals/graders/database.py
new file mode 100644
index 000000000..e1651662c
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/graders/database.py
@@ -0,0 +1,187 @@
+"""
+Database grader - verifies database state after agent actions.
+"""
+from __future__ import annotations
+
+import json
+from typing import Any, Optional
+from pathlib import Path
+
+import aiosqlite # type: ignore[import-not-found]
+
+# Same database file as the main application: three directory levels above
+# this module, then project/data/procurement.db.
+DB_PATH = Path(__file__).parent.parent.parent.joinpath("project", "data", "procurement.db")
+
+
+async def get_procurement_item(workflow_id: str, item: str) -> Optional[dict[str, Any]]:
+    """
+    Fetch one procurement item row for a workflow.
+
+    Args:
+        workflow_id: The Temporal workflow ID
+        item: The item name
+
+    Returns:
+        The row as a dict keyed by column name, or None when no row matches
+    """
+    lookup = (workflow_id, item)
+    async with aiosqlite.connect(DB_PATH) as conn:
+        # Row factory gives name-addressable rows that convert cleanly to dict.
+        conn.row_factory = aiosqlite.Row
+        async with conn.execute(
+            """
+            SELECT workflow_id, item, status, eta, date_arrived, purchase_order_id,
+                   created_at, updated_at
+            FROM procurement_items
+            WHERE workflow_id = ? AND item = ?
+            """,
+            lookup
+        ) as cur:
+            found = await cur.fetchone()
+            record = dict(found) if found else None
+    return record
+
+
+async def get_schedule_delivery(workflow_id: str, item: str) -> Optional[dict[str, Any]]:
+    """
+    Look up one delivery entry in a workflow's master construction schedule.
+
+    The schedule is stored as JSON; the first entry of its ``deliveries`` list
+    whose ``item`` equals the requested name is returned.
+
+    Args:
+        workflow_id: The Temporal workflow ID
+        item: The item name
+
+    Returns:
+        The matching delivery dict, or None when there is no schedule row or
+        no delivery for that item
+    """
+    async with aiosqlite.connect(DB_PATH) as conn:
+        conn.row_factory = aiosqlite.Row
+        async with conn.execute(
+            """
+            SELECT schedule_json
+            FROM master_construction_schedule
+            WHERE workflow_id = ?
+            """,
+            (workflow_id,)
+        ) as cur:
+            schedule_row = await cur.fetchone()
+            if not schedule_row:
+                return None
+            deliveries = json.loads(schedule_row["schedule_json"]).get("deliveries", [])
+            return next(
+                (entry for entry in deliveries if entry.get("item") == item),
+                None,
+            )
+
+
+async def assert_procurement_item_exists(
+    workflow_id: str,
+    item: str,
+    expected_status: Optional[str] = None,
+    expected_po_id_not_null: bool = False,
+    expected_eta: Optional[str] = None,
+    expected_date_arrived: Optional[str] = None,
+) -> dict[str, Any]:
+    """
+    Assert a procurement item exists and, optionally, that fields match.
+
+    Args:
+        workflow_id: The Temporal workflow ID
+        item: The item name
+        expected_status: If provided, the record's status must equal it
+        expected_po_id_not_null: If True, purchase_order_id must not be null
+        expected_eta: If provided, the record's eta must equal it
+        expected_date_arrived: If provided, date_arrived must equal it
+
+    Returns:
+        The procurement item record
+
+    Raises:
+        AssertionError: If the item doesn't exist or a field doesn't match
+    """
+    record = await get_procurement_item(workflow_id, item)
+
+    if record is None:
+        raise AssertionError(
+            f"Procurement item not found: workflow_id={workflow_id}, item={item}"
+        )
+
+    # Explicit raises rather than `assert` statements: asserts are stripped
+    # when Python runs with -O, which would silently turn every field check
+    # below into a pass.
+    if expected_status is not None and record["status"] != expected_status:
+        raise AssertionError(
+            f"Expected status '{expected_status}', got '{record['status']}'"
+        )
+
+    if expected_po_id_not_null and record["purchase_order_id"] is None:
+        raise AssertionError(
+            "Expected purchase_order_id to be set, but it was null"
+        )
+
+    if expected_eta is not None and record["eta"] != expected_eta:
+        raise AssertionError(
+            f"Expected ETA '{expected_eta}', got '{record['eta']}'"
+        )
+
+    if expected_date_arrived is not None and record["date_arrived"] != expected_date_arrived:
+        raise AssertionError(
+            f"Expected date_arrived '{expected_date_arrived}', got '{record['date_arrived']}'"
+        )
+
+    return record
+
+
+async def assert_procurement_item_not_exists(workflow_id: str, item: str) -> None:
+    """
+    Assert that no procurement item row exists (i.e. it was deleted).
+
+    Args:
+        workflow_id: The Temporal workflow ID
+        item: The item name
+
+    Raises:
+        AssertionError: If a matching row is still present
+    """
+    leftover = await get_procurement_item(workflow_id, item)
+    if leftover is None:
+        return
+    raise AssertionError(
+        f"Procurement item should not exist but was found: {leftover}"
+    )
+
+
+async def assert_schedule_item_not_exists(workflow_id: str, item: str) -> None:
+    """
+    Assert that the master construction schedule has no entry for the item.
+
+    Args:
+        workflow_id: The Temporal workflow ID
+        item: The item name
+
+    Raises:
+        AssertionError: If the item is still in the schedule
+    """
+    leftover = await get_schedule_delivery(workflow_id, item)
+    if leftover is None:
+        return
+    raise AssertionError(
+        f"Schedule item should not exist but was found: {leftover}"
+    )
+
+
+async def assert_schedule_delivery_date(
+    workflow_id: str,
+    item: str,
+    expected_required_by: str
+) -> None:
+    """
+    Assert a scheduled delivery has the expected required_by date.
+
+    Args:
+        workflow_id: The Temporal workflow ID
+        item: The item name
+        expected_required_by: The expected date string
+
+    Raises:
+        AssertionError: If the delivery is missing or its date doesn't match
+    """
+    delivery = await get_schedule_delivery(workflow_id, item)
+    if delivery is None:
+        raise AssertionError(f"Schedule delivery not found for item: {item}")
+
+    # .get() so a delivery without the key fails as an AssertionError like
+    # every other mismatch, instead of surfacing as a KeyError.
+    actual_required_by = delivery.get("required_by")
+
+    # Explicit raise rather than `assert`, which is stripped under python -O.
+    if actual_required_by != expected_required_by:
+        raise AssertionError(
+            f"Expected required_by '{expected_required_by}', got '{actual_required_by}'"
+        )
diff --git a/examples/demos/procurement_agent/evals/graders/tool_calls.py b/examples/demos/procurement_agent/evals/graders/tool_calls.py
new file mode 100644
index 000000000..7c28a8626
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/graders/tool_calls.py
@@ -0,0 +1,80 @@
+"""
+Tool call grader - extracts and verifies tool calls from workflow transcripts.
+"""
+from __future__ import annotations
+
+from typing import Any
+
+
+def extract_tool_calls(transcript: list[dict[str, Any]]) -> list[str]:
+    """
+    Extract tool/function names from a workflow transcript.
+
+    Two entry shapes are recognized:
+
+    - ``{"type": "function_call", "name": ...}`` items
+    - assistant messages carrying a ``tool_calls`` list whose elements look
+      like ``{"function": {"name": ...}}``
+
+    Entries that are not dicts, and calls with a missing or empty name, are
+    skipped.
+
+    Args:
+        transcript: List of message dicts from agent execution
+
+    Returns:
+        Tool names in the order they appear (duplicates kept)
+    """
+    tool_calls: list[str] = []
+    for item in transcript:
+        if not isinstance(item, dict):
+            continue
+
+        # Flat function_call entries
+        if item.get("type") == "function_call":
+            tool_name = item.get("name")
+            if tool_name:
+                tool_calls.append(tool_name)
+
+        # Tool calls nested in assistant messages. The key may be present
+        # with a None value, so fall back to an empty list instead of
+        # iterating None.
+        if item.get("role") == "assistant":
+            for tc in item.get("tool_calls") or []:
+                function = tc.get("function") if isinstance(tc, dict) else None
+                # "function" itself may be None or a non-dict; skip those.
+                if isinstance(function, dict):
+                    tool_name = function.get("name")
+                    if tool_name:
+                        tool_calls.append(tool_name)
+    return tool_calls
+
+
+def assert_required_tools(transcript: list[dict[str, Any]], required: list[str]) -> None:
+    """
+    Assert that every required tool appears in the transcript.
+
+    Args:
+        transcript: The workflow transcript
+        required: List of tool names that must appear
+
+    Raises:
+        AssertionError: If any required tool is missing
+    """
+    seen = set(extract_tool_calls(transcript))
+    absent = set(required).difference(seen)
+    if not absent:
+        return
+    raise AssertionError(
+        f"Required tools not called: {absent}. "
+        f"Tools that were called: {seen}"
+    )
+
+
+def assert_forbidden_tools(transcript: list[dict[str, Any]], forbidden: list[str]) -> None:
+    """
+    Assert that none of the forbidden tools appear in the transcript.
+
+    This is the check that catches false positives (e.g. flagging a conflict
+    when there shouldn't be one).
+
+    Args:
+        transcript: The workflow transcript
+        forbidden: List of tool names that must NOT appear
+
+    Raises:
+        AssertionError: If any forbidden tool was called
+    """
+    seen = set(extract_tool_calls(transcript))
+    offending = seen.intersection(set(forbidden))
+    if not offending:
+        return
+    raise AssertionError(
+        f"Forbidden tools were called: {offending}. "
+        f"These tools should NOT have been invoked in this scenario."
+    )
diff --git a/examples/demos/procurement_agent/evals/pytest.ini b/examples/demos/procurement_agent/evals/pytest.ini
new file mode 100644
index 000000000..71d66ba7f
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/pytest.ini
@@ -0,0 +1,8 @@
+[pytest]
+asyncio_mode = auto
+testpaths = tasks
+python_files = test_*.py
+python_functions = test_*
+markers =
+ asyncio: mark test as async
+addopts = -v --tb=short
diff --git a/examples/demos/procurement_agent/evals/report.html b/examples/demos/procurement_agent/evals/report.html
new file mode 100644
index 000000000..8b3b6ea04
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/report.html
@@ -0,0 +1,1094 @@
+
+
+
+
+ report.html
+
+
+
+
+ report.html
+ Report generated on 20-Jan-2026 at 11:45:33 by pytest-html
+ v4.2.0
+
+
+
+
+
+ |
+ |
+
+
+
+
+
+ | No results found. Check the filters. |
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Summary
+
+
+
15 tests took 00:23:01.
+
(Un)check the boxes to filter the results.
+
+
+
+
+
+
+
+
+
+
+
+
+ | Result |
+ Test |
+ Duration |
+ Links |
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/demos/procurement_agent/evals/tasks/__init__.py b/examples/demos/procurement_agent/evals/tasks/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/demos/procurement_agent/evals/tasks/test_inspection_failed.py b/examples/demos/procurement_agent/evals/tasks/test_inspection_failed.py
new file mode 100644
index 000000000..a975ecd5d
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/tasks/test_inspection_failed.py
@@ -0,0 +1,162 @@
+"""
+Tests for Inspection_Failed event handling with human-in-the-loop.
+
+Verifies:
+- Agent escalates to human (wait_for_human called)
+- Agent responds correctly to different human inputs:
+ 1. "Yes" - executes recommended action
+ 2. "Yes, and also..." - executes action + additional request
+ 3. "No, delete..." - removes item from schedule
+"""
+from datetime import datetime
+
+import pytest
+
+from evals.conftest import (
+ send_event,
+ get_workflow_id,
+ send_human_response,
+ wait_for_processing,
+ get_workflow_transcript,
+)
+from evals.fixtures.events import (
+ create_shipment_arrived,
+ create_inspection_failed,
+ create_shipment_departed,
+ create_submittal_approved,
+)
+from evals.graders.database import (
+ assert_schedule_delivery_date,
+ assert_procurement_item_exists,
+ assert_schedule_item_not_exists,
+ assert_procurement_item_not_exists,
+)
+from evals.graders.tool_calls import assert_required_tools
+
+
+async def setup_through_arrived(workflow_handle, item: str) -> None:
+    """Drive ``item`` through submittal approved, shipment departed and shipment arrived."""
+    expected_eta = datetime(2026, 2, 15, 11, 0)
+    arrived_at = datetime(2026, 2, 15, 10, 30)
+
+    lifecycle = (
+        create_submittal_approved(item),
+        create_shipment_departed(item, eta=expected_eta),
+        create_shipment_arrived(item, date_arrived=arrived_at),
+    )
+    # Send each event in order, pausing after every one so the agent can
+    # process it before the next arrives.
+    for lifecycle_event in lifecycle:
+        await send_event(workflow_handle, lifecycle_event)
+        await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+
+@pytest.mark.asyncio
+async def test_failed_01_human_approves(workflow_handle):
+    """
+    Inspection_Failed followed by a plain human approval.
+
+    Human response: "Yes"
+    Expected: the agent carries out its recommended action; the only thing
+    asserted is that the procurement item is still present in the database.
+    """
+    item = "HVAC Units"
+    workflow_id = get_workflow_id(workflow_handle)
+
+    await setup_through_arrived(workflow_handle, item)
+
+    # Trigger the failure and give the agent time to escalate to a human.
+    await send_event(workflow_handle, create_inspection_failed(item))
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Approve, then let the agent act on the answer.
+    await send_human_response(workflow_handle, "Yes")
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # wait_for_human is a function_tool (not a Temporal activity), so the
+    # outcome is verified through database state rather than the transcript.
+    await assert_procurement_item_exists(workflow_id=workflow_id, item=item)
+
+
+@pytest.mark.asyncio
+async def test_failed_02_human_approves_with_extra_action(workflow_handle):
+    """
+    Inspection_Failed where the human approves and asks for something extra.
+
+    Human response: "Yes, and also update the delivery date to 2026-03-15"
+    Expected: the agent calls update_delivery_date_for_item and the schedule
+    shows the new required_by date.
+    """
+    item = "HVAC Units"
+    new_required_by = "2026-03-15"
+    workflow_id = get_workflow_id(workflow_handle)
+
+    await setup_through_arrived(workflow_handle, item)
+
+    await send_event(workflow_handle, create_inspection_failed(item))
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Approval plus an additional request; allow more time for the extra action.
+    await send_human_response(
+        workflow_handle,
+        "Yes, and also update the delivery date to 2026-03-15"
+    )
+    await wait_for_processing(workflow_handle, timeout_seconds=60)
+
+    # wait_for_human is a function_tool (not visible in Temporal history), so
+    # the reaction to the human input is checked via the schedule update call.
+    history = await get_workflow_transcript(workflow_handle)
+    assert_required_tools(history, ["update_delivery_date_for_item"])
+
+    # The schedule itself must carry the new date.
+    await assert_schedule_delivery_date(
+        workflow_id=workflow_id,
+        item=item,
+        expected_required_by=new_required_by,
+    )
+
+
+@pytest.mark.asyncio
+async def test_failed_03_human_rejects_delete(workflow_handle):
+    """
+    Inspection_Failed where the human rejects and asks for deletion.
+
+    Human response: "No, remove it from the master schedule entirely"
+    Expected: the item is removed from the schedule AND from procurement items.
+    """
+    item = "HVAC Units"
+    workflow_id = get_workflow_id(workflow_handle)
+
+    await setup_through_arrived(workflow_handle, item)
+
+    await send_event(workflow_handle, create_inspection_failed(item))
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Rejection with an explicit request to delete.
+    await send_human_response(
+        workflow_handle,
+        "No, remove it from the master schedule entirely"
+    )
+    await wait_for_processing(workflow_handle, timeout_seconds=60)
+
+    # wait_for_human is a function_tool (not visible in Temporal history), so
+    # the reaction to the human input is checked via the removal/deletion calls.
+    history = await get_workflow_transcript(workflow_handle)
+    expected_calls = [
+        "remove_delivery_item",  # Remove from schedule
+        "delete_procurement_item_activity",  # Delete tracking record
+    ]
+    assert_required_tools(history, expected_calls)
+
+    # Both stores must no longer contain the item.
+    await assert_procurement_item_not_exists(workflow_id, item)
+    await assert_schedule_item_not_exists(workflow_id, item)
diff --git a/examples/demos/procurement_agent/evals/tasks/test_inspection_passed.py b/examples/demos/procurement_agent/evals/tasks/test_inspection_passed.py
new file mode 100644
index 000000000..815a5ab23
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/tasks/test_inspection_passed.py
@@ -0,0 +1,116 @@
+"""
+Tests for Inspection_Passed event handling.
+
+Verifies:
+- Procurement item status updated to passed
+- No escalation to human (forbidden tools)
+"""
+from datetime import datetime
+
+import pytest
+
+from evals.conftest import (
+ send_event,
+ get_workflow_id,
+ get_new_tool_calls,
+ wait_for_processing,
+ get_workflow_transcript,
+ get_transcript_event_count,
+)
+from evals.fixtures.events import (
+ create_shipment_arrived,
+ create_inspection_passed,
+ create_shipment_departed,
+ create_submittal_approved,
+)
+from evals.graders.database import assert_procurement_item_exists
+from evals.graders.tool_calls import assert_required_tools, assert_forbidden_tools
+
+
+async def setup_through_arrived(workflow_handle, item: str, eta: datetime, arrival: datetime) -> None:
+    """Drive ``item`` through submittal approved, shipment departed and shipment arrived."""
+    lifecycle = (
+        create_submittal_approved(item),
+        create_shipment_departed(item, eta=eta),
+        create_shipment_arrived(item, date_arrived=arrival),
+    )
+    # Send each event in order, pausing after every one so the agent can
+    # process it before the next arrives.
+    for lifecycle_event in lifecycle:
+        await send_event(workflow_handle, lifecycle_event)
+        await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+
+@pytest.mark.asyncio
+async def test_passed_01_steel_beams(workflow_handle):
+    """
+    Inspection_Passed for Steel Beams.
+
+    Expected:
+    - update_procurement_item_activity called
+    - NO wait_for_human (should not escalate on success)
+    - NO flag_potential_issue
+    - DB shows inspection_passed status
+    """
+    item = "Steel Beams"
+    workflow_id = get_workflow_id(workflow_handle)
+
+    await setup_through_arrived(
+        workflow_handle,
+        item,
+        datetime(2026, 2, 10, 14, 30),  # eta
+        datetime(2026, 2, 10, 15, 45),  # arrival
+    )
+
+    # Checkpoint taken BEFORE the event so that only this event's calls are
+    # graded, not the setup's.
+    baseline = await get_transcript_event_count(workflow_handle)
+
+    await send_event(workflow_handle, create_inspection_passed(item))
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    history = await get_workflow_transcript(workflow_handle)
+    calls_for_event = get_new_tool_calls(history, baseline)
+    assert_required_tools(calls_for_event, ["update_procurement_item_activity"])
+    # NOTE(review): the transcript is built from Temporal activity history; if
+    # wait_for_human is a plain function tool rather than an activity it can
+    # never appear here, which would make that part of the check vacuous -
+    # confirm.
+    assert_forbidden_tools(calls_for_event, [
+        "wait_for_human",  # Should NOT escalate on success
+        "flag_potential_issue",  # Should NOT flag issues
+    ])
+
+    # Database must reflect the passed inspection.
+    await assert_procurement_item_exists(
+        workflow_id=workflow_id,
+        item=item,
+        expected_status="inspection_passed",
+    )
+
+
+@pytest.mark.asyncio
+async def test_passed_02_windows(workflow_handle):
+    """
+    Inspection_Passed for Windows.
+
+    Expected: update_procurement_item_activity is called, neither
+    wait_for_human nor flag_potential_issue is called, and the DB shows
+    inspection_passed status.
+    """
+    item = "Windows"
+    workflow_id = get_workflow_id(workflow_handle)
+
+    await setup_through_arrived(
+        workflow_handle,
+        item,
+        datetime(2026, 3, 5, 16, 0),  # eta
+        datetime(2026, 3, 5, 16, 20),  # arrival
+    )
+
+    # Checkpoint taken BEFORE the event so only this event's calls are graded.
+    baseline = await get_transcript_event_count(workflow_handle)
+
+    await send_event(workflow_handle, create_inspection_passed(item))
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    history = await get_workflow_transcript(workflow_handle)
+    calls_for_event = get_new_tool_calls(history, baseline)
+    assert_required_tools(calls_for_event, ["update_procurement_item_activity"])
+    assert_forbidden_tools(calls_for_event, ["wait_for_human", "flag_potential_issue"])
+
+    await assert_procurement_item_exists(
+        workflow_id=workflow_id,
+        item=item,
+        expected_status="inspection_passed",
+    )
diff --git a/examples/demos/procurement_agent/evals/tasks/test_shipment_arrived.py b/examples/demos/procurement_agent/evals/tasks/test_shipment_arrived.py
new file mode 100644
index 000000000..9d9ee293f
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/tasks/test_shipment_arrived.py
@@ -0,0 +1,119 @@
+"""
+Tests for Shipment_Arrived_Site event handling.
+
+Verifies:
+- Team notification sent
+- Inspection scheduled
+- Procurement item updated (DB status becomes shipment_arrived)
+"""
+from datetime import datetime
+
+import pytest
+
+from evals.conftest import (
+ send_event,
+ get_workflow_id,
+ get_new_tool_calls,
+ wait_for_processing,
+ get_workflow_transcript,
+ get_transcript_event_count,
+)
+from evals.fixtures.events import (
+ create_shipment_arrived,
+ create_shipment_departed,
+ create_submittal_approved,
+)
+from evals.graders.database import assert_procurement_item_exists
+from evals.graders.tool_calls import assert_required_tools
+
+
+async def setup_through_departed(workflow_handle, item: str, eta: datetime) -> None:
+    """Send the submittal-approved and shipment-departed events for ``item``."""
+    submittal = create_submittal_approved(item)
+    await send_event(workflow_handle, submittal)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+    # The departed event carries the ETA chosen by the calling test.
+    departed = create_shipment_departed(item, eta=eta)
+    await send_event(workflow_handle, departed)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+
+@pytest.mark.asyncio
+async def test_arrived_01_steel_beams(workflow_handle):
+    """
+    Shipment_Arrived_Site for Steel Beams.
+
+    Checks, for the arrived event only:
+    - notify_team_shipment_arrived called
+    - schedule_inspection called
+    - update_procurement_item_activity called
+    - DB row has status shipment_arrived (the stored date is not asserted)
+    """
+    item = "Steel Beams"
+    workflow_id = get_workflow_id(workflow_handle)
+    arrival_date = datetime(2026, 2, 10, 15, 45)
+    required = [
+        "notify_team_shipment_arrived",
+        "schedule_inspection",
+        "update_procurement_item_activity",
+    ]
+
+    # Setup through departed state, using an ETA shortly before the arrival.
+    await setup_through_departed(workflow_handle, item, datetime(2026, 2, 10, 14, 30))
+
+    # Baseline the transcript BEFORE the arrived event is sent.
+    baseline = await get_transcript_event_count(workflow_handle)
+
+    # Event under test.
+    arrived = create_shipment_arrived(item, date_arrived=arrival_date)
+    await send_event(workflow_handle, arrived)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Verify tool calls for THIS EVENT ONLY
+    transcript = await get_workflow_transcript(workflow_handle)
+    calls = get_new_tool_calls(transcript, baseline)
+    assert_required_tools(calls, required)
+
+    # Verify DB state
+    await assert_procurement_item_exists(
+        workflow_id=workflow_id,
+        item=item,
+        expected_status="shipment_arrived",
+    )
+
+
+@pytest.mark.asyncio
+async def test_arrived_02_windows(workflow_handle):
+    """
+    Shipment_Arrived_Site for Windows.
+
+    Same required tool calls and DB status as the Steel Beams case.
+    """
+    item = "Windows"
+    workflow_id = get_workflow_id(workflow_handle)
+    arrival_date = datetime(2026, 3, 5, 16, 20)
+    required = [
+        "notify_team_shipment_arrived",
+        "schedule_inspection",
+        "update_procurement_item_activity",
+    ]
+
+    await setup_through_departed(workflow_handle, item, datetime(2026, 3, 5, 16, 0))
+
+    # Baseline the transcript BEFORE the arrived event is sent.
+    baseline = await get_transcript_event_count(workflow_handle)
+
+    arrived = create_shipment_arrived(item, date_arrived=arrival_date)
+    await send_event(workflow_handle, arrived)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Verify tool calls for THIS EVENT ONLY
+    transcript = await get_workflow_transcript(workflow_handle)
+    calls = get_new_tool_calls(transcript, baseline)
+    assert_required_tools(calls, required)
+
+    await assert_procurement_item_exists(
+        workflow_id=workflow_id,
+        item=item,
+        expected_status="shipment_arrived",
+    )
diff --git a/examples/demos/procurement_agent/evals/tasks/test_shipment_departed.py b/examples/demos/procurement_agent/evals/tasks/test_shipment_departed.py
new file mode 100644
index 000000000..4e64172e7
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/tasks/test_shipment_departed.py
@@ -0,0 +1,202 @@
+"""
+Tests for Shipment_Departed_Factory event handling.
+
+CRITICAL: These tests catch the false positive issue where the agent
+incorrectly flags conflicts when ETA is before the required_by date.
+
+Conflict logic:
+- Flag if ETA >= required_by (zero/negative buffer)
+- Don't flag if ETA < required_by (has buffer remaining)
+"""
+from datetime import datetime
+
+import pytest
+
+from evals.conftest import (
+ send_event,
+ get_workflow_id,
+ get_new_tool_calls,
+ wait_for_processing,
+ get_workflow_transcript,
+ get_transcript_event_count,
+)
+from evals.fixtures.events import (
+ create_shipment_departed,
+ create_submittal_approved,
+)
+from evals.graders.database import assert_procurement_item_exists
+from evals.graders.tool_calls import assert_required_tools, assert_forbidden_tools
+
+
+async def setup_submittal_approved(workflow_handle, item: str) -> None:
+    """Send Submittal_Approved for ``item``, then call wait_for_processing (30s)."""
+    # Every departed test in this module calls this before its own event.
+    await send_event(workflow_handle, create_submittal_approved(item))
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+
+# =============================================================================
+# NO FLAG CASES - ETA < required_by
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_departed_01_no_flag_5_days_early(workflow_handle):
+    """
+    Steel Beams: ETA 2026-02-10, Required 2026-02-15.
+    ETA is five days before required_by, so flag_potential_issue must NOT be called.
+    """
+    item = "Steel Beams"
+    workflow_id = get_workflow_id(workflow_handle)
+    eta = datetime(2026, 2, 10, 14, 30)  # Feb 10
+
+    # Setup: submittal approved first
+    await setup_submittal_approved(workflow_handle, item)
+
+    # Baseline the transcript BEFORE the departed event is sent.
+    baseline = await get_transcript_event_count(workflow_handle)
+
+    # Event under test: departure with the ETA 5 days early.
+    departed = create_shipment_departed(item, eta=eta)
+    await send_event(workflow_handle, departed)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Verify tool calls for THIS EVENT ONLY
+    transcript = await get_workflow_transcript(workflow_handle)
+    calls = get_new_tool_calls(transcript, baseline)
+    assert_required_tools(calls, ["update_procurement_item_activity"])
+    assert_forbidden_tools(calls, ["flag_potential_issue"])  # MUST NOT FLAG
+
+    # Verify DB state
+    await assert_procurement_item_exists(
+        workflow_id=workflow_id,
+        item=item,
+        expected_status="shipment_departed",
+    )
+
+
+@pytest.mark.asyncio
+async def test_departed_02_no_flag_1_day_early(workflow_handle):
+    """
+    Steel Beams: ETA 2026-02-14, Required 2026-02-15
+    1 day early - boundary case but still OK, should NOT flag.
+    """
+    item = "Steel Beams"
+    await setup_submittal_approved(workflow_handle, item)
+
+    # Get transcript count BEFORE sending departed event
+    previous_count = await get_transcript_event_count(workflow_handle)
+
+    eta = datetime(2026, 2, 14, 14, 30)  # Feb 14 - 1 day before required
+    event = create_shipment_departed(item, eta=eta)
+    await send_event(workflow_handle, event)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Verify tool calls for THIS EVENT ONLY (DB state is not asserted here)
+    full_transcript = await get_workflow_transcript(workflow_handle)
+    new_tool_calls = get_new_tool_calls(full_transcript, previous_count)
+    assert_required_tools(new_tool_calls, ["update_procurement_item_activity"])
+    assert_forbidden_tools(new_tool_calls, ["flag_potential_issue"])  # MUST NOT FLAG
+
+
+@pytest.mark.asyncio
+async def test_departed_05_no_flag_windows_10_days_early(workflow_handle):
+    """
+    Windows: ETA 2026-03-05, Required 2026-03-15
+    10 days early - ETA is before required_by, should NOT flag.
+    """
+    item = "Windows"
+    await setup_submittal_approved(workflow_handle, item)
+
+    # Get transcript count BEFORE sending departed event
+    previous_count = await get_transcript_event_count(workflow_handle)
+
+    eta = datetime(2026, 3, 5, 16, 0)  # Mar 5 - 10 days before required
+    event = create_shipment_departed(item, eta=eta)
+    await send_event(workflow_handle, event)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Verify tool calls for THIS EVENT ONLY (DB state is not asserted here)
+    full_transcript = await get_workflow_transcript(workflow_handle)
+    new_tool_calls = get_new_tool_calls(full_transcript, previous_count)
+    assert_required_tools(new_tool_calls, ["update_procurement_item_activity"])
+    assert_forbidden_tools(new_tool_calls, ["flag_potential_issue"])  # MUST NOT FLAG
+
+
+@pytest.mark.asyncio
+async def test_departed_06_no_flag_hvac_1_day_early(workflow_handle):
+    """
+    HVAC Units: ETA 2026-02-28, Required 2026-03-01
+    1 day early - tight boundary case, should NOT flag.
+    """
+    item = "HVAC Units"
+    await setup_submittal_approved(workflow_handle, item)
+
+    # Get transcript count BEFORE sending departed event
+    previous_count = await get_transcript_event_count(workflow_handle)
+
+    eta = datetime(2026, 2, 28, 11, 0)  # Feb 28 - 1 day before Mar 1
+    event = create_shipment_departed(item, eta=eta)
+    await send_event(workflow_handle, event)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Verify tool calls for THIS EVENT ONLY (DB state is not asserted here)
+    full_transcript = await get_workflow_transcript(workflow_handle)
+    new_tool_calls = get_new_tool_calls(full_transcript, previous_count)
+    assert_required_tools(new_tool_calls, ["update_procurement_item_activity"])
+    assert_forbidden_tools(new_tool_calls, ["flag_potential_issue"])  # MUST NOT FLAG
+
+
+# =============================================================================
+# FLAG CASES - ETA >= required_by
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_departed_03_flag_on_deadline(workflow_handle):
+    """
+    Steel Beams: ETA 2026-02-15, Required 2026-02-15
+    Arrives ON deadline - zero buffer, SHOULD FLAG.
+    """
+    item = "Steel Beams"
+    await setup_submittal_approved(workflow_handle, item)
+
+    # Get transcript count BEFORE sending departed event
+    previous_count = await get_transcript_event_count(workflow_handle)
+
+    eta = datetime(2026, 2, 15, 14, 30)  # Feb 15 = required date
+    event = create_shipment_departed(item, eta=eta)
+    await send_event(workflow_handle, event)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Verify tool calls for THIS EVENT ONLY (DB state is not asserted here)
+    full_transcript = await get_workflow_transcript(workflow_handle)
+    new_tool_calls = get_new_tool_calls(full_transcript, previous_count)
+    assert_required_tools(new_tool_calls, [
+        "flag_potential_issue",  # MUST FLAG
+        "update_procurement_item_activity",
+    ])
+
+
+@pytest.mark.asyncio
+async def test_departed_04_flag_late(workflow_handle):
+    """
+    Steel Beams: ETA 2026-02-20, Required 2026-02-15
+    5 days LATE - definite conflict, SHOULD FLAG.
+    """
+    item = "Steel Beams"
+    await setup_submittal_approved(workflow_handle, item)
+
+    # Get transcript count BEFORE sending departed event
+    previous_count = await get_transcript_event_count(workflow_handle)
+
+    eta = datetime(2026, 2, 20, 14, 30)  # Feb 20 - 5 days after required
+    event = create_shipment_departed(item, eta=eta)
+    await send_event(workflow_handle, event)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # Verify tool calls for THIS EVENT ONLY (DB state is not asserted here)
+    full_transcript = await get_workflow_transcript(workflow_handle)
+    new_tool_calls = get_new_tool_calls(full_transcript, previous_count)
+    assert_required_tools(new_tool_calls, [
+        "flag_potential_issue",  # MUST FLAG
+        "update_procurement_item_activity",
+    ])
diff --git a/examples/demos/procurement_agent/evals/tasks/test_submittal_approved.py b/examples/demos/procurement_agent/evals/tasks/test_submittal_approved.py
new file mode 100644
index 000000000..3182eab7e
--- /dev/null
+++ b/examples/demos/procurement_agent/evals/tasks/test_submittal_approved.py
@@ -0,0 +1,87 @@
+"""
+Tests for Submittal_Approved event handling.
+
+Verifies:
+- Purchase order is issued (tool call)
+- Procurement item created in DB with correct status and PO ID
+"""
+import pytest
+
+from evals.conftest import (
+ send_event,
+ get_workflow_id,
+ wait_for_processing,
+ get_workflow_transcript,
+)
+from evals.fixtures.events import create_submittal_approved
+from evals.graders.database import assert_procurement_item_exists
+from evals.graders.tool_calls import assert_required_tools
+
+
+@pytest.mark.asyncio
+async def test_submittal_01_steel_beams(workflow_handle):
+    """
+    Submittal_Approved for Steel Beams.
+
+    Checks:
+    - issue_purchase_order tool called
+    - create_procurement_item_activity called
+    - DB row has status purchase_order_issued and a non-null PO ID
+    """
+    item = "Steel Beams"
+    workflow_id = get_workflow_id(workflow_handle)
+    expected_status = "purchase_order_issued"
+    required = [
+        "issue_purchase_order",
+        "create_procurement_item_activity",  # Activity name in Temporal
+    ]
+
+    # Send the event under test, then wait for processing.
+    submittal = create_submittal_approved(item)
+    await send_event(workflow_handle, submittal)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # The full transcript is graded; this test sends no earlier event.
+    tool_calls = await get_workflow_transcript(workflow_handle)
+    assert_required_tools(tool_calls, required)
+
+    # Verify DB state
+    await assert_procurement_item_exists(
+        workflow_id=workflow_id,
+        item=item,
+        expected_status=expected_status,
+        expected_po_id_not_null=True,
+    )
+
+
+@pytest.mark.asyncio
+async def test_submittal_02_hvac_units(workflow_handle):
+    """
+    Submittal_Approved for HVAC Units.
+
+    Same expectations as the Steel Beams case, applied to a second item.
+    """
+    item = "HVAC Units"
+    workflow_id = get_workflow_id(workflow_handle)
+    expected_status = "purchase_order_issued"
+    required = [
+        "issue_purchase_order",
+        "create_procurement_item_activity",
+    ]
+
+    # Send the event under test, then wait for processing.
+    submittal = create_submittal_approved(item)
+    await send_event(workflow_handle, submittal)
+    await wait_for_processing(workflow_handle, timeout_seconds=30)
+
+    # The full transcript is graded; this test sends no earlier event.
+    tool_calls = await get_workflow_transcript(workflow_handle)
+    assert_required_tools(tool_calls, required)
+
+    # Verify DB state
+    await assert_procurement_item_exists(
+        workflow_id=workflow_id,
+        item=item,
+        expected_status=expected_status,
+        expected_po_id_not_null=True,
+    )
diff --git a/examples/demos/procurement_agent/project/agents/procurement_agent.py b/examples/demos/procurement_agent/project/agents/procurement_agent.py
index 475cd83e7..45858d45e 100644
--- a/examples/demos/procurement_agent/project/agents/procurement_agent.py
+++ b/examples/demos/procurement_agent/project/agents/procurement_agent.py
@@ -456,6 +456,19 @@ def new_procurement_agent(master_construction_schedule: str, human_input_learnin
If the user says no or has feedback, please come up with another solution and call the wait_for_human tool again (you can call it as many times as needed).
+## CRITICAL: When to Flag Potential Issues (Shipment_Departed_Factory events)
+
+When processing a Shipment_Departed_Factory event, you MUST compare the ETA to the required_by date from the master schedule:
+
+- **ONLY flag_potential_issue if ETA >= required_by** (zero buffer or late - this is a problem!)
+- **DO NOT flag_potential_issue if ETA < required_by** (there is still buffer remaining - no issue!)
+
+Example 1: Item required_by 2026-02-15, ETA is 2026-02-10 → DO NOT FLAG (5 days buffer remaining)
+Example 2: Item required_by 2026-02-15, ETA is 2026-02-15 → FLAG (zero buffer - on the deadline!)
+Example 3: Item required_by 2026-02-15, ETA is 2026-02-20 → FLAG (5 days late!)
+
+The buffer_days field in the schedule is informational only. What matters is: Does ETA arrive BEFORE the required_by date?
+
## Context
Master Construction Schedule:
diff --git a/examples/demos/procurement_agent/project/scripts/happy_path.py b/examples/demos/procurement_agent/project/scripts/happy_path.py
new file mode 100644
index 000000000..44ed6247c
--- /dev/null
+++ b/examples/demos/procurement_agent/project/scripts/happy_path.py
@@ -0,0 +1,184 @@
+#!/usr/bin/env python
+"""
+Happy path demo script - shows two items going through successfully.
+Both items pass inspection and arrive within time buffers.
+"""
+
+import os
+import sys
+import asyncio
+from datetime import datetime
+
+from temporalio.client import Client
+
+from project.models.events import (
+ EventType,
+ InspectionPassedEvent,
+ SubmitalApprovalEvent,
+ ShipmentArrivedSiteEvent,
+ ShipmentDepartedFactoryEvent,
+)
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.environment_variables import EnvironmentVariables
+
+# Defaults for local development; setdefault() keeps any value already set in the environment.
+os.environ.setdefault("AGENT_NAME", "procurement-agent")
+os.environ.setdefault("ACP_URL", "http://localhost:8000")
+os.environ.setdefault("WORKFLOW_NAME", "procurement-agent")
+os.environ.setdefault("WORKFLOW_TASK_QUEUE", "procurement_agent_queue")
+os.environ.setdefault("TEMPORAL_ADDRESS", "localhost:7233")
+
+logger = make_logger(__name__)
+environment_variables = EnvironmentVariables.refresh()  # NOTE(review): presumably reads the env vars defaulted above - confirm
+
+# Seconds slept after each event is signalled to the workflow.
+EVENT_DELAY = 15
+
+
+async def send_happy_path_events(workflow_id: str):
+    """Signal the happy-path events (two items, both pass inspection) to a workflow.
+
+    A failed send is printed and logged, then the remaining events are still sent.
+    """
+    # Connect to Temporal
+    temporal_url = environment_variables.TEMPORAL_ADDRESS or "localhost:7233"
+    client = await Client.connect(temporal_url)
+
+    # Get handle to the workflow
+    handle = client.get_workflow_handle(workflow_id)
+
+    # Item 1: Steel Beams - will PASS inspection
+    # Required by: 2026-02-15, Buffer: 5 days
+    # Arriving on 2026-02-10 (5 days early - within buffer)
+    steel_events = [
+        SubmitalApprovalEvent(
+            event_type=EventType.SUBMITTAL_APPROVED,
+            item="Steel Beams",
+            document_name="Steel Beams Submittal.pdf",
+            document_url="/submittal_approval.pdf"
+        ),
+        ShipmentDepartedFactoryEvent(
+            event_type=EventType.SHIPMENT_DEPARTED_FACTORY,
+            item="Steel Beams",
+            eta=datetime(2026, 2, 10, 14, 30),
+            date_departed=datetime(2026, 2, 3, 9, 15),
+            location_address="218 W 18th St, New York, NY 10011"
+        ),
+        ShipmentArrivedSiteEvent(
+            event_type=EventType.SHIPMENT_ARRIVED_SITE,
+            item="Steel Beams",
+            date_arrived=datetime(2026, 2, 10, 15, 45),
+            location_address="650 Townsend St, San Francisco, CA 94103"
+        ),
+        InspectionPassedEvent(
+            event_type=EventType.INSPECTION_PASSED,
+            item="Steel Beams",
+            inspection_date=datetime(2026, 2, 11, 10, 20),
+            document_name="Steel Beams Inspection Report.pdf",
+            document_url="/inspection_passed.pdf"
+        )
+    ]
+
+    # Item 2: Windows - will PASS inspection
+    # Required by: 2026-03-15, Buffer: 10 days
+    # Arriving on 2026-03-05 (10 days early - within buffer)
+    windows_events = [
+        SubmitalApprovalEvent(
+            event_type=EventType.SUBMITTAL_APPROVED,
+            item="Windows",
+            document_name="Windows Submittal.pdf",
+            document_url="/submittal_approval.pdf"
+        ),
+        ShipmentDepartedFactoryEvent(
+            event_type=EventType.SHIPMENT_DEPARTED_FACTORY,
+            item="Windows",
+            eta=datetime(2026, 3, 5, 16, 0),
+            date_departed=datetime(2026, 2, 20, 8, 30),
+            location_address="218 W 18th St, New York, NY 10011"
+        ),
+        ShipmentArrivedSiteEvent(
+            event_type=EventType.SHIPMENT_ARRIVED_SITE,
+            item="Windows",
+            date_arrived=datetime(2026, 3, 5, 16, 20),
+            location_address="650 Townsend St, San Francisco, CA 94103"
+        ),
+        InspectionPassedEvent(
+            event_type=EventType.INSPECTION_PASSED,
+            item="Windows",
+            inspection_date=datetime(2026, 3, 6, 9, 45),
+            document_name="Windows Inspection Report.pdf",
+            document_url="/inspection_passed.pdf"
+        )
+    ]
+
+    all_events = [
+        ("Steel Beams", steel_events),
+        ("Windows", windows_events),
+    ]
+
+    print(f"Connected to workflow: {workflow_id}")
+    print("=" * 60)
+    print("HAPPY PATH DEMO: Two items, both pass inspection")
+    print(f"Event delay: {EVENT_DELAY}s")
+    print("=" * 60)
+
+    for item_name, events in all_events:
+        print(f"\n{'=' * 60}")
+        print(f"Processing: {item_name}")
+        print("=" * 60)
+
+        for i, event in enumerate(events, 1):
+            print(f"\n[{i}/{len(events)}] Sending: {event.event_type.value}")  # total follows the list
+            print(f" Item: {event.item}")
+
+            if hasattr(event, 'eta'):
+                print(f" ETA: {event.eta}")
+            if hasattr(event, 'date_arrived'):
+                print(f" Date Arrived: {event.date_arrived}")
+            if hasattr(event, 'inspection_date'):
+                print(f" Inspection Date: {event.inspection_date}")
+
+            try:
+                event_data = event.model_dump_json()
+                await handle.signal("send_event", event_data)
+                print(" ✓ Sent!")
+                await asyncio.sleep(EVENT_DELAY)
+            except Exception as e:
+                print(f" ✗ Error: {e}")
+                logger.error(f"Failed to send event: {e}")
+
+    print("\n" + "=" * 60)
+    print("Happy path demo complete! Both items passed inspection.")
+    print("Check the UI to see processed events.")
+    print("=" * 60)
+
+
+async def main():
+    """Resolve the workflow ID (argv[1] or an interactive prompt) and send the events."""
+    if len(sys.argv) > 1:
+        workflow_id = sys.argv[1]
+    else:
+        print("Enter Workflow ID:")
+        workflow_id = input("Workflow ID: ").strip()
+
+    if not workflow_id:
+        print("Error: Workflow ID required!")
+        print("\nUsage: python happy_path.py [workflow_id]")
+        return
+
+    try:
+        await send_happy_path_events(workflow_id)
+    except Exception as e:
+        logger.error(f"Unexpected error: {e}")
+        print(f"Error: {e}")
+        print("\nMake sure:")
+        print("1. The workflow is running")
+        print("2. The workflow ID is correct")
+        print("3. Temporal is accessible at", environment_variables.TEMPORAL_ADDRESS)
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:  # Ctrl-C surfaces from asyncio.run(), not inside main()
+        print("\n\nInterrupted. Goodbye!")
diff --git a/examples/demos/procurement_agent/project/scripts/human_in_the_loop.py b/examples/demos/procurement_agent/project/scripts/human_in_the_loop.py
new file mode 100644
index 000000000..c2e2ebc53
--- /dev/null
+++ b/examples/demos/procurement_agent/project/scripts/human_in_the_loop.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+"""
+Human-in-the-loop demo script - shows an item that fails inspection.
+Demonstrates the need for human intervention when inspection fails.
+"""
+
+import os
+import sys
+import asyncio
+from datetime import datetime
+
+from temporalio.client import Client
+
+from project.models.events import (
+ EventType,
+ InspectionFailedEvent,
+ SubmitalApprovalEvent,
+ ShipmentArrivedSiteEvent,
+ ShipmentDepartedFactoryEvent,
+)
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.environment_variables import EnvironmentVariables
+
+# Defaults for local development; setdefault() keeps any value already set in the environment.
+os.environ.setdefault("AGENT_NAME", "procurement-agent")
+os.environ.setdefault("ACP_URL", "http://localhost:8000")
+os.environ.setdefault("WORKFLOW_NAME", "procurement-agent")
+os.environ.setdefault("WORKFLOW_TASK_QUEUE", "procurement_agent_queue")
+os.environ.setdefault("TEMPORAL_ADDRESS", "localhost:7233")
+
+logger = make_logger(__name__)
+environment_variables = EnvironmentVariables.refresh()  # NOTE(review): presumably reads the env vars defaulted above - confirm
+
+# Seconds slept after every event except the last one.
+EVENT_DELAY = 3
+# Seconds slept after the final (inspection failed) event, to observe the failure handling.
+POST_FAILURE_DELAY = 30
+
+
+async def send_human_in_the_loop_events(workflow_id: str):
+    """Signal the HVAC Units events to a workflow, ending with a failed inspection.
+
+    A failed send is printed and logged, then the remaining events are still sent.
+    """
+    # Connect to Temporal
+    temporal_url = environment_variables.TEMPORAL_ADDRESS or "localhost:7233"
+    client = await Client.connect(temporal_url)
+
+    # Get handle to the workflow
+    handle = client.get_workflow_handle(workflow_id)
+
+    # HVAC Units - will FAIL inspection
+    # Required by: 2026-03-01, Buffer: 7 days
+    # Arriving on 2026-02-15 (14 days early - well within buffer)
+    hvac_events = [
+        SubmitalApprovalEvent(
+            event_type=EventType.SUBMITTAL_APPROVED,
+            item="HVAC Units",
+            document_name="HVAC Units Submittal.pdf",
+            document_url="/submittal_approval.pdf"
+        ),
+        ShipmentDepartedFactoryEvent(
+            event_type=EventType.SHIPMENT_DEPARTED_FACTORY,
+            item="HVAC Units",
+            eta=datetime(2026, 2, 15, 11, 0),
+            date_departed=datetime(2026, 2, 8, 13, 45),
+            location_address="218 W 18th St, New York, NY 10011"
+        ),
+        ShipmentArrivedSiteEvent(
+            event_type=EventType.SHIPMENT_ARRIVED_SITE,
+            item="HVAC Units",
+            date_arrived=datetime(2026, 2, 15, 10, 30),
+            location_address="650 Townsend St, San Francisco, CA 94103"
+        ),
+        InspectionFailedEvent(
+            event_type=EventType.INSPECTION_FAILED,
+            item="HVAC Units",
+            inspection_date=datetime(2026, 2, 16, 14, 15),
+            document_name="HVAC Units Inspection Report.pdf",
+            document_url="/inspection_failed.pdf"
+        )
+    ]
+
+    print(f"Connected to workflow: {workflow_id}")
+    print("=" * 60)
+    print("HUMAN-IN-THE-LOOP DEMO: Item fails inspection")
+    print(f"Event delay: {EVENT_DELAY}s")
+    print("=" * 60)
+
+    print(f"\n{'=' * 60}")
+    print("Processing: HVAC Units (will FAIL inspection)")
+    print("=" * 60)
+
+    for i, event in enumerate(hvac_events, 1):
+        print(f"\n[{i}/{len(hvac_events)}] Sending: {event.event_type.value}")  # total follows the list
+        print(f" Item: {event.item}")
+
+        if hasattr(event, 'eta'):
+            print(f" ETA: {event.eta}")
+        if hasattr(event, 'date_arrived'):
+            print(f" Date Arrived: {event.date_arrived}")
+        if hasattr(event, 'inspection_date'):
+            print(f" Inspection Date: {event.inspection_date}")
+
+        try:
+            event_data = event.model_dump_json()
+            await handle.signal("send_event", event_data)
+            print(" ✓ Sent!")
+            # The last event in the list is the inspection failure; pause longer after it.
+            is_last_event = (i == len(hvac_events))
+            if is_last_event:
+                print("\n ⚠️ INSPECTION FAILED!")
+                print(f" ⏳ Waiting {POST_FAILURE_DELAY}s to observe failure handling...")
+                print(" 💡 Check the UI - agent should request human input")
+                await asyncio.sleep(POST_FAILURE_DELAY)
+            else:
+                await asyncio.sleep(EVENT_DELAY)
+        except Exception as e:
+            print(f" ✗ Error: {e}")
+            logger.error(f"Failed to send event: {e}")
+
+    print("\n" + "=" * 60)
+    print("Human-in-the-loop demo complete!")
+    print("The agent should now be waiting for human input to resolve")
+    print("the inspection failure. Check the UI to provide input.")
+    print("=" * 60)
+
+
+async def main():
+    """Resolve the workflow ID (argv[1] or an interactive prompt) and send the events."""
+    if len(sys.argv) > 1:
+        workflow_id = sys.argv[1]
+    else:
+        print("Enter Workflow ID:")
+        workflow_id = input("Workflow ID: ").strip()
+
+    if not workflow_id:
+        print("Error: Workflow ID required!")
+        print("\nUsage: python human_in_the_loop.py [workflow_id]")
+        return
+
+    try:
+        await send_human_in_the_loop_events(workflow_id)
+    except Exception as e:
+        logger.error(f"Unexpected error: {e}")
+        print(f"Error: {e}")
+        print("\nMake sure:")
+        print("1. The workflow is running")
+        print("2. The workflow ID is correct")
+        print("3. Temporal is accessible at", environment_variables.TEMPORAL_ADDRESS)
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:  # Ctrl-C surfaces from asyncio.run(), not inside main()
+        print("\n\nInterrupted. Goodbye!")
diff --git a/examples/demos/procurement_agent/project/scripts/out_of_order.py b/examples/demos/procurement_agent/project/scripts/out_of_order.py
new file mode 100644
index 000000000..164c4a9e5
--- /dev/null
+++ b/examples/demos/procurement_agent/project/scripts/out_of_order.py
@@ -0,0 +1,180 @@
+#!/usr/bin/env python
+"""
+Out-of-order events demo script - tests agent's ability to handle duplicate/out-of-order signals.
+Sends a submittal approval event again after shipment arrives but before inspection,
+to verify the agent recognizes it already happened and ignores the duplicate.
+"""
+
+import os
+import sys
+import asyncio
+from datetime import datetime
+
+from temporalio.client import Client
+
+from project.models.events import (
+ EventType,
+ InspectionPassedEvent,
+ SubmitalApprovalEvent,
+ ShipmentArrivedSiteEvent,
+ ShipmentDepartedFactoryEvent,
+)
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.environment_variables import EnvironmentVariables
+
+# Defaults for local development; setdefault() keeps any value already set in the environment.
+os.environ.setdefault("AGENT_NAME", "procurement-agent")
+os.environ.setdefault("ACP_URL", "http://localhost:8000")
+os.environ.setdefault("WORKFLOW_NAME", "procurement-agent")
+os.environ.setdefault("WORKFLOW_TASK_QUEUE", "procurement_agent_queue")
+os.environ.setdefault("TEMPORAL_ADDRESS", "localhost:7233")
+
+logger = make_logger(__name__)
+environment_variables = EnvironmentVariables.refresh()  # NOTE(review): presumably reads the env vars defaulted above - confirm
+
+# Seconds slept after every event except the duplicate.
+EVENT_DELAY = 3
+# Seconds slept after the duplicate submittal event, to observe how the agent handles it.
+POST_DUPLICATE_DELAY = 10
+
+
+async def send_out_of_order_events(workflow_id: str):
+    """Signal the Flooring Materials events, including a duplicate submittal approval.
+
+    A failed send is printed and logged, then the remaining events are still sent.
+    """
+    # Connect to Temporal
+    temporal_url = environment_variables.TEMPORAL_ADDRESS or "localhost:7233"
+    client = await Client.connect(temporal_url)
+
+    # Get handle to the workflow
+    handle = client.get_workflow_handle(workflow_id)
+
+    # Flooring Materials - will PASS inspection, but with duplicate submittal event
+    # Required by: 2026-04-01, Buffer: 3 days (so buffer deadline is 2026-03-29)
+    # Arriving on 2026-03-20 (12 days early - well within buffer, no warnings)
+    events = [
+        # 1. Normal: Submittal approved
+        SubmitalApprovalEvent(
+            event_type=EventType.SUBMITTAL_APPROVED,
+            item="Flooring Materials",
+            document_name="Flooring Materials Submittal.pdf",
+            document_url="/submittal_approval.pdf"
+        ),
+        # 2. Normal: Shipment departs
+        ShipmentDepartedFactoryEvent(
+            event_type=EventType.SHIPMENT_DEPARTED_FACTORY,
+            item="Flooring Materials",
+            eta=datetime(2026, 3, 20, 13, 15),
+            date_departed=datetime(2026, 3, 13, 11, 30),
+            location_address="218 W 18th St, New York, NY 10011"
+        ),
+        # 3. Normal: Shipment arrives
+        ShipmentArrivedSiteEvent(
+            event_type=EventType.SHIPMENT_ARRIVED_SITE,
+            item="Flooring Materials",
+            date_arrived=datetime(2026, 3, 20, 12, 45),
+            location_address="650 Townsend St, San Francisco, CA 94103"
+        ),
+        # 4. OUT OF ORDER: Duplicate submittal approval (should be ignored)
+        SubmitalApprovalEvent(
+            event_type=EventType.SUBMITTAL_APPROVED,
+            item="Flooring Materials",
+            document_name="Flooring Materials Submittal.pdf",
+            document_url="/submittal_approval.pdf"
+        ),
+        # 5. Normal: Inspection passes
+        InspectionPassedEvent(
+            event_type=EventType.INSPECTION_PASSED,
+            item="Flooring Materials",
+            inspection_date=datetime(2026, 3, 21, 15, 30),
+            document_name="Flooring Materials Inspection Report.pdf",
+            document_url="/inspection_passed.pdf"
+        )
+    ]
+
+    event_labels = [
+        "Submittal Approved (initial)",
+        "Shipment Departed",
+        "Shipment Arrived",
+        "Submittal Approved (DUPLICATE - should be ignored)",
+        "Inspection Passed"
+    ]
+
+    print(f"Connected to workflow: {workflow_id}")
+    print("=" * 60)
+    print("OUT-OF-ORDER DEMO: Testing duplicate event handling")
+    print(f"Event delay: {EVENT_DELAY}s")
+    print("=" * 60)
+
+    print(f"\n{'=' * 60}")
+    print("Processing: Flooring Materials (with duplicate submittal)")
+    print("=" * 60)
+
+    for i, (event, label) in enumerate(zip(events, event_labels), 1):
+        is_duplicate = (i == 4)  # position of the duplicate submittal in `events`
+
+        print(f"\n[{i}/{len(events)}] Sending: {label}")  # total follows the list
+        print(f" Event Type: {event.event_type.value}")
+        print(f" Item: {event.item}")
+
+        if is_duplicate:
+            print(" ⚠️ This is a DUPLICATE event - agent should recognize and ignore")
+
+        if hasattr(event, 'eta'):
+            print(f" ETA: {event.eta}")
+        if hasattr(event, 'date_arrived'):
+            print(f" Date Arrived: {event.date_arrived}")
+        if hasattr(event, 'inspection_date'):
+            print(f" Inspection Date: {event.inspection_date}")
+
+        try:
+            event_data = event.model_dump_json()
+            await handle.signal("send_event", event_data)
+            print(" ✓ Sent!")
+            # Use longer delay after duplicate to observe handling
+            if is_duplicate:
+                print(f" ⏳ Waiting {POST_DUPLICATE_DELAY}s to observe duplicate handling...")
+                await asyncio.sleep(POST_DUPLICATE_DELAY)
+            else:
+                await asyncio.sleep(EVENT_DELAY)
+        except Exception as e:
+            print(f" ✗ Error: {e}")
+            logger.error(f"Failed to send event: {e}")
+
+    print("\n" + "=" * 60)
+    print("Out-of-order demo complete!")
+    print("The agent should have recognized the duplicate submittal")
+    print("approval and ignored it. Check the UI to verify.")
+    print("=" * 60)
+
+
+async def main():
+    """Resolve the workflow ID (argv[1] or an interactive prompt) and send the events."""
+    if len(sys.argv) > 1:
+        workflow_id = sys.argv[1]
+    else:
+        print("Enter Workflow ID:")
+        workflow_id = input("Workflow ID: ").strip()
+
+    if not workflow_id:
+        print("Error: Workflow ID required!")
+        print("\nUsage: python out_of_order.py [workflow_id]")
+        return
+
+    try:
+        await send_out_of_order_events(workflow_id)
+    except Exception as e:
+        logger.error(f"Unexpected error: {e}")
+        print(f"Error: {e}")
+        print("\nMake sure:")
+        print("1. The workflow is running")
+        print("2. The workflow ID is correct")
+        print("3. Temporal is accessible at", environment_variables.TEMPORAL_ADDRESS)
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:  # Ctrl-C surfaces from asyncio.run(), not inside main()
+        print("\n\nInterrupted. Goodbye!")
diff --git a/examples/demos/procurement_agent/pyproject.toml b/examples/demos/procurement_agent/pyproject.toml
index 7ccbf80e0..555819a5d 100644
--- a/examples/demos/procurement_agent/pyproject.toml
+++ b/examples/demos/procurement_agent/pyproject.toml
@@ -13,6 +13,7 @@ dependencies = [
"temporalio>=1.18.2",
"scale-gp",
"aiosqlite",
+ "pytest-html>=4.2.0",
]
[project.optional-dependencies]
@@ -33,4 +34,4 @@ target-version = ['py312']
[tool.isort]
profile = "black"
-line_length = 88
\ No newline at end of file
+line_length = 88
diff --git a/uv.lock b/uv.lock
index 391297102..0bbcf36b0 100644
--- a/uv.lock
+++ b/uv.lock
@@ -2675,4 +2675,4 @@ source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e3/02/0f2892c661036d50ede074e376733dca2ae7c6eb617489437771209d4180/zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166", size = 25547, upload-time = "2025-06-08T17:06:39.4Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/54/647ade08bf0db230bfea292f893923872fd20be6ac6f53b2b936ba839d75/zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e", size = 10276, upload-time = "2025-06-08T17:06:38.034Z" },
-]
+]
\ No newline at end of file