Skip to content

Commit 305b051

Browse files
author
Lasim
committed
feat(all): add SSE endpoint for streaming MCP client activity
1 parent 9e75a89 commit 305b051

File tree

8 files changed

+513
-66
lines changed

8 files changed

+513
-66
lines changed

services/backend/api-spec.json

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4033,6 +4033,65 @@
40334033
}
40344034
}
40354035
},
4036+
"/api/users/me/mcp/client-activity/stream": {
4037+
"get": {
4038+
"summary": "Stream MCP client activity via SSE",
4039+
"tags": [
4040+
"Users",
4041+
"MCP"
4042+
],
4043+
"description": "Real-time stream of MCP client activity using Server-Sent Events. Pushes updates every 10 seconds.",
4044+
"parameters": [
4045+
{
4046+
"schema": {
4047+
"type": "string",
4048+
"minLength": 1
4049+
},
4050+
"in": "query",
4051+
"name": "team_id",
4052+
"required": true,
4053+
"description": "Team ID to filter activity by"
4054+
},
4055+
{
4056+
"schema": {
4057+
"type": "integer",
4058+
"minimum": 1,
4059+
"maximum": 100,
4060+
"default": 20
4061+
},
4062+
"in": "query",
4063+
"name": "limit",
4064+
"required": false,
4065+
"description": "Number of results (1-100)"
4066+
},
4067+
{
4068+
"schema": {
4069+
"type": "integer",
4070+
"minimum": 1,
4071+
"maximum": 1440,
4072+
"default": 30
4073+
},
4074+
"in": "query",
4075+
"name": "active_within_minutes",
4076+
"required": false,
4077+
"description": "Show clients active within N minutes (1-1440)"
4078+
}
4079+
],
4080+
"security": [
4081+
{
4082+
"cookieAuth": []
4083+
},
4084+
{
4085+
"bearerAuth": []
4086+
}
4087+
],
4088+
"responses": {
4089+
"200": {
4090+
"description": "Default Response"
4091+
}
4092+
}
4093+
}
4094+
},
40364095
"/api/users/me/preferences": {
40374096
"get": {
40384097
"summary": "Get user preferences",

services/backend/api-spec.yaml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2817,6 +2817,46 @@ paths:
28172817
- success
28182818
- error
28192819
description: Internal Server Error
2820+
/api/users/me/mcp/client-activity/stream:
2821+
get:
2822+
summary: Stream MCP client activity via SSE
2823+
tags:
2824+
- Users
2825+
- MCP
2826+
description: Real-time stream of MCP client activity using Server-Sent Events.
2827+
Pushes updates every 10 seconds.
2828+
parameters:
2829+
- schema:
2830+
type: string
2831+
minLength: 1
2832+
in: query
2833+
name: team_id
2834+
required: true
2835+
description: Team ID to filter activity by
2836+
- schema:
2837+
type: integer
2838+
minimum: 1
2839+
maximum: 100
2840+
default: 20
2841+
in: query
2842+
name: limit
2843+
required: false
2844+
description: Number of results (1-100)
2845+
- schema:
2846+
type: integer
2847+
minimum: 1
2848+
maximum: 1440
2849+
default: 30
2850+
in: query
2851+
name: active_within_minutes
2852+
required: false
2853+
description: Show clients active within N minutes (1-1440)
2854+
security:
2855+
- cookieAuth: []
2856+
- bearerAuth: []
2857+
responses:
2858+
"200":
2859+
description: Default Response
28202860
/api/users/me/preferences:
28212861
get:
28222862
summary: Get user preferences
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
import type { FastifyInstance } from 'fastify';
2+
import { getDb, getSchema } from '../../db';
3+
import { eq, and, gt, sql } from 'drizzle-orm';
4+
import { requireAuthenticationAny } from '../../middleware/oauthMiddleware';
5+
6+
const QUERY_PARAMS_SCHEMA = {
7+
type: 'object',
8+
properties: {
9+
team_id: {
10+
type: 'string',
11+
minLength: 1,
12+
description: 'Team ID to filter activity by'
13+
},
14+
limit: {
15+
type: 'integer',
16+
minimum: 1,
17+
maximum: 100,
18+
default: 20,
19+
description: 'Number of results (1-100)'
20+
},
21+
active_within_minutes: {
22+
type: 'integer',
23+
minimum: 1,
24+
maximum: 1440,
25+
default: 30,
26+
description: 'Show clients active within N minutes (1-1440)'
27+
}
28+
},
29+
required: ['team_id'],
30+
additionalProperties: false
31+
} as const;
32+
33+
interface QueryParams {
34+
team_id: string;
35+
limit?: number;
36+
active_within_minutes?: number;
37+
}
38+
39+
interface ActivityItem {
40+
id: string;
41+
client_name: string | null;
42+
satellite: {
43+
id: string;
44+
name: string;
45+
};
46+
last_activity_at: string;
47+
total_requests: number;
48+
total_tool_calls: number;
49+
user_agent: string | null;
50+
first_seen_at: string;
51+
}
52+
53+
interface ActivityRecord {
54+
id: string;
55+
client_name: string | null;
56+
user_agent: string | null;
57+
last_activity_at: Date;
58+
first_seen_at: Date;
59+
total_requests: number;
60+
total_tool_calls: number;
61+
satellite_id: string;
62+
satellite_name: string;
63+
}
64+
65+
async function fetchClientActivity(
66+
userId: string,
67+
teamId: string,
68+
limit: number,
69+
activeWithinMinutes: number
70+
): Promise<ActivityItem[]> {
71+
const db = getDb();
72+
const { mcpClientActivity, satellites } = getSchema();
73+
74+
const cutoffTime = new Date(Date.now() - activeWithinMinutes * 60 * 1000);
75+
76+
const activityRecords = await db
77+
.select({
78+
id: mcpClientActivity.id,
79+
client_name: mcpClientActivity.client_name,
80+
user_agent: mcpClientActivity.user_agent,
81+
last_activity_at: mcpClientActivity.last_activity_at,
82+
first_seen_at: mcpClientActivity.first_seen_at,
83+
total_requests: mcpClientActivity.total_requests,
84+
total_tool_calls: mcpClientActivity.total_tool_calls,
85+
satellite_id: satellites.id,
86+
satellite_name: satellites.name
87+
})
88+
.from(mcpClientActivity)
89+
.innerJoin(satellites, eq(mcpClientActivity.satellite_id, satellites.id))
90+
.where(
91+
and(
92+
eq(mcpClientActivity.user_id, userId),
93+
eq(mcpClientActivity.team_id, teamId),
94+
gt(mcpClientActivity.last_activity_at, cutoffTime)
95+
)
96+
)
97+
.orderBy(sql`${mcpClientActivity.last_activity_at} DESC`)
98+
.limit(limit);
99+
100+
return activityRecords.map((record: ActivityRecord) => ({
101+
id: record.id,
102+
client_name: record.client_name,
103+
satellite: {
104+
id: record.satellite_id,
105+
name: record.satellite_name
106+
},
107+
last_activity_at: record.last_activity_at.toISOString(),
108+
total_requests: record.total_requests,
109+
total_tool_calls: record.total_tool_calls,
110+
user_agent: record.user_agent,
111+
first_seen_at: record.first_seen_at.toISOString()
112+
}));
113+
}
114+
115+
export default async function getMcpClientActivityStreamRoute(server: FastifyInstance) {
116+
server.get('/users/me/mcp/client-activity/stream', {
117+
sse: true,
118+
preValidation: [requireAuthenticationAny()],
119+
schema: {
120+
tags: ['Users', 'MCP'],
121+
summary: 'Stream MCP client activity via SSE',
122+
description: 'Real-time stream of MCP client activity using Server-Sent Events. Pushes updates every 10 seconds.',
123+
security: [
124+
{ cookieAuth: [] },
125+
{ bearerAuth: [] }
126+
],
127+
querystring: QUERY_PARAMS_SCHEMA
128+
}
129+
}, async (request, reply) => {
130+
const userId = request.user!.id;
131+
const query = request.query as QueryParams;
132+
const teamId = query.team_id;
133+
const limit = query.limit || 20;
134+
const activeWithinMinutes = query.active_within_minutes || 30;
135+
136+
let updateInterval: NodeJS.Timeout | null = null;
137+
let lastDataHash = '';
138+
139+
// Keep connection open
140+
reply.sse.keepAlive();
141+
142+
// Send initial data
143+
try {
144+
const activities = await fetchClientActivity(userId, teamId, limit, activeWithinMinutes);
145+
lastDataHash = JSON.stringify(activities);
146+
147+
reply.sse.send({
148+
event: 'client_activity',
149+
data: { activities }
150+
});
151+
} catch (error) {
152+
server.log.error(error, 'SSE: Error fetching initial client activity');
153+
reply.sse.send({
154+
event: 'error',
155+
data: { error: 'Failed to fetch client activity' }
156+
});
157+
}
158+
159+
// Set up periodic updates (every 10 seconds)
160+
updateInterval = setInterval(async () => {
161+
if (!reply.sse.isConnected) {
162+
if (updateInterval) clearInterval(updateInterval);
163+
return;
164+
}
165+
166+
try {
167+
const activities = await fetchClientActivity(userId, teamId, limit, activeWithinMinutes);
168+
const currentHash = JSON.stringify(activities);
169+
170+
// Only send if data changed
171+
if (currentHash !== lastDataHash) {
172+
lastDataHash = currentHash;
173+
reply.sse.send({
174+
event: 'client_activity',
175+
data: { activities }
176+
});
177+
}
178+
} catch (error) {
179+
server.log.error(error, 'SSE: Error fetching client activity update');
180+
}
181+
}, 10000);
182+
183+
// Cleanup on disconnect
184+
reply.sse.onClose(() => {
185+
if (updateInterval) {
186+
clearInterval(updateInterval);
187+
updateInterval = null;
188+
}
189+
server.log.debug({ userId, teamId }, 'SSE: Client activity stream closed');
190+
});
191+
});
192+
}

services/backend/src/routes/users/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import getCurrentUserRoute from './getCurrentUser';
1010
import getCurrentUserTeamsRoute from './getCurrentUserTeams';
1111
import getUserTeamsRoute from './getUserTeams';
1212
import getMcpClientActivityRoute from './getMcpClientActivity';
13+
import getMcpClientActivityStreamRoute from './getMcpClientActivityStream';
1314
import preferencesRoutes from './preferences';
1415
import satelliteRoutes from './satellite';
1516
import metricsRoutes from './me/metrics';
@@ -27,6 +28,7 @@ export default async function usersRoute(server: FastifyInstance) {
2728
await server.register(getCurrentUserTeamsRoute);
2829
await server.register(getUserTeamsRoute);
2930
await server.register(getMcpClientActivityRoute);
31+
await server.register(getMcpClientActivityStreamRoute);
3032
await server.register(preferencesRoutes);
3133
await server.register(satelliteRoutes);
3234
await server.register(metricsRoutes, { prefix: '/me/metrics' });

services/backend/tests/unit/fastify/plugins/index.test.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,20 @@ vi.mock('@fastify/rate-limit', () => ({
1717
default: vi.fn()
1818
}))
1919

20+
// Mock @fastify/sse
21+
vi.mock('@fastify/sse', () => ({
22+
default: vi.fn()
23+
}))
24+
2025
import fastifyCors from '@fastify/cors'
2126
import fastifyFormbody from '@fastify/formbody'
2227
import fastifyRateLimit from '@fastify/rate-limit'
28+
import fastifySSE from '@fastify/sse'
2329

2430
const mockFastifyCors = fastifyCors as MockedFunction<typeof fastifyCors>
2531
const mockFastifyFormbody = fastifyFormbody as MockedFunction<typeof fastifyFormbody>
2632
const mockFastifyRateLimit = fastifyRateLimit as MockedFunction<typeof fastifyRateLimit>
33+
const mockFastifySSE = fastifySSE as MockedFunction<typeof fastifySSE>
2734

2835
describe('Fastify Plugins', () => {
2936
let mockServer: FastifyInstance
@@ -62,8 +69,8 @@ describe('Fastify Plugins', () => {
6269
it('should register formbody plugin first, then CORS plugin with default origins', async () => {
6370
await registerFastifyPlugins(mockServer)
6471

65-
// Check that register was called 3 times (formbody, CORS, rate-limit)
66-
expect(mockServer.register).toHaveBeenCalledTimes(3)
72+
// Check that register was called 4 times (formbody, CORS, rate-limit, SSE)
73+
expect(mockServer.register).toHaveBeenCalledTimes(4)
6774

6875
// Check first call - formbody plugin
6976
expect(mockServer.register).toHaveBeenNthCalledWith(1, mockFastifyFormbody)
@@ -86,6 +93,11 @@ describe('Fastify Plugins', () => {
8693
max: 1000,
8794
timeWindow: '1 minute'
8895
})
96+
97+
// Check fourth call - SSE plugin
98+
expect(mockServer.register).toHaveBeenNthCalledWith(4, mockFastifySSE, {
99+
heartbeatInterval: 30000
100+
})
89101
})
90102

91103
it('should include frontend URL from environment variable when provided', async () => {
@@ -177,10 +189,10 @@ describe('Fastify Plugins', () => {
177189
)
178190
})
179191

180-
it('should register all three plugins (formbody, CORS, and rate-limit)', async () => {
192+
it('should register all four plugins (formbody, CORS, rate-limit, and SSE)', async () => {
181193
await registerFastifyPlugins(mockServer)
182194

183-
expect(mockServer.register).toHaveBeenCalledTimes(3)
195+
expect(mockServer.register).toHaveBeenCalledTimes(4)
184196
})
185197

186198
it('should handle plugin registration errors gracefully', async () => {
@@ -246,13 +258,14 @@ describe('Fastify Plugins', () => {
246258
})
247259

248260
describe('Plugin Registration Order', () => {
249-
it('should register plugins in correct order: formbody, CORS, rate-limit', async () => {
261+
it('should register plugins in correct order: formbody, CORS, rate-limit, SSE', async () => {
250262
await registerFastifyPlugins(mockServer)
251263

252264
// Verify the order of plugin registration
253265
expect(mockServer.register).toHaveBeenNthCalledWith(1, mockFastifyFormbody)
254266
expect(mockServer.register).toHaveBeenNthCalledWith(2, mockFastifyCors, expect.any(Object))
255267
expect(mockServer.register).toHaveBeenNthCalledWith(3, mockFastifyRateLimit, expect.any(Object))
268+
expect(mockServer.register).toHaveBeenNthCalledWith(4, mockFastifySSE, expect.any(Object))
256269
})
257270
})
258271

0 commit comments

Comments
 (0)