From ce7877594c71adf33a8919d0b671acd1bae52f05 Mon Sep 17 00:00:00 2001 From: Eitan Yarmush Date: Sun, 30 Mar 2025 13:01:04 +0000 Subject: [PATCH 01/11] feat: created transport trait and impl --- crates/rmcp/src/transport/sse.rs | 90 +++++++++++++++++++++++++- examples/servers/src/common/counter.rs | 15 ++--- 2 files changed, 96 insertions(+), 9 deletions(-) diff --git a/crates/rmcp/src/transport/sse.rs b/crates/rmcp/src/transport/sse.rs index 6854e370..39fd28f6 100644 --- a/crates/rmcp/src/transport/sse.rs +++ b/crates/rmcp/src/transport/sse.rs @@ -60,6 +60,94 @@ impl Default for SseTransportRetryConfig { } } +pub trait SseClient: Send + Sync { + fn connect( + &self, + last_event_id: Option, + ) -> BoxFuture<'static, Result>, SseTransportError>>; + + fn post(&self, session_id: &str, message: ClientJsonRpcMessage) -> impl Future>; +} + +pub struct RetryConfig { + pub max_times: Option, + pub min_duration: Duration, +} + +pub struct ReqwestSseClient { + http_client: HttpClient, + sse_url: Url, +} +impl ReqwestSseClient { + pub fn new(url: U) -> Result + where + U: IntoUrl, + { + let url = url.into_url()?; + Ok(Self { http_client: HttpClient::default(), sse_url: url }) + } + + pub async fn new_with_timeout(url: U, timeout: Duration) -> Result + where + U: IntoUrl, + { + let mut client = HttpClient::builder(); + client = client.timeout(timeout); + let client = client.build()?; + let url = url.into_url()?; + Ok(Self { http_client: client, sse_url: url }) + } +} + +impl SseClient for ReqwestSseClient { + fn connect( + &self, + last_event_id: Option, + ) -> BoxFuture<'static, Result>, SseTransportError>> + { + let client = self.http_client.clone(); + let sse_url = self.sse_url.as_ref().to_string(); + let last_event_id = last_event_id.clone(); + let fut = async move { + let mut request_builder = client.get(&sse_url).header(ACCEPT, MIME_TYPE); + if let Some(last_event_id) = last_event_id { + request_builder = request_builder.header(HEADER_LAST_EVENT_ID, last_event_id); + } + let response = request_builder.send().await?; + let response = response.error_for_status()?; + match response.headers().get(reqwest::header::CONTENT_TYPE) { + Some(ct) => { + if ct.as_bytes() != MIME_TYPE.as_bytes() { + return Err(SseTransportError::UnexpectedContentType(Some(ct.clone()))); + } + } + None => { + return Err(SseTransportError::UnexpectedContentType(None)); + } + } + let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed(); + Ok(event_stream) + }; + fut.boxed() + } + + fn post(&self, session_id: &str, message: ClientJsonRpcMessage) -> impl Future> { + let client = self.http_client.clone(); + let sse_url = self.sse_url.clone(); + let session_id = session_id.to_string(); + async move { + let uri = sse_url.join(&session_id).map_err(SseTransportError::from)?; + let request_builder = client.post(uri.as_ref()).json(&message); + request_builder + .send() + .await + .and_then(|resp| resp.error_for_status()) + .map_err(SseTransportError::from) + .map(drop) + } + } +} + /// # Transport for client sse /// /// Call [`SseTransport::start`] to create a new transport from url. @@ -159,7 +247,7 @@ impl SseTransport { .map(Duration::from_millis); let config_retry_duration = self.retry_config.min_duration; recommended_retry_duration - .map(|d| d.max(config_retry_duration)) + .map(|d: Duration| d.max(config_retry_duration)) .unwrap_or(config_retry_duration) }; let client = self.http_client.clone(); diff --git a/examples/servers/src/common/counter.rs b/examples/servers/src/common/counter.rs index dbdfb421..aeeccd91 100644 --- a/examples/servers/src/common/counter.rs +++ b/examples/servers/src/common/counter.rs @@ -164,14 +164,13 @@ impl ServerHandler for Counter { match name.as_str() { "example_prompt" => { let message = arguments - .and_then( - |json| - json.get("message") - ?.as_str() - .map(|s| s.to_string())) - .ok_or_else(|| McpError::invalid_params("No message provided to example_prompt", None))?; - - let prompt = format!("This is an example prompt with your message here: '{message}'"); + .and_then(|json| json.get("message")?.as_str().map(|s| s.to_string())) + .ok_or_else(|| { + McpError::invalid_params("No message provided to example_prompt", None) + })?; + + let prompt = + format!("This is an example prompt with your message here: '{message}'"); Ok(GetPromptResult { description: None, messages: vec![PromptMessage { From a0bcc58cd22f4321115baa34ad523e715549e84c Mon Sep 17 00:00:00 2001 From: Eitan Yarmush Date: Sun, 30 Mar 2025 14:14:58 +0000 Subject: [PATCH 02/11] feat(transport): abstract sse transport from the client definition --- crates/rmcp/src/transport/sse.rs | 237 +++++++++++++++---------------- 1 file changed, 111 insertions(+), 126 deletions(-) diff --git a/crates/rmcp/src/transport/sse.rs b/crates/rmcp/src/transport/sse.rs index 39fd28f6..80731864 100644 --- a/crates/rmcp/src/transport/sse.rs +++ b/crates/rmcp/src/transport/sse.rs @@ -14,13 +14,13 @@ const MIME_TYPE: &str = "text/event-stream"; const HEADER_LAST_EVENT_ID: &str = "Last-Event-ID"; #[derive(Error, Debug)] -pub enum SseTransportError { +pub enum SseTransportError { #[error("SSE error: {0}")] Sse(#[from] SseError), #[error("IO error: {0}")] Io(#[from] std::io::Error), - #[error("Reqwest error: {0}")] - Reqwest(#[from] reqwest::Error), + #[error("Transport error: {0}")] + Transport(E), #[error("unexpected end of stream")] UnexpectedEndOfStream, #[error("Url error: {0}")] @@ -29,13 +29,13 @@ pub enum SseTransportError { UnexpectedContentType(Option), } -enum SseTransportState { +enum SseTransportState { Connected(BoxStream<'static, Result>), Retrying { times: usize, fut: BoxFuture< 'static, - Result>, SseTransportError>, + Result>, SseTransportError>, >, }, Fatal { @@ -60,13 +60,23 @@ impl Default for SseTransportRetryConfig { } } -pub trait SseClient: Send + Sync { +impl From for SseTransportError { + fn from(e: reqwest::Error) -> Self { + SseTransportError::Transport(e) + } +} + +pub trait SseClient: Clone + Send + Sync + 'static { fn connect( &self, last_event_id: Option, - ) -> BoxFuture<'static, Result>, SseTransportError>>; + ) -> BoxFuture<'static, Result>, SseTransportError>>; - fn post(&self, session_id: &str, message: ClientJsonRpcMessage) -> impl Future>; + fn post( + &self, + endpoint: &str, + message: ClientJsonRpcMessage, + ) -> BoxFuture<'static, Result<(), SseTransportError>>; } pub struct RetryConfig { @@ -74,20 +84,27 @@ pub struct RetryConfig { pub min_duration: Duration, } +#[derive(Clone)] pub struct ReqwestSseClient { http_client: HttpClient, sse_url: Url, } impl ReqwestSseClient { - pub fn new(url: U) -> Result + pub fn new(url: U) -> Result> where U: IntoUrl, { let url = url.into_url()?; - Ok(Self { http_client: HttpClient::default(), sse_url: url }) + Ok(Self { + http_client: HttpClient::default(), + sse_url: url, + }) } - pub async fn new_with_timeout(url: U, timeout: Duration) -> Result + pub async fn new_with_timeout( + url: U, + timeout: Duration, + ) -> Result> where U: IntoUrl, { @@ -95,16 +112,35 @@ impl ReqwestSseClient { client = client.timeout(timeout); let client = client.build()?; let url = url.into_url()?; - Ok(Self { http_client: client, sse_url: url }) + Ok(Self { + http_client: client, + sse_url: url, + }) + } + + pub async fn new_with_client( + url: U, + client: HttpClient, + ) -> Result> + where + U: IntoUrl, + { + let url = url.into_url()?; + Ok(Self { + http_client: client, + sse_url: url, + }) } } -impl SseClient for ReqwestSseClient { +impl SseClient for ReqwestSseClient { fn connect( &self, last_event_id: Option, - ) -> BoxFuture<'static, Result>, SseTransportError>> - { + ) -> BoxFuture< + 'static, + Result>, SseTransportError>, + > { let client = self.http_client.clone(); let sse_url = self.sse_url.as_ref().to_string(); let last_event_id = last_event_id.clone(); @@ -131,20 +167,24 @@ impl SseClient for ReqwestSseClient { fut.boxed() } - fn post(&self, session_id: &str, message: ClientJsonRpcMessage) -> impl Future> { - let client = self.http_client.clone(); - let sse_url = self.sse_url.clone(); - let session_id = session_id.to_string(); - async move { - let uri = sse_url.join(&session_id).map_err(SseTransportError::from)?; - let request_builder = client.post(uri.as_ref()).json(&message); - request_builder - .send() - .await - .and_then(|resp| resp.error_for_status()) - .map_err(SseTransportError::from) - .map(drop) - } + fn post( + &self, + session_id: &str, + message: ClientJsonRpcMessage, + ) -> BoxFuture<'static, Result<(), SseTransportError>> { + let client = self.http_client.clone(); + let sse_url = self.sse_url.clone(); + let session_id = session_id.to_string(); + Box::pin(async move { + let uri = sse_url.join(&session_id).map_err(SseTransportError::from)?; + let request_builder = client.post(uri.as_ref()).json(&message); + request_builder + .send() + .await + .and_then(|resp| resp.error_for_status()) + .map_err(SseTransportError::from) + .map(drop) + }) } } @@ -153,62 +193,35 @@ impl SseClient for ReqwestSseClient { /// Call [`SseTransport::start`] to create a new transport from url. /// /// Call [`SseTransport::start_with_client`] to create a new transport with a customized reqwest client. -pub struct SseTransport { - http_client: HttpClient, - state: SseTransportState, - post_url: Arc, - sse_url: Arc, +pub struct SseTransport, E: std::error::Error + Send + Sync + 'static> { + client: Arc, + state: SseTransportState, last_event_id: Option, recommended_retry_duration_ms: Option, + session_id: String, #[allow(clippy::type_complexity)] - request_queue: VecDeque>>, + request_queue: VecDeque>>>, pub retry_config: SseTransportRetryConfig, } -impl SseTransport { - pub async fn start_with_timeout(url: U, timeout: Duration) -> Result +impl SseTransport { + pub async fn start( + url: U, + ) -> Result, SseTransportError> where U: IntoUrl, { - let mut client = HttpClient::builder(); - client = client.timeout(timeout); - let client = client.build()?; - Self::start_with_client(url, client).await - } - - pub async fn start(url: U) -> Result - where - U: IntoUrl, - { - Self::start_with_client(url, HttpClient::default()).await + let client = ReqwestSseClient::new(url)?; + SseTransport::start_with_client(client).await } +} - /// Start with a reqwest client, this would be helpful when you want to customize the client behavior like default headers or tls stuff. - pub async fn start_with_client(url: U, client: HttpClient) -> Result - where - U: IntoUrl, - { - let url = url.into_url()?; - let response = client - .get(url.clone()) - .header(ACCEPT, MIME_TYPE) - .send() - .await?; - let response = response.error_for_status()?; - match response.headers().get(reqwest::header::CONTENT_TYPE) { - Some(ct) => { - if !ct.as_bytes().starts_with(MIME_TYPE.as_bytes()) { - return Err(SseTransportError::UnexpectedContentType(Some(ct.clone()))); - } - } - None => { - return Err(SseTransportError::UnexpectedContentType(None)); - } - } - let mut event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed(); +impl, E: std::error::Error + Send + Sync + 'static> SseTransport { + pub async fn start_with_client(client: C) -> Result> { + let mut event_stream = client.connect(None).await?; let mut last_event_id = None; let mut retry = None; - let post_url = loop { + let session_id = loop { let next_event = event_stream .next() .await @@ -223,15 +236,12 @@ impl SseTransport { break next_event.data.unwrap_or_default(); } }; - tracing::info!("will post event on {post_url}"); - let post_url = url.join(&post_url)?; Ok(SseTransport { - http_client: client, + client: Arc::new(client), state: SseTransportState::Connected(Box::pin(event_stream)), - post_url: Arc::from(post_url), last_event_id, recommended_retry_duration_ms: retry, - sse_url: Arc::from(url), + session_id, request_queue: Default::default(), retry_config: Default::default(), }) @@ -239,7 +249,7 @@ impl SseTransport { fn retry_connection( &self, - ) -> BoxFuture<'static, Result>, SseTransportError>> + ) -> BoxFuture<'static, Result>, SseTransportError>> { let retry_duration = { let recommended_retry_duration = self @@ -250,35 +260,13 @@ impl SseTransport { .map(|d: Duration| d.max(config_retry_duration)) .unwrap_or(config_retry_duration) }; - let client = self.http_client.clone(); - let sse_url = self.sse_url.as_ref().clone(); + std::thread::sleep(retry_duration); let last_event_id = self.last_event_id.clone(); - let fut = async move { - tokio::time::sleep(retry_duration).await; - let mut request_builder = client.get(sse_url).header(ACCEPT, MIME_TYPE); - if let Some(last_event_id) = last_event_id { - request_builder = request_builder.header(HEADER_LAST_EVENT_ID, last_event_id); - } - let response = request_builder.send().await?; - let response = response.error_for_status()?; - match response.headers().get(reqwest::header::CONTENT_TYPE) { - Some(ct) => { - if ct.as_bytes() != MIME_TYPE.as_bytes() { - return Err(SseTransportError::UnexpectedContentType(Some(ct.clone()))); - } - } - None => { - return Err(SseTransportError::UnexpectedContentType(None)); - } - } - let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed(); - Ok(event_stream) - }; - fut.boxed() + self.client.connect(last_event_id) } } -impl Stream for SseTransport { +impl, E: std::error::Error + Send + Sync + 'static> Stream for SseTransport { type Item = ServerJsonRpcMessage; fn poll_next( @@ -286,17 +274,16 @@ impl Stream for SseTransport { cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { let retry_config = self.retry_config; - let state = &mut self.state; - match state { + match &mut self.state { SseTransportState::Connected(event_stream) => { let event = std::task::ready!(event_stream.poll_next_unpin(cx)); match event { Some(Ok(event)) => { if let Some(retry) = event.retry { - self.recommended_retry_duration_ms = Some(retry); + self.as_mut().recommended_retry_duration_ms = Some(retry); } if let Some(id) = event.id { - self.last_event_id = Some(id); + self.as_mut().last_event_id = Some(id); } if let Some(data) = event.data { match serde_json::from_str(&data) { @@ -314,7 +301,7 @@ impl Stream for SseTransport { Some(Err(e)) => { tracing::error!(error = %e, "sse event stream encounter an error"); let fut = self.retry_connection(); - self.state = SseTransportState::Retrying { times: 1, fut }; + self.as_mut().state = SseTransportState::Retrying { times: 1, fut }; self.poll_next(cx) } None => std::task::Poll::Ready(None), @@ -324,14 +311,14 @@ impl Stream for SseTransport { let retry_result = std::task::ready!(fut.poll_unpin(cx)); match retry_result { Ok(stream) => { - self.state = SseTransportState::Connected(stream); + self.as_mut().state = SseTransportState::Connected(stream); self.poll_next(cx) } Err(e) => { tracing::warn!(error = %e, "retrying failed"); if let Some(max_retry_times) = retry_config.max_times { if *times >= max_retry_times { - self.state = SseTransportState::Fatal { + self.as_mut().state = SseTransportState::Fatal { reason: format!("retrying failed after {} times: {}", times, e), }; return self.poll_next(cx); @@ -339,7 +326,7 @@ impl Stream for SseTransport { } let times = *times + 1; let fut = self.retry_connection(); - self.state = SseTransportState::Retrying { times, fut }; + self.as_mut().state = SseTransportState::Retrying { times, fut }; self.poll_next(cx) } } @@ -352,17 +339,20 @@ impl Stream for SseTransport { } } -impl Sink for SseTransport { - type Error = SseTransportError; +impl, E: std::error::Error + Send + Sync + 'static> Sink + for SseTransport +{ + type Error = SseTransportError; fn poll_ready( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { const QUEUE_SIZE: usize = 16; - if self.request_queue.len() >= QUEUE_SIZE { + if self.as_mut().request_queue.len() >= QUEUE_SIZE { std::task::ready!( - self.request_queue + self.as_mut() + .request_queue .front_mut() .expect("queue is not empty") .poll_unpin(cx) @@ -376,21 +366,16 @@ impl Sink for SseTransport { mut self: std::pin::Pin<&mut Self>, item: ClientJsonRpcMessage, ) -> Result<(), Self::Error> { - let client = self.http_client.clone(); - let uri = self.post_url.clone(); - let (tx, rx) = tokio::sync::oneshot::channel(); - let request_builder = client.post(uri.as_ref().clone()).json(&item); + let client = self.client.clone(); + let session_id = self.session_id.clone(); + let (tx, rx) = tokio::sync::oneshot::channel(); + let session_id = session_id.clone(); tokio::spawn(async move { - let result = request_builder - .send() - .await - .and_then(|resp| resp.error_for_status()) - .map_err(SseTransportError::from) - .map(drop); + let result = { client.post(&session_id, item).await }; let _ = tx.send(result); }); - self.as_mut().request_queue.push_back(rx); + self.request_queue.push_back(rx); Ok(()) } From cfd9869da5ad83c2f07f276626506d096b37cd67 Mon Sep 17 00:00:00 2001 From: Jefry Dewangga Date: Sun, 30 Mar 2025 16:22:14 +0000 Subject: [PATCH 03/11] test: add test with js server (#65) --- crates/rmcp/tests/test_with_js.rs | 34 +++++++++++++++++++- crates/rmcp/tests/test_with_js/package.json | 1 - crates/rmcp/tests/test_with_js/server.js | 35 +++++++++++++++++++++ 3 files changed, 68 insertions(+), 2 deletions(-) create mode 100644 crates/rmcp/tests/test_with_js/server.js diff --git a/crates/rmcp/tests/test_with_js.rs b/crates/rmcp/tests/test_with_js.rs index 77e5a996..fdc54d98 100644 --- a/crates/rmcp/tests/test_with_js.rs +++ b/crates/rmcp/tests/test_with_js.rs @@ -1,4 +1,7 @@ -use rmcp::transport::sse_server::SseServer; +use rmcp::{ + ServiceExt, + transport::{SseServer, TokioChildProcess}, +}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod common; use common::calculator::Calculator; @@ -34,3 +37,32 @@ async fn test_with_js_client() -> anyhow::Result<()> { ct.cancel(); Ok(()) } + +#[tokio::test] +async fn test_with_js_server() -> anyhow::Result<()> { + let _ = tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "debug".to_string().into()), + ) + .with(tracing_subscriber::fmt::layer()) + .try_init(); + tokio::process::Command::new("npm") + .arg("install") + .current_dir("tests/test_with_js") + .spawn()? + .wait() + .await?; + let transport = TokioChildProcess::new( + tokio::process::Command::new("node").arg("tests/test_with_js/server.js"), + )?; + + let client = ().serve(transport).await?; + let resources = client.list_all_resources().await?; + tracing::info!("{:#?}", resources); + let tools = client.list_all_tools().await?; + tracing::info!("{:#?}", tools); + + client.cancel().await?; + Ok(()) +} diff --git a/crates/rmcp/tests/test_with_js/package.json b/crates/rmcp/tests/test_with_js/package.json index 19ee4749..6dee815c 100644 --- a/crates/rmcp/tests/test_with_js/package.json +++ b/crates/rmcp/tests/test_with_js/package.json @@ -6,7 +6,6 @@ "name": "test_with_ts", "version": "1.0.0", "main": "index.js", - "devDependencies": {}, "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, diff --git a/crates/rmcp/tests/test_with_js/server.js b/crates/rmcp/tests/test_with_js/server.js new file mode 100644 index 00000000..c128340f --- /dev/null +++ b/crates/rmcp/tests/test_with_js/server.js @@ -0,0 +1,35 @@ +import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; +import { z } from "zod"; + +const server = new McpServer({ + name: "Demo", + version: "1.0.0" +}); + +server.resource( + "greeting", + new ResourceTemplate("greeting://{name}", { list: undefined }), + async (uri, { name }) => ({ + contents: [{ + uri: uri.href, + text: `Hello, ${name}` + }] + }) +); + +server.tool( + "add", + { a: z.number(), b: z.number() }, + async ({ a, b }) => ({ + "content": [ + { + "type": "text", + "text": `${a + b}` + } + ] + }) +); + +const transport = new StdioServerTransport(); +await server.connect(transport); From 5f35c0d58cb0bd6477f436b9fea86e19050e3efc Mon Sep 17 00:00:00 2001 From: Eitan Yarmush Date: Mon, 31 Mar 2025 04:09:46 +0000 Subject: [PATCH 04/11] fix: use prefix to check mime type --- crates/rmcp/src/transport/sse.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/rmcp/src/transport/sse.rs b/crates/rmcp/src/transport/sse.rs index 80731864..c3fee821 100644 --- a/crates/rmcp/src/transport/sse.rs +++ b/crates/rmcp/src/transport/sse.rs @@ -153,7 +153,7 @@ impl SseClient for ReqwestSseClient { let response = response.error_for_status()?; match response.headers().get(reqwest::header::CONTENT_TYPE) { Some(ct) => { - if ct.as_bytes() != MIME_TYPE.as_bytes() { + if !ct.as_bytes().starts_with(MIME_TYPE.as_bytes()) { return Err(SseTransportError::UnexpectedContentType(Some(ct.clone()))); } } @@ -190,7 +190,7 @@ impl SseClient for ReqwestSseClient { /// # Transport for client sse /// -/// Call [`SseTransport::start`] to create a new transport from url. +/// Call [`SseTransport::start`] to create a new transport from url. /// /// Call [`SseTransport::start_with_client`] to create a new transport with a customized reqwest client. pub struct SseTransport, E: std::error::Error + Send + Sync + 'static> { From 9a05599d5763f6d0d62f3de551493e59958467c7 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Mon, 31 Mar 2025 13:05:35 +0800 Subject: [PATCH 05/11] fix(test): fix tool deserialization error (#68) --- Cargo.toml | 2 +- crates/rmcp-macros/Cargo.toml | 2 +- crates/rmcp-macros/src/tool.rs | 2 +- crates/rmcp/Cargo.toml | 4 +-- crates/rmcp/src/model/tool.rs | 5 ++-- crates/rmcp/tests/test_deserialization.rs | 15 ++++++++++ .../tool_list_result.json | 28 +++++++++++++++++++ crates/rmcp/tests/test_with_js.rs | 4 +-- examples/clients/Cargo.toml | 2 +- examples/rig-integration/Cargo.toml | 2 -- examples/rig-integration/src/mcp_adaptor.rs | 25 +++++------------ examples/servers/Cargo.toml | 2 +- examples/transport/Cargo.toml | 2 -- examples/wasi/Cargo.toml | 2 +- 14 files changed, 63 insertions(+), 34 deletions(-) create mode 100644 crates/rmcp/tests/test_deserialization.rs create mode 100644 crates/rmcp/tests/test_deserialization/tool_list_result.json diff --git a/Cargo.toml b/Cargo.toml index 1faa043a..317e479b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,4 @@ -cargo-features = ["edition2024"] + [workspace] members = ["crates/rmcp", "crates/rmcp-macros", "examples/*"] diff --git a/crates/rmcp-macros/Cargo.toml b/crates/rmcp-macros/Cargo.toml index 9a277ffe..3f46bdf1 100644 --- a/crates/rmcp-macros/Cargo.toml +++ b/crates/rmcp-macros/Cargo.toml @@ -1,4 +1,4 @@ -cargo-features = ["edition2024"] + [package] name = "rmcp-macros" diff --git a/crates/rmcp-macros/src/tool.rs b/crates/rmcp-macros/src/tool.rs index 09b124df..feed5350 100644 --- a/crates/rmcp-macros/src/tool.rs +++ b/crates/rmcp-macros/src/tool.rs @@ -347,7 +347,7 @@ pub(crate) fn tool_fn_item(attr: TokenStream, mut input_fn: ItemFn) -> syn::Resu #input_fn_vis fn #tool_attr_fn_ident() -> rmcp::model::Tool { rmcp::model::Tool { name: #name.into(), - description: #description.into(), + description: Some(#description.into()), input_schema: #schema.into(), } } diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index ed426a0f..e8b95120 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -1,4 +1,4 @@ -cargo-features = ["edition2024"] + [package] name = "rmcp" @@ -95,5 +95,5 @@ path = "tests/test_with_python.rs" [[test]] name = "test_with_js" -required-features = ["server", "transport-sse-server"] +required-features = ["server", "client", "transport-sse-server", "transport-child-process"] path = "tests/test_with_js.rs" \ No newline at end of file diff --git a/crates/rmcp/src/model/tool.rs b/crates/rmcp/src/model/tool.rs index 864a30aa..cb3e359b 100644 --- a/crates/rmcp/src/model/tool.rs +++ b/crates/rmcp/src/model/tool.rs @@ -14,7 +14,8 @@ pub struct Tool { /// The name of the tool pub name: Cow<'static, str>, /// A description of what the tool does - pub description: Cow<'static, str>, + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option>, /// A JSON Schema object defining the expected parameters for the tool pub input_schema: Arc, } @@ -29,7 +30,7 @@ impl Tool { { Tool { name: name.into(), - description: description.into(), + description: Some(description.into()), input_schema: input_schema.into(), } } diff --git a/crates/rmcp/tests/test_deserialization.rs b/crates/rmcp/tests/test_deserialization.rs new file mode 100644 index 00000000..73621f48 --- /dev/null +++ b/crates/rmcp/tests/test_deserialization.rs @@ -0,0 +1,15 @@ +use rmcp::model::{JsonRpcResponse, ServerJsonRpcMessage, ServerResult}; +#[test] +fn test_tool_list_result() { + let json = std::fs::read("tests/test_deserialization/tool_list_result.json").unwrap(); + let result: ServerJsonRpcMessage = serde_json::from_slice(&json).unwrap(); + println!("{result:#?}"); + + assert!(matches!( + result, + ServerJsonRpcMessage::Response(JsonRpcResponse { + result: ServerResult::ListToolsResult(_), + .. + }) + )); +} diff --git a/crates/rmcp/tests/test_deserialization/tool_list_result.json b/crates/rmcp/tests/test_deserialization/tool_list_result.json new file mode 100644 index 00000000..674fdc05 --- /dev/null +++ b/crates/rmcp/tests/test_deserialization/tool_list_result.json @@ -0,0 +1,28 @@ +{ + "result": { + "tools": [ + { + "name": "add", + "inputSchema": { + "type": "object", + "properties": { + "a": { + "type": "number" + }, + "b": { + "type": "number" + } + }, + "required": [ + "a", + "b" + ], + "additionalProperties": false, + "$schema": "http://json-schema.org/draft-07/schema#" + } + } + ] + }, + "jsonrpc": "2.0", + "id": 2 +} \ No newline at end of file diff --git a/crates/rmcp/tests/test_with_js.rs b/crates/rmcp/tests/test_with_js.rs index fdc54d98..f5431bee 100644 --- a/crates/rmcp/tests/test_with_js.rs +++ b/crates/rmcp/tests/test_with_js.rs @@ -10,13 +10,13 @@ const BIND_ADDRESS: &str = "127.0.0.1:8000"; #[tokio::test] async fn test_with_js_client() -> anyhow::Result<()> { - tracing_subscriber::registry() + let _ = tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| "debug".to_string().into()), ) .with(tracing_subscriber::fmt::layer()) - .init(); + .try_init(); tokio::process::Command::new("npm") .arg("install") .current_dir("tests/test_with_js") diff --git a/examples/clients/Cargo.toml b/examples/clients/Cargo.toml index d9c5640f..1cec3a2b 100644 --- a/examples/clients/Cargo.toml +++ b/examples/clients/Cargo.toml @@ -1,4 +1,4 @@ -cargo-features = ["edition2024"] + [package] name = "mcp-client-examples" diff --git a/examples/rig-integration/Cargo.toml b/examples/rig-integration/Cargo.toml index 9c21eb7c..e630bfa0 100644 --- a/examples/rig-integration/Cargo.toml +++ b/examples/rig-integration/Cargo.toml @@ -1,5 +1,3 @@ -cargo-features = ["edition2024"] - [package] name = "rig-integration" edition = { workspace = true } diff --git a/examples/rig-integration/src/mcp_adaptor.rs b/examples/rig-integration/src/mcp_adaptor.rs index 9f2a0dc6..6beb9aa5 100644 --- a/examples/rig-integration/src/mcp_adaptor.rs +++ b/examples/rig-integration/src/mcp_adaptor.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use rig::tool::{ToolDyn as RigTool, ToolEmbeddingDyn, ToolSet}; +use rig::tool::{ToolDyn as RigTool, ToolSet}; use rmcp::{ RoleClient, model::{CallToolRequestParam, CallToolResult, Tool as McpTool}, @@ -24,7 +24,12 @@ impl RigTool for McpToolAdaptor { { Box::pin(std::future::ready(rig::completion::ToolDefinition { name: self.name(), - description: self.tool.description.to_string(), + description: self + .tool + .description + .as_deref() + .unwrap_or_default() + .to_string(), parameters: self.tool.schema_as_json_value(), })) } @@ -51,22 +56,6 @@ impl RigTool for McpToolAdaptor { } } -impl ToolEmbeddingDyn for McpToolAdaptor { - fn embedding_docs(&self) -> Vec { - vec![ - self.tool.description.clone().to_string(), - format!("Tool name: {}", self.tool.name), - format!("Tool capability: {}", self.tool.description), - ] - } - - fn context(&self) -> serde_json::Result { - Ok(serde_json::json!({ - "tool_name": self.tool.name, - })) - } -} - pub struct McpManager { pub clients: HashMap>, } diff --git a/examples/servers/Cargo.toml b/examples/servers/Cargo.toml index 4973871b..01569b88 100644 --- a/examples/servers/Cargo.toml +++ b/examples/servers/Cargo.toml @@ -1,4 +1,4 @@ -cargo-features = ["edition2024"] + [package] name = "mcp-server-examples" diff --git a/examples/transport/Cargo.toml b/examples/transport/Cargo.toml index 5c9dc9c0..06708bb9 100644 --- a/examples/transport/Cargo.toml +++ b/examples/transport/Cargo.toml @@ -1,5 +1,3 @@ -cargo-features = ["edition2024"] - [package] name = "transport" edition = { workspace = true } diff --git a/examples/wasi/Cargo.toml b/examples/wasi/Cargo.toml index 4daa1a3e..bf9b948a 100644 --- a/examples/wasi/Cargo.toml +++ b/examples/wasi/Cargo.toml @@ -1,4 +1,4 @@ -cargo-features = ["edition2024"] + [package] name = "wasi" From 0499111b4cbe19c246faf4d5b6d31066f423bdd9 Mon Sep 17 00:00:00 2001 From: jokemanfire Date: Mon, 31 Mar 2025 13:46:31 +0800 Subject: [PATCH 06/11] ci: add documentation generation job (#59) * ci: add documentation generation job 1. add doc ci in workflow 2. remove the readme in rmcp crate Signed-off-by: jokemanfire * docs: fix doc test in README.md 1) fix doc test in readme 2) fix some fmt Signed-off-by: jokemanfire --------- Signed-off-by: jokemanfire --- .github/workflows/ci.yml | 17 ++++ README.md | 10 ++- crates/rmcp/README.md | 183 --------------------------------------- crates/rmcp/src/lib.rs | 1 - 4 files changed, 24 insertions(+), 187 deletions(-) delete mode 100644 crates/rmcp/README.md diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 57e56dbb..a2932cec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -90,6 +90,23 @@ jobs: - name: Run tests run: cargo test --all-features + doc: + name: Generate Documentation + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - uses: Swatinem/rust-cache@v2 + + - name: Generate documentation + run: | + cargo doc --no-deps -p rmcp -p rmcp-macros + env: + RUSTDOCFLAGS: -Dwarnings + release: name: Release crates runs-on: ubuntu-latest diff --git a/README.md b/README.md index 7612347f..e682c725 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,13 @@ Start a client in one line: use rmcp::{ServiceExt, transport::TokioChildProcess}; use tokio::process::Command; -let client = ().serve( - TokioChildProcess::new(Command::new("npx").arg("-y").arg("@modelcontextprotocol/server-everything"))? -).await?; +#[tokio::main] +async fn main() -> Result<(), Box> { + let client = ().serve( + TokioChildProcess::new(Command::new("npx").arg("-y").arg("@modelcontextprotocol/server-everything"))? + ).await?; + Ok(()) +} ``` #### 1. Build a transport diff --git a/crates/rmcp/README.md b/crates/rmcp/README.md deleted file mode 100644 index 80b15e20..00000000 --- a/crates/rmcp/README.md +++ /dev/null @@ -1,183 +0,0 @@ -# RMCP -[![Crates.io Version](https://img.shields.io/crates/v/rmcp)](https://crates.io/crates/rmcp) -![Release status](https://github.commodelcontextprotocol/rust-sdk/actions/workflows/release.yml/badge.svg) -[![docs.rs](https://img.shields.io/docsrs/rmcp)](https://docs.rs/rmcp/latest/rmcp) - -A better and clean rust Model Context Protocol SDK implementation with tokio async runtime. - -## Comparing to official SDK - -The [Official SDK](https://github.com/modelcontextprotocol/rust-sdk/pulls) has too much limit and it was originally built for [goose](https://github.com/block/goose) rather than general using purpose. - -All the features listed on specification would be implemented in this crate. And the first and most important thing is, this crate has the correct and intact data [types](crate::model). See it yourself. - -## Usage - -### Import -```toml -rmcp = { version = "0.1", features = ["server"] } -``` - -### Quick start -Start a client in one line: -```rust,ignore -# use rmcp::{ServiceExt, transport::child_process::TokioChildProcess}; -# use tokio::process::Command; - -let client = ().serve( - TokioChildProcess::new(Command::new("npx").arg("-y").arg("@modelcontextprotocol/server-everything"))? -).await?; -``` - - -Start a client in one line: -```rust,ignore -# use rmcp::{ServiceExt, transport::TokioChildProcess}; -# use tokio::process::Command; - -let client = ().serve( - TokioChildProcess::new(Command::new("npx").arg("-y").arg("@modelcontextprotocol/server-everything"))? -).await?; -``` - - -#### 1. Build a transport -The transport type must implemented [`IntoTransport`](crate::transport::IntoTransport) trait, which allow split into a sink and a stream. - -For client, the sink item is [`ClientJsonRpcMessage`](crate::model::ClientJsonRpcMessage) and stream item is [`ServerJsonRpcMessage`](crate::model::ServerJsonRpcMessage) - -For server, the sink item is [`ServerJsonRpcMessage`](crate::model::ServerJsonRpcMessage) and stream item is [`ClientJsonRpcMessage`](crate::model::ClientJsonRpcMessage) - -##### These types is automatically implemented [`IntoTransport`](crate::transport::IntoTransport) trait -1. For type that already implement both [`Sink`](futures::Sink) and [`Stream`](futures::Stream) trait, they are automatically implemented [`IntoTransport`](crate::transport::IntoTransport) trait -2. For tuple of sink `Tx` and stream `Rx`, type `(Tx, Rx)` are automatically implemented [`IntoTransport`](crate::transport::IntoTransport) trait -3. For type that implement both [`tokio::io::AsyncRead`] and [`tokio::io::AsyncWrite`] trait, they are automatically implemented [`IntoTransport`](crate::transport::IntoTransport) trait -4. For tuple of [`tokio::io::AsyncRead`] `R `and [`tokio::io::AsyncWrite`] `W`, type `(R, W)` are automatically implemented [`IntoTransport`](crate::transport::IntoTransport) trait - - -```rust, ignore -use tokio::io::{stdin, stdout}; -let transport = (stdin(), stdout()); -``` - -#### 2. Build a service -You can easily build a service by using [`ServerHandler`](crate::handler::server) or [`ClientHandler`](crate::handler::client). - -```rust, ignore -let service = common::counter::Counter::new(); -``` - -Or if you want to use `tower`, you can [`TowerHandler`] as a adapter. - -You can reference the [server examples](https://github.commodelcontextprotocol/rust-sdk/tree/release/examples/servers). - -#### 3. Serve them together -```rust, ignore -// this call will finish the initialization process -let server = service.serve(transport).await?; -``` - -#### 4. Interact with the server -Once the server is initialized, you can send requests or notifications: - -```rust, ignore -// request -let roots = server.list_roots().await?; - -// or send notification -server.notify_cancelled(...).await?; -``` - -#### 5. Waiting for service shutdown -```rust, ignore -let quit_reason = server.waiting().await?; -// or cancel it -let quit_reason = server.cancel().await?; -``` - -### Use marcos to declaring tool -Use `toolbox` and `tool` macros to create tool quickly. - -Check this [file](https://github.commodelcontextprotocol/rust-sdk/tree/release/examples/servers/src/common/calculator.rs). -```rust, ignore -use rmcp::{ServerHandler, model::ServerInfo, schemars, tool}; - -use super::counter::Counter; - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -pub struct SumRequest { - #[schemars(description = "the left hand side number")] - pub a: i32, - #[schemars(description = "the right hand side number")] - pub b: i32, -} -#[derive(Debug, Clone)] -pub struct Calculator; - -// create a static toolbox to store the tool attributes -#[tool(tool_box)] -impl Calculator { - // async function - #[tool(description = "Calculate the sum of two numbers")] - async fn sum(&self, #[tool(aggr)] SumRequest { a, b }: SumRequest) -> String { - (a + b).to_string() - } - - // sync function - #[tool(description = "Calculate the sum of two numbers")] - fn sub( - &self, - #[tool(param)] - // this macro will transfer the schemars and serde's attributes - #[schemars(description = "the left hand side number")] - a: i32, - #[tool(param)] - #[schemars(description = "the right hand side number")] - b: i32, - ) -> String { - (a - b).to_string() - } -} - -// impl call_tool and list_tool by querying static toolbox -#[tool(tool_box)] -impl ServerHandler for Calculator { - fn get_info(&self) -> ServerInfo { - ServerInfo { - instructions: Some("A simple calculator".into()), - ..Default::default() - } - } -} - -``` -The only thing you should do is to make the function's return type implement `IntoCallToolResult`. - -And you can just implement `IntoContents`, and the return value will be marked as success automatically. - -If you return a type of `Result` where `T` and `E` both implemented `IntoContents`, it's also OK. - -### Manage Multi Services -For many cases you need to manage several service in a collection, you can call `into_dyn` to convert services into the same type. -```rust, ignore -let service = service.into_dyn(); -``` - - -### Examples -See [examples](https://github.commodelcontextprotocol/rust-sdk/tree/release/examples/README.md) - -### Features -- `client`: use client side sdk -- `server`: use server side sdk -- `macros`: macros default -#### Transports -- `transport-io`: Server stdio transport -- `transport-sse-server`: Server SSE transport -- `transport-child-process`: Client stdio transport -- `transport-sse`: Client sse transport - -## Related Resources -- [MCP Specification](https://spec.modelcontextprotocol.io/specification/2024-11-05/) - -- [Schema](https://github.com/modelcontextprotocol/specification/blob/main/schema/2024-11-05/schema.ts) diff --git a/crates/rmcp/src/lib.rs b/crates/rmcp/src/lib.rs index b01ef2cd..08a1908b 100644 --- a/crates/rmcp/src/lib.rs +++ b/crates/rmcp/src/lib.rs @@ -1,4 +1,3 @@ -#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))] mod error; pub use error::Error; From 5e27ab1db743878dd9e4d14cadc035d4063718f7 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Mon, 31 Mar 2025 13:50:27 +0800 Subject: [PATCH 07/11] fix(notification): fix wrongly error report in notification (#70) --- crates/rmcp/Cargo.toml | 8 +- crates/rmcp/src/handler/server.rs | 2 +- crates/rmcp/src/service.rs | 20 +++-- crates/rmcp/tests/test_notification.rs | 101 +++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 10 deletions(-) create mode 100644 crates/rmcp/tests/test_notification.rs diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index e8b95120..8da5ba89 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -96,4 +96,10 @@ path = "tests/test_with_python.rs" [[test]] name = "test_with_js" required-features = ["server", "client", "transport-sse-server", "transport-child-process"] -path = "tests/test_with_js.rs" \ No newline at end of file +path = "tests/test_with_js.rs" + +[[test]] +name = "test_notification" +required-features = ["server", "client"] +path = "tests/test_notification.rs" + diff --git a/crates/rmcp/src/handler/server.rs b/crates/rmcp/src/handler/server.rs index 1fa90dd8..9d1a0b35 100644 --- a/crates/rmcp/src/handler/server.rs +++ b/crates/rmcp/src/handler/server.rs @@ -103,7 +103,7 @@ impl Service for H { } #[allow(unused_variables)] -pub trait ServerHandler: Sized + Clone + Send + Sync + 'static { +pub trait ServerHandler: Sized + Send + Sync + 'static { fn ping( &self, context: RequestContext, diff --git a/crates/rmcp/src/service.rs b/crates/rmcp/src/service.rs index e99ef587..9ddb37e7 100644 --- a/crates/rmcp/src/service.rs +++ b/crates/rmcp/src/service.rs @@ -342,10 +342,12 @@ impl Peer { self.tx .send(PeerSinkMessage::Notification(notification, responder)) .await - .map_err(|_m| ServiceError::Transport(std::io::Error::other("disconnected")))?; - receiver - .await - .map_err(|_e| ServiceError::Transport(std::io::Error::other("disconnected")))? + .map_err(|_m| { + ServiceError::Transport(std::io::Error::other("disconnected: receiver dropped")) + })?; + receiver.await.map_err(|_e| { + ServiceError::Transport(std::io::Error::other("disconnected: responder dropped")) + })? } pub async fn send_request(&self, request: R::Req) -> Result { self.send_cancellable_request(request, PeerRequestOptions::no_options()) @@ -578,10 +580,12 @@ where let send_result = sink .send(Message::Notification(notification).into_json_rpc_message()) .await; - if let Err(e) = send_result { - let _ = - responder.send(Err(ServiceError::Transport(std::io::Error::other(e)))); - } + let response = if let Err(e) = send_result { + Err(ServiceError::Transport(std::io::Error::other(e))) + } else { + Ok(()) + }; + let _ = responder.send(response); if let Some(param) = cancellation_param { if let Some(responder) = local_responder_pool.remove(¶m.request_id) { tracing::info!(id = %param.request_id, reason = param.reason, "cancelled"); diff --git a/crates/rmcp/tests/test_notification.rs b/crates/rmcp/tests/test_notification.rs new file mode 100644 index 00000000..4d4c0f6e --- /dev/null +++ b/crates/rmcp/tests/test_notification.rs @@ -0,0 +1,101 @@ +use std::sync::Arc; + +use rmcp::{ + ClientHandler, Peer, RoleClient, ServerHandler, ServiceExt, + model::{ + ResourceUpdatedNotificationParam, ServerCapabilities, ServerInfo, SubscribeRequestParam, + }, +}; +use tokio::sync::Notify; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +pub struct Server {} + +impl ServerHandler for Server { + fn get_info(&self) -> ServerInfo { + ServerInfo { + capabilities: ServerCapabilities::builder() + .enable_resources() + .enable_resources_subscribe() + .enable_resources_list_changed() + .build(), + ..Default::default() + } + } + + async fn subscribe( + &self, + request: rmcp::model::SubscribeRequestParam, + context: rmcp::service::RequestContext, + ) -> Result<(), rmcp::Error> { + let uri = request.uri; + let peer = context.peer; + + tokio::spawn(async move { + let span = tracing::info_span!("subscribe", uri = %uri); + let _enter = span.enter(); + + if let Err(e) = peer + .notify_resource_updated(ResourceUpdatedNotificationParam { uri: uri.clone() }) + .await + { + panic!("Failed to send notification: {}", e); + } + }); + + Ok(()) + } +} + +pub struct Client { + receive_signal: Arc, + peer: Option>, +} + +impl ClientHandler for Client { + async fn on_resource_updated(&self, params: rmcp::model::ResourceUpdatedNotificationParam) { + let uri = params.uri; + tracing::info!("Resource updated: {}", uri); + self.receive_signal.notify_one(); + } + + fn set_peer(&mut self, peer: Peer) { + self.peer.replace(peer); + } + + fn get_peer(&self) -> Option> { + self.peer.clone() + } +} + +#[tokio::test] +async fn test_server_notification() -> anyhow::Result<()> { + let _ = tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "debug".to_string().into()), + ) + .with(tracing_subscriber::fmt::layer()) + .try_init(); + let (server_transport, client_transport) = tokio::io::duplex(4096); + tokio::spawn(async move { + let server = Server {}.serve(server_transport).await?; + server.waiting().await?; + anyhow::Ok(()) + }); + let receive_signal = Arc::new(Notify::new()); + let client = Client { + peer: Default::default(), + receive_signal: receive_signal.clone(), + } + .serve(client_transport) + .await?; + client + .subscribe(SubscribeRequestParam { + uri: "test://test-resource".to_owned(), + }) + .await?; + receive_signal.notified().await; + client.cancel().await?; + Ok(()) +} From 4338c8e4b38c6ca671d795747749ae2ceabc95ed Mon Sep 17 00:00:00 2001 From: Eitan Yarmush Date: Mon, 31 Mar 2025 07:55:02 +0000 Subject: [PATCH 08/11] chore: fix lint errors by creating a type alias --- crates/rmcp/src/transport/sse.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/crates/rmcp/src/transport/sse.rs b/crates/rmcp/src/transport/sse.rs index c3fee821..1b1a632c 100644 --- a/crates/rmcp/src/transport/sse.rs +++ b/crates/rmcp/src/transport/sse.rs @@ -29,14 +29,14 @@ pub enum SseTransportError { UnexpectedContentType(Option), } +type SseStreamFuture = + BoxFuture<'static, Result>, SseTransportError>>; + enum SseTransportState { Connected(BoxStream<'static, Result>), Retrying { times: usize, - fut: BoxFuture< - 'static, - Result>, SseTransportError>, - >, + fut: SseStreamFuture, }, Fatal { reason: String, @@ -70,7 +70,7 @@ pub trait SseClient: Clone + Send + Sync + ' fn connect( &self, last_event_id: Option, - ) -> BoxFuture<'static, Result>, SseTransportError>>; + ) -> SseStreamFuture; fn post( &self, @@ -137,10 +137,7 @@ impl SseClient for ReqwestSseClient { fn connect( &self, last_event_id: Option, - ) -> BoxFuture< - 'static, - Result>, SseTransportError>, - > { + ) -> SseStreamFuture { let client = self.http_client.clone(); let sse_url = self.sse_url.as_ref().to_string(); let last_event_id = last_event_id.clone(); @@ -249,7 +246,7 @@ impl, E: std::error::Error + Send + Sync + 'static> SseTransport fn retry_connection( &self, - ) -> BoxFuture<'static, Result>, SseTransportError>> + ) -> SseStreamFuture { let retry_duration = { let recommended_retry_duration = self From 5eaf2a22ea668e364d68404b2f3424558a738fb0 Mon Sep 17 00:00:00 2001 From: Eitan Yarmush Date: Mon, 31 Mar 2025 08:16:19 +0000 Subject: [PATCH 09/11] chore: resolve conflict --- crates/rmcp/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index 8da5ba89..cbaa0ae0 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -102,4 +102,3 @@ path = "tests/test_with_js.rs" name = "test_notification" required-features = ["server", "client"] path = "tests/test_notification.rs" - From 67e477095f3843c01390718bcfe629486a12629b Mon Sep 17 00:00:00 2001 From: Eitan Yarmush Date: Mon, 31 Mar 2025 08:12:12 +0000 Subject: [PATCH 10/11] chore: fix fmt --- crates/rmcp/src/transport/sse.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/crates/rmcp/src/transport/sse.rs b/crates/rmcp/src/transport/sse.rs index 1b1a632c..201f138b 100644 --- a/crates/rmcp/src/transport/sse.rs +++ b/crates/rmcp/src/transport/sse.rs @@ -67,10 +67,7 @@ impl From for SseTransportError { } pub trait SseClient: Clone + Send + Sync + 'static { - fn connect( - &self, - last_event_id: Option, - ) -> SseStreamFuture; + fn connect(&self, last_event_id: Option) -> SseStreamFuture; fn post( &self, @@ -134,10 +131,7 @@ impl ReqwestSseClient { } impl SseClient for ReqwestSseClient { - fn connect( - &self, - last_event_id: Option, - ) -> SseStreamFuture { + fn connect(&self, last_event_id: Option) -> SseStreamFuture { let client = self.http_client.clone(); let sse_url = self.sse_url.as_ref().to_string(); let last_event_id = last_event_id.clone(); @@ -244,10 +238,7 @@ impl, E: std::error::Error + Send + Sync + 'static> SseTransport }) } - fn retry_connection( - &self, - ) -> SseStreamFuture - { + fn retry_connection(&self) -> SseStreamFuture { let retry_duration = { let recommended_retry_duration = self .recommended_retry_duration_ms From 7c23c60508390f6bb79810d46ce2b6c0bba12f61 Mon Sep 17 00:00:00 2001 From: Eitan Yarmush Date: Mon, 31 Mar 2025 08:17:55 +0000 Subject: [PATCH 11/11] chore: revert --- crates/rmcp/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index cbaa0ae0..8da5ba89 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -102,3 +102,4 @@ path = "tests/test_with_js.rs" name = "test_notification" required-features = ["server", "client"] path = "tests/test_notification.rs" +