Skip to content

Commit cb1255b

Browse files
authored
impl(pubsub): StreamingPull builder (#4118)
Part of the work for #3941 Add a builder for a streaming pull bidi RPC. This class will eventually be public, and thus it must have documentation. For now, this thing just sets options. It will gain a `fn start(self) -> Result<Session>` later, when `Session` is a thing.
1 parent 8117b0f commit cb1255b

File tree

2 files changed

+186
-0
lines changed

2 files changed

+186
-0
lines changed

src/pubsub/src/subscriber.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
pub(crate) mod builder;
1516
pub(crate) mod handler;
1617
pub(crate) mod keepalive;
1718
pub(crate) mod lease_loop;
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use super::stub::Stub;
16+
use std::sync::Arc;
17+
18+
const MIB: i64 = 1024 * 1024;
19+
20+
/// Builder for the `client::Subscriber::streaming_pull` method.
21+
pub struct StreamingPull<S>
22+
where
23+
S: Stub,
24+
{
25+
// TODO(#4061) - Use a dynamic stub to remove the generic.
26+
pub(crate) inner: Arc<S>,
27+
pub(crate) subscription: String,
28+
pub(crate) ack_deadline_seconds: i32,
29+
pub(crate) max_outstanding_messages: i64,
30+
pub(crate) max_outstanding_bytes: i64,
31+
}
32+
33+
impl<S> StreamingPull<S>
34+
where
35+
S: Stub,
36+
{
37+
pub(crate) fn new(inner: Arc<S>, subscription: String) -> Self {
38+
Self {
39+
inner,
40+
subscription,
41+
ack_deadline_seconds: 10,
42+
max_outstanding_messages: 1000,
43+
max_outstanding_bytes: 100 * MIB,
44+
}
45+
}
46+
47+
/// Sets the ack deadline to use for the stream.
48+
///
49+
/// This value represents how long the application has to ack or nack an
50+
/// incoming message. Note that this value is independent of the deadline
51+
/// configured on the server-side subscription.
52+
///
53+
/// If the server does not hear back from the client within this deadline
54+
/// (e.g. if an application crashes), it will resend any unacknowledged
55+
/// messages to another subscriber.
56+
///
57+
/// The minimum deadline you can specify is 10 seconds. The maximum deadline
58+
/// you can specify is 600 seconds (10 minutes).
59+
///
60+
/// The default value is 10 seconds.
61+
///
62+
/// # Example
63+
///
64+
/// ```no_rust
65+
/// # use google_cloud_pubsub::client::Subscriber;
66+
/// # async fn sample() -> anyhow::Result<()> {
67+
/// # let client = Subscriber::builder().build().await?;
68+
/// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription")
69+
/// .set_ack_deadline_seconds(20)
70+
/// .start();
71+
/// # Ok(()) }
72+
/// ```
73+
pub fn set_ack_deadline_seconds<T: Into<i32>>(mut self, v: T) -> Self {
74+
self.ack_deadline_seconds = v.into();
75+
self
76+
}
77+
78+
/// Flow control settings for the maximum number of outstanding messages.
79+
///
80+
/// The server will stop sending messages to a client when this many
81+
/// messages are outstanding (i.e. that have not been acked or nacked).
82+
///
83+
/// The server resumes sending messages when the outstanding message count
84+
/// drops below this value.
85+
///
86+
/// Use a value <= 0 to set no limit on the number of outstanding messages.
87+
///
88+
/// The default value is 1000 messages.
89+
///
90+
/// # Example
91+
///
92+
/// ```no_rust
93+
/// # use google_cloud_pubsub::client::Subscriber;
94+
/// # async fn sample() -> anyhow::Result<()> {
95+
/// # let client = Subscriber::builder().build().await?;
96+
/// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription")
97+
/// .set_max_outstanding_messages(2000)
98+
/// .start();
99+
/// # Ok(()) }
100+
/// ```
101+
pub fn set_max_outstanding_messages<T: Into<i64>>(mut self, v: T) -> Self {
102+
self.max_outstanding_messages = v.into();
103+
self
104+
}
105+
106+
/// Flow control settings for the maximum number of outstanding bytes.
107+
///
108+
/// The server will stop sending messages to a client when this many bytes
109+
/// of messages are outstanding (i.e. that have not been acked or nacked).
110+
///
111+
/// The server resumes sending messages when the outstanding byte count
112+
/// drops below this value.
113+
///
114+
/// Use a value <= 0 to set no limit on the number of outstanding bytes.
115+
///
116+
/// The default value is 100 MiB.
117+
///
118+
/// # Example
119+
///
120+
/// ```no_rust
121+
/// # use google_cloud_pubsub::client::Subscriber;
122+
/// # async fn sample() -> anyhow::Result<()> {
123+
/// # let client = Subscriber::builder().build().await?;
124+
/// const MIB: i64 = 1024 * 1024;
125+
/// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription")
126+
/// .set_max_outstanding_bytes(200 * MIB)
127+
/// .start();
128+
/// # Ok(()) }
129+
/// ```
130+
pub fn set_max_outstanding_bytes<T: Into<i64>>(mut self, v: T) -> Self {
131+
self.max_outstanding_bytes = v.into();
132+
self
133+
}
134+
}
135+
136+
#[cfg(test)]
137+
mod tests {
138+
use super::super::stub::tests::MockStub;
139+
use super::*;
140+
141+
const KIB: i64 = 1024;
142+
143+
#[test]
144+
fn reasonable_defaults() {
145+
let mock = MockStub::new();
146+
let builder = StreamingPull::new(
147+
Arc::new(mock),
148+
"projects/my-project/subscriptions/my-subscription".to_string(),
149+
);
150+
assert_eq!(
151+
builder.subscription,
152+
"projects/my-project/subscriptions/my-subscription"
153+
);
154+
assert_eq!(builder.ack_deadline_seconds, 10);
155+
assert!(
156+
100_000 > builder.max_outstanding_messages && builder.max_outstanding_messages > 100,
157+
"max_outstanding_messages={}",
158+
builder.max_outstanding_messages
159+
);
160+
assert!(
161+
builder.max_outstanding_bytes > 100 * KIB,
162+
"max_outstanding_bytes={}",
163+
builder.max_outstanding_bytes
164+
);
165+
}
166+
167+
#[test]
168+
fn options() {
169+
let mock = MockStub::new();
170+
let builder = StreamingPull::new(
171+
Arc::new(mock),
172+
"projects/my-project/subscriptions/my-subscription".to_string(),
173+
)
174+
.set_ack_deadline_seconds(20)
175+
.set_max_outstanding_messages(12345)
176+
.set_max_outstanding_bytes(6789 * KIB);
177+
assert_eq!(
178+
builder.subscription,
179+
"projects/my-project/subscriptions/my-subscription"
180+
);
181+
assert_eq!(builder.ack_deadline_seconds, 20);
182+
assert_eq!(builder.max_outstanding_messages, 12345);
183+
assert_eq!(builder.max_outstanding_bytes, 6789 * KIB);
184+
}
185+
}

0 commit comments

Comments
 (0)