From 3c996bb06dee619fa5d0b6fe3946a773dc70e6e2 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 4 Dec 2025 15:27:00 +0000 Subject: [PATCH 1/6] impl(auth): add IAM API based signing provider --- src/auth/src/signer.rs | 10 +- src/auth/src/signer/iam.rs | 214 +++++++++++++++++++++++++++++++++++++ 2 files changed, 220 insertions(+), 4 deletions(-) create mode 100644 src/auth/src/signer/iam.rs diff --git a/src/auth/src/signer.rs b/src/auth/src/signer.rs index 0b68276ff4..3a1cd5a4f3 100644 --- a/src/auth/src/signer.rs +++ b/src/auth/src/signer.rs @@ -14,6 +14,8 @@ use std::sync::Arc; +pub(crate) mod iam; + pub type Result = std::result::Result; /// An implementation of [crate::signer::SigningProvider] that wraps a dynamic provider. @@ -47,8 +49,7 @@ impl Signer { /// Signs the provided content using the underlying provider. /// /// The content is typically a string-to-sign generated by the caller. - /// Returns the signature as a base64 encoded string (or other format depending on implementation, - /// but typically hex or base64). + /// Returns the signature as a hex encoded string. pub async fn sign(&self, content: T) -> Result where T: AsRef<[u8]> + Send + Sync, @@ -61,8 +62,9 @@ impl Signer { pub trait SigningProvider: std::fmt::Debug { /// Returns the email address of the authorizer. /// - /// It is typically the Google service account client email address from the Google Developers Console - /// in the form of "xxx@developer.gserviceaccount.com". Required. + /// It is typically the Google service account client email address + /// from the Google Developers Console in the form of + /// "xxx@developer.gserviceaccount.com". fn client_email(&self) -> impl Future> + Send; /// Signs the content. diff --git a/src/auth/src/signer/iam.rs b/src/auth/src/signer/iam.rs new file mode 100644 index 0000000000..6945a279dc --- /dev/null +++ b/src/auth/src/signer/iam.rs @@ -0,0 +1,214 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::credentials::{CacheableResource, Credentials}; +use crate::signer::{Result, SigningError, dynamic::SigningProvider}; +use http::Extensions; +use reqwest::Client; + +// Implements Signer using IAM signBlob API and reusing using existing [Credentials] to +// authenticate to it. +#[derive(Clone, Debug)] +pub(crate) struct IamSigner { + client_email: String, + inner: Credentials, + endpoint: String, +} + +#[derive(Debug, serde::Serialize)] +struct SignBlobRequest { + payload: String, +} + +#[derive(Debug, serde::Deserialize)] +struct SignBlobResponse { + #[serde(rename = "signedBlob")] + signed_blob: String, +} + +impl IamSigner { + pub(crate) fn new(client_email: String, inner: Credentials) -> Self { + Self { + client_email, + inner, + endpoint: "https://iamcredentials.googleapis.com".to_string(), + } + } +} + +#[async_trait::async_trait] +impl SigningProvider for IamSigner { + async fn client_email(&self) -> Result { + Ok(self.client_email.clone()) + } + + async fn sign(&self, content: &[u8]) -> Result { + use base64::{Engine, prelude::BASE64_STANDARD}; + + let source_headers = self + .inner + .headers(Extensions::new()) + .await + .map_err(SigningError::transport)?; + let source_headers = match source_headers { + CacheableResource::New { data, .. } => data, + CacheableResource::NotModified => { + unreachable!("requested source credentials without a caching etag") + } + }; + + let client_email = self.client_email.clone(); + let url = format!( + "{}/v1/projects/-/serviceAccounts/{}:signBlob", + self.endpoint, client_email + ); + + let client = Client::new(); + let payload = BASE64_STANDARD.encode(content); + let body = SignBlobRequest { payload }; + + let response = client + .post(url) + .header("Content-Type", "application/json") + .headers(source_headers) + .json(&body) + .send() + .await + .map_err(SigningError::transport)?; + + if !response.status().is_success() { + let err_text = response.text().await.map_err(SigningError::transport)?; + return Err(SigningError::transport(format!("err status: {err_text:?}"))); + } + + let res = response + .json::() + .await + .map_err(SigningError::transport)?; + + let signature = BASE64_STANDARD + .decode(res.signed_blob) + .map_err(SigningError::transport)?; + + let signature = hex::encode(signature); + + Ok(signature) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::credentials::{Credentials, CredentialsProvider, EntityTag}; + use crate::errors::CredentialsError; + use base64::{Engine, prelude::BASE64_STANDARD}; + use http::header::{HeaderName, HeaderValue}; + use http::{Extensions, HeaderMap}; + use httptest::matchers::{all_of, contains, eq, json_decoded, request}; + use httptest::responders::json_encoded; + use httptest::{Expectation, Server}; + use serde_json::json; + + type TestResult = anyhow::Result<()>; + + mockall::mock! { + #[derive(Debug)] + Credentials {} + + impl CredentialsProvider for Credentials { + async fn headers(&self, extensions: Extensions) -> std::result::Result, CredentialsError>; + async fn universe_domain(&self) -> Option; + } + } + + #[tokio::test] + async fn test_iam_sign() -> TestResult { + let server = Server::run(); + let payload = BASE64_STANDARD.encode("test"); + let signed_blob = BASE64_STANDARD.encode("signed_blob"); + server.expect( + Expectation::matching(all_of![ + request::method_path( + "POST", + format!("/v1/projects/-/serviceAccounts/test@example.com:signBlob") + ), + request::headers(contains(("authorization", "Bearer test-value"))), + request::body(json_decoded(eq(json!({ + "payload": payload, + })))) + ]) + .respond_with(json_encoded(json!({ + "signedBlob": signed_blob, + }))), + ); + let endpoint = server.url("").to_string().trim_end_matches('/').to_string(); + + let mut mock = MockCredentials::new(); + mock.expect_headers().return_once(|_extensions| { + let headers = HeaderMap::from_iter([( + HeaderName::from_static("authorization"), + HeaderValue::from_static("Bearer test-value"), + )]); + Ok(CacheableResource::New { + entity_tag: EntityTag::default(), + data: headers, + }) + }); + let creds = Credentials::from(mock); + + let mut signer = IamSigner::new("test@example.com".to_string(), creds); + signer.endpoint = endpoint; + let signature = signer.sign(b"test").await.unwrap(); + + assert_eq!(signature, hex::encode("signed_blob")); + + Ok(()) + } + + #[tokio::test] + async fn test_iam_sign_api_error() -> TestResult { + let server = Server::run(); + let payload = BASE64_STANDARD.encode("test"); + server.expect( + Expectation::matching(all_of![request::method_path( + "POST", + format!("/v1/projects/-/serviceAccounts/test@example.com:signBlob") + ),]) + .respond_with(json_encoded(json!({ + "error": { + "code": 400, + "message": "test-error", + }, + }))), + ); + let endpoint = server.url("").to_string().trim_end_matches('/').to_string(); + + let mut mock = MockCredentials::new(); + mock.expect_headers().return_once(|_extensions| { + Ok(CacheableResource::New { + entity_tag: EntityTag::default(), + data: HeaderMap::new(), + }) + }); + let creds = Credentials::from(mock); + + let mut signer = IamSigner::new("test@example.com".to_string(), creds); + signer.endpoint = endpoint; + let err = signer.sign(b"test").await.unwrap_err(); + + assert!(err.is_transport()); + + Ok(()) + } +} From 08f1e9e3781bd14f3c7ab8c532a862684007852d Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 4 Dec 2025 15:30:01 +0000 Subject: [PATCH 2/6] fix: add missing dep --- Cargo.lock | 1 + Cargo.toml | 1 + src/auth/Cargo.toml | 1 + src/auth/src/signer/iam.rs | 1 - 4 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 664800b932..0b36f7450f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1472,6 +1472,7 @@ dependencies = [ "base64", "bon", "google-cloud-gax", + "hex", "http", "httptest", "jsonwebtoken", diff --git a/Cargo.toml b/Cargo.toml index 7b78442eee..bd0d0b583a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -307,6 +307,7 @@ clap = { default-features = false, version = "4" } crc32c = { default-features = false, version = "0.6.8" } futures = { default-features = false, version = "0.3" } http = { default-features = false, version = "1", features = ["std"] } +hex = { default-features = false, version = "0.4.3" } http-body = { default-features = false, version = "1" } http-body-util = { default-features = false, version = "0.1.3" } humantime = { default-features = false, version = "2" } diff --git a/src/auth/Cargo.toml b/src/auth/Cargo.toml index c1db5ce968..829174afcb 100644 --- a/src/auth/Cargo.toml +++ b/src/auth/Cargo.toml @@ -46,6 +46,7 @@ thiserror.workspace = true time = { workspace = true, features = ["serde"] } tokio = { workspace = true, features = ["fs", "process"] } bon.workspace = true +hex = { workspace = true, features = ["std"] } jsonwebtoken = { workspace = true, features = ["rust_crypto"], optional = true } # Local dependencies gax.workspace = true diff --git a/src/auth/src/signer/iam.rs b/src/auth/src/signer/iam.rs index 6945a279dc..22ea46cb97 100644 --- a/src/auth/src/signer/iam.rs +++ b/src/auth/src/signer/iam.rs @@ -179,7 +179,6 @@ mod tests { #[tokio::test] async fn test_iam_sign_api_error() -> TestResult { let server = Server::run(); - let payload = BASE64_STANDARD.encode("test"); server.expect( Expectation::matching(all_of![request::method_path( "POST", From b97fa3b812d9702f9747e174ccc8998398118c6d Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 4 Dec 2025 16:53:18 +0000 Subject: [PATCH 3/6] fix: clippy all the things --- Cargo.toml | 2 +- src/auth/src/signer/iam.rs | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bd0d0b583a..7f629272ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -307,7 +307,7 @@ clap = { default-features = false, version = "4" } crc32c = { default-features = false, version = "0.6.8" } futures = { default-features = false, version = "0.3" } http = { default-features = false, version = "1", features = ["std"] } -hex = { default-features = false, version = "0.4.3" } +hex = { default-features = false, version = "0.4" } http-body = { default-features = false, version = "1" } http-body-util = { default-features = false, version = "0.1.3" } humantime = { default-features = false, version = "2" } diff --git a/src/auth/src/signer/iam.rs b/src/auth/src/signer/iam.rs index 22ea46cb97..4f8f998d8b 100644 --- a/src/auth/src/signer/iam.rs +++ b/src/auth/src/signer/iam.rs @@ -141,7 +141,7 @@ mod tests { Expectation::matching(all_of![ request::method_path( "POST", - format!("/v1/projects/-/serviceAccounts/test@example.com:signBlob") + "/v1/projects/-/serviceAccounts/test@example.com:signBlob" ), request::headers(contains(("authorization", "Bearer test-value"))), request::body(json_decoded(eq(json!({ @@ -176,13 +176,25 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_iam_client_email() -> TestResult { + let mut mock = MockCredentials::new(); + let creds = Credentials::from(mock); + + let mut signer = IamSigner::new("test@example.com".to_string(), creds); + let client_email = signer.client_email().await.unwrap(); + assert_eq!(client_email, "test@example.com"); + + Ok(()) + } + #[tokio::test] async fn test_iam_sign_api_error() -> TestResult { let server = Server::run(); server.expect( Expectation::matching(all_of![request::method_path( "POST", - format!("/v1/projects/-/serviceAccounts/test@example.com:signBlob") + "/v1/projects/-/serviceAccounts/test@example.com:signBlob" ),]) .respond_with(json_encoded(json!({ "error": { From d56628e2bc472e5d54fb83cc48084ebf0ef97d4f Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 4 Dec 2025 17:17:42 +0000 Subject: [PATCH 4/6] impl: cache reqwest client --- src/auth/src/signer/iam.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/auth/src/signer/iam.rs b/src/auth/src/signer/iam.rs index 4f8f998d8b..086f8e7409 100644 --- a/src/auth/src/signer/iam.rs +++ b/src/auth/src/signer/iam.rs @@ -24,6 +24,7 @@ pub(crate) struct IamSigner { client_email: String, inner: Credentials, endpoint: String, + client: Client, } #[derive(Debug, serde::Serialize)] @@ -43,6 +44,7 @@ impl IamSigner { client_email, inner, endpoint: "https://iamcredentials.googleapis.com".to_string(), + client: Client::new(), } } } @@ -74,11 +76,11 @@ impl SigningProvider for IamSigner { self.endpoint, client_email ); - let client = Client::new(); let payload = BASE64_STANDARD.encode(content); let body = SignBlobRequest { payload }; - let response = client + let response = self + .client .post(url) .header("Content-Type", "application/json") .headers(source_headers) @@ -116,7 +118,7 @@ mod tests { use http::header::{HeaderName, HeaderValue}; use http::{Extensions, HeaderMap}; use httptest::matchers::{all_of, contains, eq, json_decoded, request}; - use httptest::responders::json_encoded; + use httptest::responders::{json_encoded, status_code}; use httptest::{Expectation, Server}; use serde_json::json; @@ -178,10 +180,10 @@ mod tests { #[tokio::test] async fn test_iam_client_email() -> TestResult { - let mut mock = MockCredentials::new(); + let mock = MockCredentials::new(); let creds = Credentials::from(mock); - let mut signer = IamSigner::new("test@example.com".to_string(), creds); + let signer = IamSigner::new("test@example.com".to_string(), creds); let client_email = signer.client_email().await.unwrap(); assert_eq!(client_email, "test@example.com"); @@ -196,12 +198,7 @@ mod tests { "POST", "/v1/projects/-/serviceAccounts/test@example.com:signBlob" ),]) - .respond_with(json_encoded(json!({ - "error": { - "code": 400, - "message": "test-error", - }, - }))), + .respond_with(status_code(500)), ); let endpoint = server.url("").to_string().trim_end_matches('/').to_string(); From 0ea1fb1fb9008745f0843baa852e7832cf34cdbd Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 5 Dec 2025 14:22:04 +0000 Subject: [PATCH 5/6] feat: add retry logic for calling signBlob --- src/auth/src/signer/iam.rs | 140 +++++++++++++++++++++++++++++-------- 1 file changed, 111 insertions(+), 29 deletions(-) diff --git a/src/auth/src/signer/iam.rs b/src/auth/src/signer/iam.rs index 086f8e7409..1e1efbb494 100644 --- a/src/auth/src/signer/iam.rs +++ b/src/auth/src/signer/iam.rs @@ -14,12 +14,18 @@ use crate::credentials::{CacheableResource, Credentials}; use crate::signer::{Result, SigningError, dynamic::SigningProvider}; -use http::Extensions; +use gax::backoff_policy::{BackoffPolicy, BackoffPolicyArg}; +use gax::exponential_backoff::ExponentialBackoff; +use gax::retry_loop_internal::retry_loop; +use gax::retry_policy::{Aip194Strict, RetryPolicyArg, RetryPolicyExt}; +use gax::retry_throttler::{AdaptiveThrottler, RetryThrottlerArg, SharedRetryThrottler}; +use http::{Extensions, HeaderMap}; use reqwest::Client; +use std::sync::Arc; // Implements Signer using IAM signBlob API and reusing using existing [Credentials] to // authenticate to it. -#[derive(Clone, Debug)] +#[derive(Debug)] pub(crate) struct IamSigner { client_email: String, inner: Credentials, @@ -27,7 +33,7 @@ pub(crate) struct IamSigner { client: Client, } -#[derive(Debug, serde::Serialize)] +#[derive(Debug, Clone, serde::Serialize)] struct SignBlobRequest { payload: String, } @@ -58,36 +64,16 @@ impl SigningProvider for IamSigner { async fn sign(&self, content: &[u8]) -> Result { use base64::{Engine, prelude::BASE64_STANDARD}; - let source_headers = self - .inner - .headers(Extensions::new()) - .await - .map_err(SigningError::transport)?; - let source_headers = match source_headers { - CacheableResource::New { data, .. } => data, - CacheableResource::NotModified => { - unreachable!("requested source credentials without a caching etag") - } - }; + let payload = BASE64_STANDARD.encode(content); + let body = SignBlobRequest { payload }; let client_email = self.client_email.clone(); let url = format!( - "{}/v1/projects/-/serviceAccounts/{}:signBlob", - self.endpoint, client_email + "{}/v1/projects/-/serviceAccounts/{client_email}:signBlob", + self.endpoint ); - - let payload = BASE64_STANDARD.encode(content); - let body = SignBlobRequest { payload }; - - let response = self - .client - .post(url) - .header("Content-Type", "application/json") - .headers(source_headers) - .json(&body) - .send() - .await - .map_err(SigningError::transport)?; + let response = + sign_blob_call_with_retry(self.inner.clone(), self.client.clone(), url, body).await?; if !response.status().is_success() { let err_text = response.text().await.map_err(SigningError::transport)?; @@ -109,6 +95,64 @@ impl SigningProvider for IamSigner { } } +async fn sign_blob_call_with_retry( + credentials: Credentials, + client: Client, + url: String, + body: SignBlobRequest, +) -> Result { + let sleep = async |d| tokio::time::sleep(d).await; + + let retry_policy: RetryPolicyArg = Aip194Strict.with_attempt_limit(3).into(); + let backoff_policy: BackoffPolicyArg = ExponentialBackoff::default().into(); + let backoff_policy: Arc = backoff_policy.into(); + let retry_throttler: RetryThrottlerArg = AdaptiveThrottler::default().into(); + let retry_throttler: SharedRetryThrottler = retry_throttler.into(); + + let response = retry_loop( + async move |_| { + let source_headers = credentials + .headers(Extensions::new()) + .await + .map_err(gax::error::Error::authentication)?; + + sign_blob_call(&client, &url, source_headers, body.clone()).await + }, + sleep, + true, // signBlob is idempotent + retry_throttler, + retry_policy.into(), + backoff_policy, + ) + .await + .map_err(SigningError::transport)?; + + Ok(response) +} + +async fn sign_blob_call( + client: &Client, + url: &str, + source_headers: CacheableResource, + body: SignBlobRequest, +) -> gax::Result { + let source_headers = match source_headers { + CacheableResource::New { data, .. } => data, + CacheableResource::NotModified => { + unreachable!("requested source credentials without a caching etag") + } + }; + + client + .post(url) + .header("Content-Type", "application/json") + .headers(source_headers.clone()) + .json(&body) + .send() + .await + .map_err(|e| gax::error::Error::transport(source_headers, e)) +} + #[cfg(test)] mod tests { use super::*; @@ -117,6 +161,7 @@ mod tests { use base64::{Engine, prelude::BASE64_STANDARD}; use http::header::{HeaderName, HeaderValue}; use http::{Extensions, HeaderMap}; + use httptest::cycle; use httptest::matchers::{all_of, contains, eq, json_decoded, request}; use httptest::responders::{json_encoded, status_code}; use httptest::{Expectation, Server}; @@ -219,4 +264,41 @@ mod tests { Ok(()) } + + async fn test_iam_sign_retry() -> TestResult { + let server = Server::run(); + let signed_blob = BASE64_STANDARD.encode("signed_blob"); + server.expect( + Expectation::matching(all_of![request::method_path( + "POST", + "/v1/projects/-/serviceAccounts/test@example.com:signBlob" + ),]) + .times(3) + .respond_with(cycle![ + status_code(503).body("try-again"), + status_code(503).body("try-again"), + json_encoded(json!({ + "signedBlob": signed_blob, + })) + ]), + ); + let endpoint = server.url("").to_string().trim_end_matches('/').to_string(); + + let mut mock = MockCredentials::new(); + mock.expect_headers().return_once(|_extensions| { + Ok(CacheableResource::New { + entity_tag: EntityTag::default(), + data: HeaderMap::new(), + }) + }); + let creds = Credentials::from(mock); + + let mut signer = IamSigner::new("test@example.com".to_string(), creds); + signer.endpoint = endpoint; + let signature = signer.sign(b"test").await.unwrap(); + + assert_eq!(signature, hex::encode("signed_blob")); + + Ok(()) + } } From 2102febd897035d806443711fc767796faa7f4d5 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 5 Dec 2025 15:52:51 +0000 Subject: [PATCH 6/6] fix: address review comments --- src/auth/src/signer/iam.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/auth/src/signer/iam.rs b/src/auth/src/signer/iam.rs index 1e1efbb494..78f827fcdc 100644 --- a/src/auth/src/signer/iam.rs +++ b/src/auth/src/signer/iam.rs @@ -109,7 +109,7 @@ async fn sign_blob_call_with_retry( let retry_throttler: RetryThrottlerArg = AdaptiveThrottler::default().into(); let retry_throttler: SharedRetryThrottler = retry_throttler.into(); - let response = retry_loop( + retry_loop( async move |_| { let source_headers = credentials .headers(Extensions::new()) @@ -125,9 +125,7 @@ async fn sign_blob_call_with_retry( backoff_policy, ) .await - .map_err(SigningError::transport)?; - - Ok(response) + .map_err(SigningError::transport) } async fn sign_blob_call(