From b2845fc4239ee791c7c465adc970d7ba05e02969 Mon Sep 17 00:00:00 2001 From: Darren Bolduc Date: Mon, 22 Dec 2025 16:40:23 -0500 Subject: [PATCH 1/2] impl(pubsub): StreamingPull builder --- src/pubsub/src/subscriber.rs | 1 + src/pubsub/src/subscriber/builder.rs | 174 +++++++++++++++++++++++++++ 2 files changed, 175 insertions(+) create mode 100644 src/pubsub/src/subscriber/builder.rs diff --git a/src/pubsub/src/subscriber.rs b/src/pubsub/src/subscriber.rs index 92d233a53d..c00fe691bf 100644 --- a/src/pubsub/src/subscriber.rs +++ b/src/pubsub/src/subscriber.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod builder; pub(crate) mod handler; pub(crate) mod keepalive; pub(crate) mod lease_state; diff --git a/src/pubsub/src/subscriber/builder.rs b/src/pubsub/src/subscriber/builder.rs new file mode 100644 index 0000000000..c092842937 --- /dev/null +++ b/src/pubsub/src/subscriber/builder.rs @@ -0,0 +1,174 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::stub::Stub; +use std::sync::Arc; + +/// Builder for the `client::Subscriber::streaming_pull` method. +pub struct StreamingPull +where + S: Stub, +{ + // TODO(#4061) - Use a dynamic stub to remove the generic. + pub(crate) inner: Arc, + pub(crate) subscription: String, + pub(crate) ack_deadline_seconds: i32, + pub(crate) max_outstanding_messages: i64, + pub(crate) max_outstanding_bytes: i64, +} + +impl StreamingPull +where + S: Stub, +{ + pub(crate) fn new(inner: Arc, subscription: String) -> Self { + Self { + inner, + subscription, + ack_deadline_seconds: 10, + max_outstanding_messages: 1000, + max_outstanding_bytes: 100 * 1024 * 1024, + } + } + + /// Sets the ack deadline to use for the stream. + /// + /// This value represents how long the application has to ack or nack an + /// incoming message. Note that this value is independent of the deadline + /// configured on the server-side subscription. + /// + /// The minimum deadline you can specify is 10 seconds. The maximum deadline + /// you can specify is 600 seconds (10 minutes). + /// + /// The default value is 10 seconds. + /// + /// To use the server-side subscription deadline, specify a value of `0`. + /// + /// # Example + /// + /// ```no_rust + /// # use google_cloud_pubsub::client::Subscriber; + /// # async fn sample() -> anyhow::Result<()> { + /// # let client = Subscriber::builder().build().await?; + /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription") + /// .set_ack_deadline_seconds(20) + /// .start(); + /// # Ok(()) } + /// ``` + pub fn set_ack_deadline_seconds>(mut self, v: T) -> Self { + self.ack_deadline_seconds = v.into(); + self + } + + /// Flow control settings for the maximum number of outstanding messages. + /// + /// The server will stop sending messages to a client when this many + /// messages are outstanding (i.e. that have not been acked or nacked). + /// + /// The server resumes sending messages when the outstanding message count + /// drops below this value. + /// + /// Use a value <= 0 to set no limit on the number of outstanding messages. + /// + /// # Example + /// + /// ```no_rust + /// # use google_cloud_pubsub::client::Subscriber; + /// # async fn sample() -> anyhow::Result<()> { + /// # let client = Subscriber::builder().build().await?; + /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription") + /// .set_max_outstanding_messages(100000) + /// .start(); + /// # Ok(()) } + /// ``` + pub fn set_max_outstanding_messages>(mut self, v: T) -> Self { + self.max_outstanding_messages = v.into(); + self + } + + /// Flow control settings for the maximum number of outstanding bytes. + /// + /// The server will stop sending messages to a client when this many bytes + /// of messages are outstanding (i.e. that have not been acked or nacked). + /// + /// The server resumes sending messages when the outstanding byte count + /// drops below this value. + /// + /// Use a value <= 0 to set no limit on the number of outstanding bytes. + /// + /// # Example + /// + /// ```no_rust + /// # use google_cloud_pubsub::client::Subscriber; + /// # async fn sample() -> anyhow::Result<()> { + /// # let client = Subscriber::builder().build().await?; + /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription") + /// .set_max_outstanding_bytes(1024 * 1024 * 1024) + /// .start(); + /// # Ok(()) } + /// ``` + pub fn set_max_outstanding_bytes>(mut self, v: T) -> Self { + self.max_outstanding_bytes = v.into(); + self + } +} + +#[cfg(test)] +mod tests { + use super::super::stub::tests::MockStub; + use super::*; + + #[test] + fn reasonable_defaults() { + let mock = MockStub::new(); + let builder = StreamingPull::new( + Arc::new(mock), + "projects/my-project/subscriptions/my-subscription".to_string(), + ); + assert_eq!( + builder.subscription, + "projects/my-project/subscriptions/my-subscription" + ); + assert_eq!(builder.ack_deadline_seconds, 10); + assert!( + 1000000 > builder.max_outstanding_messages && builder.max_outstanding_messages > 100, + "{}", + builder.max_outstanding_messages + ); + assert!( + builder.max_outstanding_bytes > 100000, + "{}", + builder.max_outstanding_messages + ); + } + + #[test] + fn options() { + let mock = MockStub::new(); + let builder = StreamingPull::new( + Arc::new(mock), + "projects/my-project/subscriptions/my-subscription".to_string(), + ) + .set_ack_deadline_seconds(20) + .set_max_outstanding_messages(12345) + .set_max_outstanding_bytes(6789000); + assert_eq!( + builder.subscription, + "projects/my-project/subscriptions/my-subscription" + ); + assert_eq!(builder.ack_deadline_seconds, 20); + assert_eq!(builder.max_outstanding_messages, 12345); + assert_eq!(builder.max_outstanding_bytes, 6789000); + } +} From 639d81b2e663edea7aea324cd9dd539d8245a3b3 Mon Sep 17 00:00:00 2001 From: Darren Bolduc Date: Mon, 29 Dec 2025 12:49:11 -0500 Subject: [PATCH 2/2] address review comments --- src/pubsub/src/subscriber/builder.rs | 35 ++++++++++++++++++---------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/src/pubsub/src/subscriber/builder.rs b/src/pubsub/src/subscriber/builder.rs index c092842937..fe1fefc1d2 100644 --- a/src/pubsub/src/subscriber/builder.rs +++ b/src/pubsub/src/subscriber/builder.rs @@ -15,6 +15,8 @@ use super::stub::Stub; use std::sync::Arc; +const MIB: i64 = 1024 * 1024; + /// Builder for the `client::Subscriber::streaming_pull` method. pub struct StreamingPull where @@ -38,7 +40,7 @@ where subscription, ack_deadline_seconds: 10, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1024 * 1024, + max_outstanding_bytes: 100 * MIB, } } @@ -48,13 +50,15 @@ where /// incoming message. Note that this value is independent of the deadline /// configured on the server-side subscription. /// + /// If the server does not hear back from the client within this deadline + /// (e.g. if an application crashes), it will resend any unacknowledged + /// messages to another subscriber. + /// /// The minimum deadline you can specify is 10 seconds. The maximum deadline /// you can specify is 600 seconds (10 minutes). /// /// The default value is 10 seconds. /// - /// To use the server-side subscription deadline, specify a value of `0`. - /// /// # Example /// /// ```no_rust @@ -81,6 +85,8 @@ where /// /// Use a value <= 0 to set no limit on the number of outstanding messages. /// + /// The default value is 1000 messages. + /// /// # Example /// /// ```no_rust @@ -88,7 +94,7 @@ where /// # async fn sample() -> anyhow::Result<()> { /// # let client = Subscriber::builder().build().await?; /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription") - /// .set_max_outstanding_messages(100000) + /// .set_max_outstanding_messages(2000) /// .start(); /// # Ok(()) } /// ``` @@ -107,14 +113,17 @@ where /// /// Use a value <= 0 to set no limit on the number of outstanding bytes. /// + /// The default value is 100 MiB. + /// /// # Example /// /// ```no_rust /// # use google_cloud_pubsub::client::Subscriber; /// # async fn sample() -> anyhow::Result<()> { /// # let client = Subscriber::builder().build().await?; + /// const MIB: i64 = 1024 * 1024; /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription") - /// .set_max_outstanding_bytes(1024 * 1024 * 1024) + /// .set_max_outstanding_bytes(200 * MIB) /// .start(); /// # Ok(()) } /// ``` @@ -129,6 +138,8 @@ mod tests { use super::super::stub::tests::MockStub; use super::*; + const KIB: i64 = 1024; + #[test] fn reasonable_defaults() { let mock = MockStub::new(); @@ -142,14 +153,14 @@ mod tests { ); assert_eq!(builder.ack_deadline_seconds, 10); assert!( - 1000000 > builder.max_outstanding_messages && builder.max_outstanding_messages > 100, - "{}", + 100_000 > builder.max_outstanding_messages && builder.max_outstanding_messages > 100, + "max_outstanding_messages={}", builder.max_outstanding_messages ); assert!( - builder.max_outstanding_bytes > 100000, - "{}", - builder.max_outstanding_messages + builder.max_outstanding_bytes > 100 * KIB, + "max_outstanding_bytes={}", + builder.max_outstanding_bytes ); } @@ -162,13 +173,13 @@ mod tests { ) .set_ack_deadline_seconds(20) .set_max_outstanding_messages(12345) - .set_max_outstanding_bytes(6789000); + .set_max_outstanding_bytes(6789 * KIB); assert_eq!( builder.subscription, "projects/my-project/subscriptions/my-subscription" ); assert_eq!(builder.ack_deadline_seconds, 20); assert_eq!(builder.max_outstanding_messages, 12345); - assert_eq!(builder.max_outstanding_bytes, 6789000); + assert_eq!(builder.max_outstanding_bytes, 6789 * KIB); } }