Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions packages/opencode/src/cli/cmd/tui/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { RouteProvider, useRoute } from "@tui/context/route"
import { Switch, Match, createEffect, untrack, ErrorBoundary, createSignal, onMount, batch, Show, on } from "solid-js"
import { win32DisableProcessedInput, win32FlushInputBuffer, win32InstallCtrlCGuard } from "./win32"
import { Installation } from "@/installation"
import { MCP } from "@/mcp"
import { Flag } from "@/flag/flag"
import { DialogProvider, useDialog } from "@tui/ui/dialog"
import { DialogProvider as DialogProviderList } from "@tui/component/dialog-provider"
Expand Down Expand Up @@ -729,6 +730,58 @@ function App() {
})
})

sdk.event.on(MCP.ResourceUpdated.type, (evt) => {
toast.show({
title: "Resource Updated",
message: `${evt.properties.uri} (${evt.properties.server})`,
variant: "info",
duration: 5000,
})

// If autoprompt is enabled for this server, trigger AI with updated resource info
const mcp = sync.data.config.mcp?.[evt.properties.server]
if (mcp && typeof mcp === "object" && "autoprompt" in mcp && mcp.autoprompt) {
const prompt = {
system: `An MCP resource has been updated. Resource URI: "${evt.properties.uri}" from server "${evt.properties.server}". Read the resource to review the latest content and take appropriate action.`,
parts: [
{
type: "text" as const,
text: `Resource updated: ${evt.properties.uri} (${evt.properties.server})`,
},
],
}
if (route.data.type === "session") {
const status = sync.data.session_status?.[route.data.sessionID]
if (!status || status.type === "idle") {
sdk.client.session
.promptAsync({ sessionID: route.data.sessionID, ...prompt })
.catch((e) => console.error("failed to trigger AI for resource update", e))
}
} else {
sdk.client.session
.create({})
.then((res) => {
const id = res.data?.id
if (!id) return
route.navigate({ type: "session", sessionID: id })
sdk.client.session
.promptAsync({ sessionID: id, ...prompt })
.catch((e) => console.error("failed to trigger AI for resource update", e))
})
.catch((e) => console.error("failed to create session for resource update", e))
}
}
})

sdk.event.on(MCP.ResourceListChanged.type, (evt) => {
toast.show({
title: "MCP Resources Changed",
message: `Server "${evt.properties.server}" resource list updated`,
variant: "info",
duration: 3000,
})
})

return (
<box
width={dimensions().width}
Expand Down
16 changes: 16 additions & 0 deletions packages/opencode/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,14 @@ export namespace Config {
.positive()
.optional()
.describe("Timeout in ms for MCP server requests. Defaults to 5000 (5 seconds) if not specified."),
subscriptions: z
.array(z.string())
.optional()
.describe("Resource URIs to automatically subscribe to for update notifications"),
autoprompt: z
.boolean()
.optional()
.describe("Automatically prompt the AI when a subscribed resource is updated. Defaults to false."),
})
.strict()
.meta({
Expand Down Expand Up @@ -585,6 +593,14 @@ export namespace Config {
.positive()
.optional()
.describe("Timeout in ms for MCP server requests. Defaults to 5000 (5 seconds) if not specified."),
subscriptions: z
.array(z.string())
.optional()
.describe("Resource URIs to automatically subscribe to for update notifications"),
autoprompt: z
.boolean()
.optional()
.describe("Automatically prompt the AI when a subscribed resource is updated. Defaults to false."),
})
.strict()
.meta({
Expand Down
212 changes: 210 additions & 2 deletions packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
CallToolResultSchema,
type Tool as MCPToolDef,
ToolListChangedNotificationSchema,
ResourceUpdatedNotificationSchema,
ResourceListChangedNotificationSchema,
} from "@modelcontextprotocol/sdk/types.js"
import { Config } from "../config/config"
import { Log } from "../util/log"
Expand Down Expand Up @@ -54,6 +56,21 @@ export namespace MCP {
}),
)

export const ResourceUpdated = BusEvent.define(
"mcp.resource.updated",
z.object({
server: z.string(),
uri: z.string(),
}),
)

export const ResourceListChanged = BusEvent.define(
"mcp.resource.list.changed",
z.object({
server: z.string(),
}),
)

export const Failed = NamedError.create(
"MCPFailed",
z.object({
Expand Down Expand Up @@ -114,6 +131,45 @@ export namespace MCP {
log.info("tools list changed notification received", { server: serverName })
Bus.publish(ToolsChanged, { server: serverName })
})

client.setNotificationHandler(ResourceUpdatedNotificationSchema, async (notification) => {
const uri = notification.params.uri
log.info("resource updated notification received", { server: serverName, uri })
Bus.publish(ResourceUpdated, { server: serverName, uri })
})

client.setNotificationHandler(ResourceListChangedNotificationSchema, async () => {
log.info("resource list changed notification received", { server: serverName })
Bus.publish(ResourceListChanged, { server: serverName })
// Subscribe to any newly listed resources
if (supportsSubscriptions(client)) {
const s = await state()
const existing = s.subscriptions.get(serverName) ?? new Set()
const listed = await client.listResources().catch((e) => {
log.warn("failed to list resources after list changed", { server: serverName, error: e instanceof Error ? e.message : String(e) })
return undefined
})
listed?.resources
.filter((r) => !existing.has(r.uri))
.forEach((r) => {
existing.add(r.uri)
client
.subscribeResource({ uri: r.uri })
.then(() => {
log.info("subscribed to new resource", { server: serverName, uri: r.uri })
})
.catch((e) => {
existing.delete(r.uri)
log.error("failed to subscribe to new resource", { server: serverName, uri: r.uri, error: e instanceof Error ? e.message : String(e) })
})
})
if (existing.size > 0) s.subscriptions.set(serverName, existing)
}
})
}

function supportsSubscriptions(client: MCPClient): boolean {
return client.getServerCapabilities()?.resources?.subscribe === true
}

// Convert MCP tool definition to AI SDK Tool type
Expand Down Expand Up @@ -190,9 +246,44 @@ export namespace MCP {
}
}),
)
const subscriptionMap = new Map<string, Set<string>>()

