Skip to content

Commit 84c6e4b

Browse files
committed
Clean up LRO code and update docs
1 parent 0d48861 commit 84c6e4b

File tree

5 files changed

+33
-35
lines changed

5 files changed

+33
-35
lines changed

README.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1663,7 +1663,7 @@ For more information on mounting applications in Starlette, see the [Starlette d
16631663

16641664
### Persistent Async Operations
16651665

1666-
For production deployments, you may want async operations to survive server restarts. The `ServerAsyncOperationManager` uses pluggable `AsyncOperationStore` and `AsyncOperationBroker` components to handle operation persistence and task queuing.
1666+
For production deployments, you may want async operations to survive server restarts. The `ServerAsyncOperationManager` uses pluggable `OperationEventQueue`, `AsyncOperationStore`, and `AsyncOperationBroker` components to handle operation persistence and task queuing.
16671667

16681668
#### Operation Lifecycle
16691669

@@ -1679,14 +1679,20 @@ Async operations follow this lifecycle:
16791679
from mcp.server.fastmcp import FastMCP
16801680
from mcp.shared.async_operations import ServerAsyncOperationManager
16811681

1682+
# Create custom event queues
1683+
custom_request_queue = MyAsyncOperationEventQueue()
1684+
custom_response_queue = MyAsyncOperationEventQueue()
1685+
16821686
# Create custom store and broker implementations
16831687
custom_store = MyAsyncOperationStore()
16841688
custom_broker = MyAsyncOperationBroker()
16851689

16861690
# Create operation manager with custom components
16871691
operation_manager = ServerAsyncOperationManager(
16881692
store=custom_store,
1689-
broker=custom_broker
1693+
broker=custom_broker,
1694+
operation_request_queue=custom_request_queue,
1695+
operation_response_queue=custom_response_queue,
16901696
)
16911697

16921698
# Use with FastMCP

examples/servers/sqlite-async-operations/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ This example demonstrates how to implement custom async operations storage and t
66

77
The example showcases the pluggable architecture of the async operations system:
88

9+
- `SQLiteOperationEventQueue`: Custom event queue that manages operation messages for disconnected clients
910
- `SQLiteAsyncOperationStore`: Custom implementation that persists operations to SQLite
1011
- `SQLiteAsyncOperationBroker`: Custom implementation that persists pending tasks to SQLite
1112
- `ServerAsyncOperationManager`: Uses both custom store and broker for full persistence
@@ -30,7 +31,7 @@ uv run mcp-sqlite-async-operations --transport streamable-http --port 8000
3031
## Testing Persistent Async Operations
3132

3233
1. Start the server
33-
2. Call one of the async tools (`long_computation` or `fetch_data`)
34+
2. Call the async tool (`fetch_data`)
3435
3. **Restart the server while the operation is running**
3536
4. The operation will automatically resume and complete
3637
5. Use the operation token to check status and retrieve results

examples/servers/sqlite-async-operations/mcp_sqlite_async_operations/server.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -216,24 +216,25 @@ async def cleanup_expired(self) -> int:
216216
class SQLiteOperationEventQueue(OperationEventQueue):
217217
"""SQLite-based implementation of OperationEventQueue for operation-specific event delivery."""
218218

219-
def __init__(self, db_path: str = "async_operations.db"):
219+
def __init__(self, db_path: str = "async_operations.db", table_name: str = "operation_events"):
220220
self.db_path = db_path
221+
self.table_name = table_name
221222
self._init_db()
222223

223224
def _init_db(self):
224225
"""Initialize the SQLite database for operation event queuing."""
225226
with sqlite3.connect(self.db_path) as conn:
226-
conn.execute("""
227-
CREATE TABLE IF NOT EXISTS operation_events (
227+
conn.execute(f"""
228+
CREATE TABLE IF NOT EXISTS {self.table_name} (
228229
id INTEGER PRIMARY KEY AUTOINCREMENT,
229230
operation_token TEXT NOT NULL,
230231
message TEXT NOT NULL,
231232
created_at REAL NOT NULL
232233
)
233234
""")
234-
conn.execute("""
235+
conn.execute(f"""
235236
CREATE INDEX IF NOT EXISTS idx_operation_events_token_created
236-
ON operation_events(operation_token, created_at)
237+
ON {self.table_name}(operation_token, created_at)
237238
""")
238239
conn.commit()
239240

@@ -244,8 +245,8 @@ async def enqueue_event(self, operation_token: str, message: types.JSONRPCMessag
244245

245246
with sqlite3.connect(self.db_path) as conn:
246247
conn.execute(
247-
"""
248-
INSERT INTO operation_events (operation_token, message, created_at)
248+
f"""
249+
INSERT INTO {self.table_name} (operation_token, message, created_at)
249250
VALUES (?, ?, ?)
250251
""",
251252
(operation_token, message_json, created_at),
@@ -259,8 +260,8 @@ async def dequeue_events(self, operation_token: str) -> list[types.JSONRPCMessag
259260

260261
# Get all events for this operation token
261262
cursor = conn.execute(
262-
"""
263-
SELECT id, message FROM operation_events
263+
f"""
264+
SELECT id, message FROM {self.table_name}
264265
WHERE operation_token = ?
265266
ORDER BY created_at
266267
""",
@@ -279,7 +280,7 @@ async def dequeue_events(self, operation_token: str) -> list[types.JSONRPCMessag
279280
# Delete the dequeued events
280281
if event_ids:
281282
placeholders = ",".join("?" * len(event_ids))
282-
conn.execute(f"DELETE FROM operation_events WHERE id IN ({placeholders})", event_ids)
283+
conn.execute(f"DELETE FROM {self.table_name} WHERE id IN ({placeholders})", event_ids)
283284
conn.commit()
284285

285286
return events
@@ -419,13 +420,18 @@ class UserPreferences(BaseModel):
419420
def main(port: int, transport: str, db_path: str):
420421
"""Run the SQLite async operations example server."""
421422
# Create components with specified database path
422-
operation_event_queue = SQLiteOperationEventQueue(db_path)
423+
operation_request_queue = SQLiteOperationEventQueue(db_path, "operation_requests")
424+
operation_response_queue = SQLiteOperationEventQueue(db_path, "operation_responses")
423425
broker = SQLiteAsyncOperationBroker(db_path)
424426
store = SQLiteAsyncOperationStore(db_path)
425-
manager = ServerAsyncOperationManager(store=store, broker=broker, operation_request_queue=operation_event_queue)
427+
manager = ServerAsyncOperationManager(
428+
store=store,
429+
broker=broker,
430+
operation_request_queue=operation_request_queue,
431+
operation_response_queue=operation_response_queue,
432+
)
426433
mcp = FastMCP(
427434
"SQLite Async Operations Demo",
428-
operation_event_queue=operation_event_queue,
429435
async_operations=manager,
430436
)
431437

src/mcp/server/fastmcp/server.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
from mcp.server.streamable_http import EventStore
4646
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
4747
from mcp.server.transport_security import TransportSecuritySettings
48-
from mcp.shared.async_operations import OperationEventQueue
4948
from mcp.shared.context import LifespanContextT, RequestContext, RequestT
5049
from mcp.types import (
5150
AnyFunction,
@@ -148,7 +147,6 @@ def __init__( # noqa: PLR0913
148147
event_store: EventStore | None = None,
149148
*,
150149
async_operations: ServerAsyncOperationManager | None = None,
151-
operation_event_queue: OperationEventQueue | None = None,
152150
tools: list[Tool] | None = None,
153151
debug: bool = False,
154152
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
@@ -168,7 +166,7 @@ def __init__( # noqa: PLR0913
168166
auth: AuthSettings | None = None,
169167
transport_security: TransportSecuritySettings | None = None,
170168
):
171-
from mcp.shared.async_operations import InMemoryOperationEventQueue, ServerAsyncOperationManager
169+
from mcp.shared.async_operations import ServerAsyncOperationManager
172170

173171
self.settings = Settings(
174172
debug=debug,
@@ -190,21 +188,14 @@ def __init__( # noqa: PLR0913
190188
transport_security=transport_security,
191189
)
192190

193-
self._operation_event_queue = operation_event_queue or InMemoryOperationEventQueue()
194-
self._operation_response_queue = InMemoryOperationEventQueue()
195-
self._async_operations = async_operations or ServerAsyncOperationManager(
196-
operation_request_queue=self._operation_event_queue,
197-
operation_response_queue=self._operation_response_queue,
198-
)
191+
self._async_operations = async_operations or ServerAsyncOperationManager()
199192

200193
self._mcp_server = MCPServer(
201194
name=name or "FastMCP",
202195
instructions=instructions,
203196
website_url=website_url,
204197
icons=icons,
205198
async_operations=self._async_operations,
206-
operation_request_queue=self._operation_event_queue,
207-
operation_response_queue=self._operation_response_queue,
208199
# TODO(Marcelo): It seems there's a type mismatch between the lifespan type from an FastMCP and Server.
209200
# We need to create a Lifespan type that is a generic on the server type, like Starlette does.
210201
lifespan=(lifespan_wrapper(self, self.settings.lifespan) if self.settings.lifespan else default_lifespan), # type: ignore
@@ -228,7 +219,6 @@ def __init__( # noqa: PLR0913
228219
if auth_server_provider and not token_verifier:
229220
self._token_verifier = ProviderTokenVerifier(auth_server_provider)
230221
self._event_store = event_store
231-
self._operation_event_queue = operation_event_queue
232222
self._custom_starlette_routes: list[Route] = []
233223
self.dependencies = self.settings.dependencies
234224
self._session_manager: StreamableHTTPSessionManager | None = None

src/mcp/server/lowlevel/server.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ async def main():
9595
from mcp.types import NEXT_PROTOCOL_VERSION, Operation, RequestId
9696

9797
if TYPE_CHECKING:
98-
from mcp.shared.async_operations import OperationEventQueue, ServerAsyncOperationManager
98+
from mcp.shared.async_operations import ServerAsyncOperationManager
9999

100100
logger = logging.getLogger(__name__)
101101

@@ -145,8 +145,6 @@ def __init__(
145145
website_url: str | None = None,
146146
icons: list[types.Icon] | None = None,
147147
async_operations: ServerAsyncOperationManager | None = None,
148-
operation_request_queue: OperationEventQueue | None = None,
149-
operation_response_queue: OperationEventQueue | None = None,
150148
lifespan: Callable[
151149
[Server[LifespanResultT, RequestT]],
152150
AbstractAsyncContextManager[LifespanResultT],
@@ -160,10 +158,7 @@ def __init__(
160158
self.website_url = website_url
161159
self.icons = icons
162160
self.lifespan = lifespan
163-
self.async_operations = async_operations or ServerAsyncOperationManager(
164-
operation_request_queue=operation_request_queue,
165-
operation_response_queue=operation_response_queue,
166-
)
161+
self.async_operations = async_operations or ServerAsyncOperationManager()
167162
self.async_operations.set_handler(self._execute_tool_async)
168163
# Track request ID to operation token mapping for cancellation
169164
self._request_to_operation: dict[RequestId, str] = {}

0 commit comments

Comments
 (0)