Skip to content

Commit 3812024

Browse files
committed
checkpointer
1 parent a447e5b commit 3812024

File tree

4 files changed

+501
-8
lines changed

4 files changed

+501
-8
lines changed

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ dependencies = [
4848
"yaspin>=3.1.0",
4949
"claude-agent-sdk>=0.1.0",
5050
"anthropic>=0.40.0",
51+
"langgraph-checkpoint-postgres>=2.0.0",
52+
"psycopg[binary]>=3.0.0",
5153
]
5254

5355
requires-python = ">= 3.12,<4"

src/agentex/lib/adk/__init__.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from agentex.lib.adk._modules.acp import ACPModule
66
from agentex.lib.adk._modules.agents import AgentsModule
77
from agentex.lib.adk._modules.agent_task_tracker import AgentTaskTrackerModule
8+
from agentex.lib.adk._modules.checkpointer import AgentexCheckpointer
89
from agentex.lib.adk._modules.events import EventsModule
910
from agentex.lib.adk._modules.messages import MessagesModule
1011
from agentex.lib.adk._modules.state import StateModule
@@ -27,16 +28,19 @@
2728

2829
__all__ = [
2930
# Core
30-
"acp",
31+
"acp",
3132
"agents",
32-
"tasks",
33-
"messages",
34-
"state",
35-
"streaming",
36-
"tracing",
33+
"tasks",
34+
"messages",
35+
"state",
36+
"streaming",
37+
"tracing",
3738
"events",
3839
"agent_task_tracker",
3940

41+
# Checkpointing
42+
"AgentexCheckpointer",
43+
4044
# Providers
4145
"providers",
4246
# Utils
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import os
5+
6+
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
7+
from psycopg_pool import AsyncConnectionPool
8+
9+
from agentex.lib.utils.logging import make_logger
10+
11+
logger = make_logger(__name__)
12+
13+
# Pool configuration - sized for single agent with concurrent task execution
14+
# Checkpoint operations are fast (single row upserts), so connections are released quickly
15+
_POOL_MIN_SIZE = 2 # Keep a couple connections warm
16+
_POOL_MAX_SIZE = 20 # Handle concurrent tasks within a single agent pod
17+
_POOL_TIMEOUT = 60.0 # Wait for connection before failing
18+
_POOL_MAX_IDLE = 300.0 # Clean up idle connections after 5 minutes
19+
20+
21+
class AgentexCheckpointer:
22+
"""LangGraph checkpointer using AgentEx PostgreSQL.
23+
24+
Provides a singleton PostgreSQL connection pool for LangGraph checkpoint storage.
25+
Thread-safe initialization with proper cleanup support.
26+
27+
Usage:
28+
checkpointer = await AgentexCheckpointer.create()
29+
# Use checkpointer with LangGraph...
30+
31+
# On shutdown:
32+
await AgentexCheckpointer.close()
33+
"""
34+
35+
_pool: AsyncConnectionPool | None = None
36+
_checkpointer: AsyncPostgresSaver | None = None
37+
_lock: asyncio.Lock | None = None
38+
39+
@classmethod
40+
def _get_lock(cls) -> asyncio.Lock:
41+
"""Get or create the initialization lock."""
42+
if cls._lock is None:
43+
cls._lock = asyncio.Lock()
44+
return cls._lock
45+
46+
@classmethod
47+
async def create(cls) -> AsyncPostgresSaver:
48+
"""Create a PostgreSQL checkpointer.
49+
50+
Uses DATABASE_URL from environment (same as AgentEx).
51+
Tables are auto-created on first use via CREATE IF NOT EXISTS.
52+
53+
Returns:
54+
AsyncPostgresSaver configured for LangGraph checkpoint storage.
55+
56+
Raises:
57+
ValueError: If DATABASE_URL is not set.
58+
Exception: If connection to PostgreSQL fails.
59+
"""
60+
# Fast path - already initialized
61+
if cls._checkpointer is not None:
62+
return cls._checkpointer
63+
64+
async with cls._get_lock():
65+
# Double-check after acquiring lock
66+
if cls._checkpointer is not None:
67+
return cls._checkpointer
68+
69+
db_url = os.environ.get("DATABASE_URL")
70+
if not db_url:
71+
raise ValueError(
72+
"DATABASE_URL not set. "
73+
"Add it to your manifest.yaml env section."
74+
)
75+
76+
pool = None
77+
try:
78+
logger.info("Initializing PostgreSQL checkpointer connection pool")
79+
80+
pool = AsyncConnectionPool(
81+
conninfo=db_url,
82+
min_size=_POOL_MIN_SIZE,
83+
max_size=_POOL_MAX_SIZE,
84+
timeout=_POOL_TIMEOUT,
85+
max_idle=_POOL_MAX_IDLE,
86+
open=False,
87+
kwargs={"autocommit": True},
88+
)
89+
await pool.open()
90+
91+
checkpointer = AsyncPostgresSaver(pool)
92+
await checkpointer.setup() # Idempotent - creates tables if needed
93+
94+
# Only set class state after everything succeeds
95+
cls._pool = pool
96+
cls._checkpointer = checkpointer
97+
98+
logger.info("PostgreSQL checkpointer initialized successfully")
99+
return cls._checkpointer
100+
101+
except Exception as e:
102+
# Clean up pool if it was created
103+
if pool is not None:
104+
try:
105+
await pool.close()
106+
except Exception as close_error:
107+
logger.warning(f"Error closing pool during cleanup: {close_error}")
108+
109+
logger.error(f"Failed to initialize PostgreSQL checkpointer: {e}")
110+
raise
111+
112+
@classmethod
113+
async def close(cls) -> None:
114+
"""Close the connection pool and clean up resources.
115+
116+
Safe to call multiple times. Should be called on application shutdown.
117+
"""
118+
async with cls._get_lock():
119+
if cls._pool is not None:
120+
try:
121+
logger.info("Closing PostgreSQL checkpointer connection pool")
122+
await cls._pool.close()
123+
logger.info("PostgreSQL checkpointer connection pool closed")
124+
except Exception as e:
125+
logger.error(f"Error closing PostgreSQL connection pool: {e}")
126+
raise
127+
finally:
128+
cls._pool = None
129+
cls._checkpointer = None
130+
131+
@classmethod
132+
def is_initialized(cls) -> bool:
133+
"""Check if the checkpointer has been initialized."""
134+
return cls._checkpointer is not None

0 commit comments

Comments
 (0)