// Auto-subscribe to all resources for servers that support subscriptions
await Promise.all(
Object.entries(clients)
.filter(([key]) => status[key]?.status === "connected" && supportsSubscriptions(clients[key]))
.map(async ([key, client]) => {
const mcp = config[key]
const cfg = isMcpConfigured(mcp) && mcp.subscriptions ? mcp.subscriptions : []
const listed = await client.listResources().catch((e) => {
log.warn("failed to list resources for auto-subscribe", { key, error: e instanceof Error ? e.message : String(e) })
return undefined
})
const uris = new Set([...cfg, ...(listed?.resources.map((r) => r.uri) ?? [])])
if (uris.size === 0) return
uris.forEach((uri) => {
client
.subscribeResource({ uri })
.then(() => {
log.info("auto-subscribed to resource", { key, uri })
})
.catch((e) => {
uris.delete(uri)
log.error("failed to auto-subscribe to resource", {
key,
uri,
error: e instanceof Error ? e.message : String(e),
})
})
})
subscriptionMap.set(key, uris)
}),
)

return {
status,
clients,
subscriptions: subscriptionMap,
}
},
async (state) => {
Expand Down Expand Up @@ -548,6 +639,38 @@ export namespace MCP {
})
}
s.clients[name] = result.mcpClient

// Re-subscribe: merge previous subscriptions, config URIs, and freshly listed resources
if (supportsSubscriptions(result.mcpClient)) {
const cfg = isMcpConfigured(mcp) && mcp.subscriptions ? mcp.subscriptions : []
const listed = await result.mcpClient.listResources().catch((e) => {
log.warn("failed to list resources on reconnect", { name, error: e instanceof Error ? e.message : String(e) })
return undefined
})
const tracked = new Set([
...(s.subscriptions.get(name) ?? []),
...cfg,
...(listed?.resources.map((r) => r.uri) ?? []),
])
if (tracked.size > 0) {
tracked.forEach((uri) => {
result.mcpClient!
.subscribeResource({ uri })
.then(() => {
log.info("re-subscribed to resource", { name, uri })
})
.catch((e) => {
tracked.delete(uri)
log.error("failed to re-subscribe to resource", {
name,
uri,
error: e instanceof Error ? e.message : String(e),
})
})
})
s.subscriptions.set(name, tracked)
}
}
}
}

Expand All @@ -560,6 +683,7 @@ export namespace MCP {
})
delete s.clients[name]
}
s.subscriptions.delete(name)
s.status[name] = { status: "disabled" }
}

Expand Down Expand Up @@ -680,7 +804,7 @@ export namespace MCP {
const client = clientsSnapshot[clientName]

if (!client) {
log.warn("client not found for prompt", {
log.warn("client not found for resource", {
clientName: clientName,
})
return undefined
Expand All @@ -691,17 +815,101 @@ export namespace MCP {
uri: resourceUri,
})
.catch((e) => {
log.error("failed to get prompt from MCP server", {
log.error("failed to read resource from MCP server", {
clientName: clientName,
resourceUri: resourceUri,
error: e.message,
})
return undefined
})

// Auto-subscribe to the resource for update notifications (fire-and-forget)
if (result) {
subscribe(clientName, resourceUri).catch(() => {})
}

return result
}

export async function subscribe(clientName: string, uri: string): Promise<boolean> {
const client = (await clients())[clientName]

if (!client) {
log.warn("client not found for subscription", { clientName })
return false
}

if (!supportsSubscriptions(client)) {
log.debug("server does not support resource subscriptions", { clientName })
return false
}

const s = await state()
if (s.subscriptions.get(clientName)?.has(uri)) {
return true // already subscribed
}

return client
.subscribeResource({ uri })
.then(() => {
if (!s.subscriptions.has(clientName)) {
s.subscriptions.set(clientName, new Set())
}
s.subscriptions.get(clientName)!.add(uri)
log.info("subscribed to resource", { clientName, uri })
return true
})
.catch((e) => {
log.error("failed to subscribe to resource", {
clientName,
uri,
error: e instanceof Error ? e.message : String(e),
})
return false
})
}

export async function unsubscribe(clientName: string, uri: string): Promise<boolean> {
const client = (await clients())[clientName]
const s = await state()

// Remove from tracking regardless of whether the server call succeeds
s.subscriptions.get(clientName)?.delete(uri)

if (!client) {
log.warn("client not found for unsubscription", { clientName })
return false
}

if (!supportsSubscriptions(client)) {
return true // already removed from tracking above
}

return client
.unsubscribeResource({ uri })
.then(() => {
log.info("unsubscribed from resource", { clientName, uri })
return true
})
.catch((e) => {
log.error("failed to unsubscribe from resource", {
clientName,
uri,
error: e instanceof Error ? e.message : String(e),
})
return false
})
}

export async function subscriptions(): Promise<Record<string, string[]>> {
const s = await state()
return Object.fromEntries(
[...s.subscriptions]
.filter(([, uris]) => uris.size > 0)
.map(([server, uris]) => [server, [...uris]]),
)
}

/**
* Start OAuth authentication flow for an MCP server.
* Returns the authorization URL that should be opened in a browser.
Expand Down
Loading
Loading