Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies = [

[project.optional-dependencies]
community = [
"fastmcp>=2.7.0,!=2.9.*",
"fastmcp==3.0.0b1",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't pin version here, right?

]
dev = [
"pytest>=7.0.0",
Expand Down
114 changes: 69 additions & 45 deletions src/mcpcat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,23 @@
from typing import Any

from mcpcat.modules.overrides.mcp_server import override_lowlevel_mcp_server
from mcpcat.modules.session import (
get_session_info,
new_session_id,
)
from mcpcat.modules.session import get_session_info, new_session_id

from .modules.compatibility import (
is_community_fastmcp_server,
COMPATIBILITY_ERROR_MESSAGE,
is_community_fastmcp_v2,
is_community_fastmcp_v3,
is_compatible_server,
is_official_fastmcp_server,
COMPATIBILITY_ERROR_MESSAGE,
)
from .modules.internal import set_server_tracking_data
from .modules.logging import write_to_log, set_debug_mode
from .modules.logging import set_debug_mode, write_to_log
from .types import (
IdentifyFunction,
MCPCatData,
MCPCatOptions,
UserIdentity,
IdentifyFunction,
RedactionFunction,
UserIdentity,
)


Expand All @@ -44,87 +42,71 @@ def track(
ValueError: If neither project_id nor exporters are provided
TypeError: If server is not a compatible MCP server instance
"""
# Use default options if not provided
if options is None:
options = MCPCatOptions()

# Update global debug_mode value
set_debug_mode(options.debug_mode)

# Validate configuration
if not project_id and not options.exporters:
raise ValueError(
"Either project_id or exporters must be provided. "
"Use project_id for MCPCat, exporters for telemetry-only mode, or both."
)

# Validate server compatibility
if not is_compatible_server(server):
raise TypeError(COMPATIBILITY_ERROR_MESSAGE)

lowlevel_server = server
is_fastmcp = is_official_fastmcp_server(server) or is_community_fastmcp_server(server)
is_community_v3 = is_community_fastmcp_v3(server)
is_community_v2 = is_community_fastmcp_v2(server)
is_official_fastmcp = is_official_fastmcp_server(server)
is_community_fastmcp = is_community_fastmcp_server(server)
is_fastmcp_v2 = is_official_fastmcp or is_community_v2

if is_fastmcp:
# Determine where to store tracking data:
# - v2 FastMCP servers use server._mcp_server
# - v3 and low-level servers use the server itself
if is_fastmcp_v2:
lowlevel_server = server._mcp_server
else:
lowlevel_server = server

# Initialize telemetry if exporters configured
if options.exporters:
from mcpcat.modules.telemetry import TelemetryManager
from mcpcat.modules.event_queue import set_telemetry_manager
from mcpcat.modules.telemetry import TelemetryManager

telemetry_manager = TelemetryManager(options.exporters)
set_telemetry_manager(telemetry_manager)
write_to_log(f"Telemetry initialized with {len(options.exporters)} exporter(s)")
write_to_log(
f"Telemetry initialized with {len(options.exporters)} exporter(s)"
)

# Create and store tracking data
session_id = new_session_id()
session_info = get_session_info(lowlevel_server)
data = MCPCatData(
session_id=session_id,
project_id=project_id,
last_activity=datetime.now(timezone.utc),
session_info=session_info,
identified_sessions=dict(),
identified_sessions={},
options=options,
)
set_server_tracking_data(lowlevel_server, data)

try:
# Always initialize dynamic tracking for complete tool coverage
from mcpcat.modules.overrides.official.monkey_patch import apply_official_fastmcp_patches

# Initialize the dynamic tracking system by setting the flag
if not data.tracker_initialized:
data.tracker_initialized = True
write_to_log(
f"Dynamic tracking initialized for server {id(lowlevel_server)}"
)

# Apply appropriate tracking method based on server type
if is_official_fastmcp:
# For FastMCP servers, use monkey-patching for tool tracking
apply_official_fastmcp_patches(server, data)
# Only apply minimal overrides for non-tool events (like initialize, list_tools display)
from mcpcat.modules.overrides.mcp_server import (
override_lowlevel_mcp_server_minimal,
)

override_lowlevel_mcp_server_minimal(lowlevel_server, data)
elif is_community_fastmcp:
# For community FastMCP servers, use community-specific patches
from mcpcat.modules.overrides.community.monkey_patch import patch_community_fastmcp
patch_community_fastmcp(server)
write_to_log(f"Applied community FastMCP patches for server {id(server)}")
else:
# For low-level servers, use the traditional overrides (no monkey patching needed)
override_lowlevel_mcp_server(lowlevel_server, data)
_apply_server_tracking(
server, lowlevel_server, data,
is_community_v3, is_official_fastmcp, is_community_v2
)

if project_id:
write_to_log(
f"MCPCat initialized with dynamic tracking for session {session_id} on project {project_id}"
f"MCPCat initialized with dynamic tracking for session "
f"{session_id} on project {project_id}"
)
else:
write_to_log(
Expand All @@ -137,6 +119,48 @@ def track(
return server


def _apply_server_tracking(
server: Any,
lowlevel_server: Any,
data: MCPCatData,
is_community_v3: bool,
is_official_fastmcp: bool,
is_community_v2: bool,
) -> None:
"""Apply the appropriate tracking method based on server type."""
if is_community_v3:
from mcpcat.modules.overrides.community_v3.integration import (
apply_community_v3_integration,
)

apply_community_v3_integration(server, data)
write_to_log(
f"Applied Community FastMCP v3 middleware for server {id(server)}"
)

elif is_official_fastmcp:
from mcpcat.modules.overrides.mcp_server import (
override_lowlevel_mcp_server_minimal,
)
from mcpcat.modules.overrides.official.monkey_patch import (
apply_official_fastmcp_patches,
)

apply_official_fastmcp_patches(server, data)
override_lowlevel_mcp_server_minimal(lowlevel_server, data)

elif is_community_v2:
from mcpcat.modules.overrides.community.monkey_patch import (
patch_community_fastmcp,
)

patch_community_fastmcp(server)
write_to_log(f"Applied Community FastMCP v2 patches for server {id(server)}")

else:
override_lowlevel_mcp_server(lowlevel_server, data)


__all__ = [
# Main API
"track",
Expand Down
Loading