diff --git a/Cargo.lock b/Cargo.lock index abd06c5f131a7..6bd1791248e1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5521,9 +5521,13 @@ dependencies = [ name = "mz-authenticator" version = "0.1.0" dependencies = [ + "jsonwebtoken", "mz-adapter", "mz-frontegg-auth", + "reqwest", "serde", + "tokio", + "tracing", "workspace-hack", ] diff --git a/oidc_auth_setup.md b/oidc_auth_setup.md new file mode 100644 index 0000000000000..bd6a3560b0bc6 --- /dev/null +++ b/oidc_auth_setup.md @@ -0,0 +1,77 @@ +## Setting + +PGOAUTHDEBUG=UNSAFE psql 'host=192.168.215.3 user=employees dbname=promo oauth_issuer=http://host.docker.internal:4444 oauth_client_id=1186624a-7bed-44f8-867e-d3938a29c924 oauth_client_secret=88OlbvOkiDaPSypu94qK_WHDjG' + + +code_client=$(docker compose -f quickstart.yml exec hydra \ + hydra create client \ + --endpoint http://127.0.0.1:4445 \ + --grant-type authorization_code,refresh_token \ + --response-type code,id_token \ + --format json \ + --scope openid --scope offline --scope profile --scope email\ + --access-token-strategy jwt \ + --redirect-uri http://127.0.0.1:5555/callback) + +code_client_id=$(echo $code_client | jq -r '.client_id') +code_client_secret=$(echo $code_client | jq -r '.client_secret') + +docker compose -f quickstart.yml exec hydra \ + hydra perform authorization-code \ + --client-id $code_client_id \ + --client-secret $code_client_secret \ + --endpoint http://127.0.0.1:4444/ \ + --port 5555 \ + --scope openid --scope offline --scope profile --scope email + +## Deleting a client +hydra delete oauth2-client --endpoint http://localhost:4445 b1a93de1-e4dd-4da9-8e81-083ec4e89f6e=$ + +client id: 060a4f3d-1cac-46e4-b5a5-6b9c66cd9431 +secret: wAghHCKR_E26yuLRpSkaoz2epq + + + + + +device + +device_client=$(docker compose -f quickstart.yml exec hydra \ + hydra create client \ + --endpoint http://127.0.0.1:4445 \ + --format json \ + --name "my device app" \ + --grant-type urn:ietf:params:oauth:grant-type:device_code,refresh_token \ + --token-endpoint-auth-method none \ + --access-token-strategy jwt \ + --scope openid,offline_access,profile) + +device_client_id=$(echo $device_client | jq -r '.client_id') +device_client_secret=$(echo $device_client | jq -r '.client_secret') + +echo $device_client_id +echo $device_client_secret + + +docker compose -f quickstart.yml exec hydra \ + hydra perform device-code \ + --client-id $device_client_id \ + --client-secret $device_client_secret \ + --endpoint http://127.0.0.1:4444/ \ + --scope openid,offline_access + + +Visit http://host.docker.internal:4444/oauth2/device/verify and enter the code: mpGRAMPk + + +http://localhost:4444/.well-known/jwks.json + + bin/environmentd \ + --oidc-issuer="http://127.0.0.1:4444" \ + --oidc-jwks-uri="http://127.0.0.1:4444/.well-known/jwks.json" \ + --listeners-config-path='src/materialized/ci/listener_configs/oidc.json' + +eyJhbGciOiJSUzI1NiIsImtpZCI6Ijk3ZTJmOTJhLWM2YjQtNDQ0ZC1hNjZhLWY3Y2YwOTIwNzdhMyIsInR5cCI6IkpXVCJ9.eyJhdWQiOltdLCJjbGllbnRfaWQiOiJlZTY3ZDIzNi1mMzY4LTQ0ZmYtYTBiNS1mYWIxZDQwNjZhOTEiLCJleHAiOjE3NjU0Nzg3NDIsImV4dCI6e30sImlhdCI6MTc2NTQ3NTE0MiwiaXNzIjoiaHR0cDovLzEyNy4wLjAuMTo0NDQ0IiwianRpIjoiMzVmNjZkZTEtMzYzNC00ZmM5LWE2ZTktZjM3MmI0YTVlZTRjIiwibmJmIjoxNzY1NDc1MTQyLCJzY3AiOlsib3BlbmlkIiwib2ZmbGluZSJdLCJzdWIiOiJmb29AYmFyLmNvbSJ9.v_tSd01RUVxUvvpO3lBRtFTyZwekliiXpDGAFDeaoOALVUQhtzrYOqWowUNPoMK8mFowwLAVHpc0VOnVHqPVEino4cp3Q3o_FGOGzSdkDBvh2ZmGeC_4uT_C3fzIz7I6fHNCqE7kY_r0EZgguJPepMpgfZ0irjV972tLHV612vrG9LyoplbJCGr6mvkkWBB1fqzcn6C4WnjNaSCArb-riJtfMLNH-AWzotOfJbvdHqNbQcFiNKpA7Xc1sderpLlFdf19U-5NRuQ1YpT1jhKj10JwUIX_Ct7btk8H2LPJ405pVaIU-TvYWMY8mOfoPvmjOxbhMPOgn_aYhJkRjTk1f1H4K8MTMEnKRon7H-Jn-YMnEUVH8bQvLY76fu0LQ6mGbbsdy_o-1bp99n1cSdQIJkHYtPXUspgQPbOLwjYns19M8nEwFamHwUruAm_RBZYiAQTB3Z-SjKGi4HWLwZXr0OaRx-aP-HTVaG6v14z7Vr_mKbcLN_ZyQTailqAwPbu0NRIP-tXp8owlVwNbyL1FdqbM58g5lnxao6H77bmHCyG8YZTYm7ID-plCrGnrSkJm2AN_9PaxgeMvJ9ekcQg2nEpvsO5D1eDyMuKE5CCTZrs7c8Y5G-tCHiCbqftwFcBLgDUc-voSk0gFavS1bFCcXw4ExKfDGjy_4oXDOeBUWcw diff --git a/src/authenticator/Cargo.toml b/src/authenticator/Cargo.toml index 3c34cd66028cb..e11467e70b082 100644 --- a/src/authenticator/Cargo.toml +++ b/src/authenticator/Cargo.toml @@ -8,9 +8,13 @@ rust-version.workspace = true publish = false [dependencies] +jsonwebtoken = "9.3.1" mz-adapter = { path = "../adapter", default-features = false } mz-frontegg-auth = { path = "../frontegg-auth", default-features = false } +reqwest = "0.12.24" serde = { version = "1.0.219", features = ["derive"] } +tracing = "0.1.43" +tokio = { version = "1.48.0", default-features = false } workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } [lints] diff --git a/src/authenticator/src/lib.rs b/src/authenticator/src/lib.rs index 6e1527d13b742..68f4d5e2e2df4 100644 --- a/src/authenticator/src/lib.rs +++ b/src/authenticator/src/lib.rs @@ -7,13 +7,18 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +pub mod oidc; + use mz_adapter::Client as AdapterClient; use mz_frontegg_auth::Authenticator as FronteggAuthenticator; +pub use oidc::{OidcAuthenticator, OidcConfig, OidcError}; + #[derive(Debug, Clone)] pub enum Authenticator { Frontegg(FronteggAuthenticator), Password(AdapterClient), Sasl(AdapterClient), + Oidc(OidcAuthenticator), None, } diff --git a/src/authenticator/src/oidc.rs b/src/authenticator/src/oidc.rs new file mode 100644 index 0000000000000..29b4bf61e06bd --- /dev/null +++ b/src/authenticator/src/oidc.rs @@ -0,0 +1,202 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! OIDC Authentication for pgwire connections. +//! +//! This module provides JWT-based authentication using OpenID Connect (OIDC). +//! JWTs are validated locally using JWKS fetched from the configured provider. + +use std::time::Duration; + +use jsonwebtoken::{DecodingKey, Validation, decode, decode_header, jwk::JwkSet}; +use reqwest::Client as HttpClient; +use serde::{Deserialize, Serialize}; +use tracing::warn; + +/// Command line arguments for OIDC authentication. +#[derive(Debug, Clone)] +pub struct OidcConfig { + /// OIDC issuer URL (e.g., "https://accounts.google.com"). + /// This is validated against the `iss` claim in the JWT. + pub oidc_issuer: String, + /// JWKS URI for fetching public keys. + /// (e.g., "https://www.googleapis.com/oauth2/v3/certs") + pub oidc_jwks_uri: String, +} + +/// Errors that can occur during OIDC authentication. +#[derive(Debug)] +pub enum OidcError { + /// JWT token has expired. + TokenExpired, + /// JWT signature is invalid. + InvalidSignature, + /// JWT issuer does not match expected value. + InvalidIssuer, + /// Failed to fetch JWKS from provider. + JwksFetchFailed(String), + /// OIDC configuration is incomplete. + IncompleteConfig(String), + /// JWT is malformed or could not be parsed. + MalformedToken(String), + /// No matching key found in JWKS. + NoMatchingKey, +} + +impl std::fmt::Display for OidcError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OidcError::TokenExpired => write!(f, "token expired"), + OidcError::InvalidSignature => write!(f, "invalid signature"), + OidcError::InvalidIssuer => write!(f, "invalid issuer"), + OidcError::JwksFetchFailed(e) => write!(f, "failed to fetch JWKS: {}", e), + OidcError::IncompleteConfig(e) => write!(f, "incomplete OIDC config: {}", e), + OidcError::MalformedToken(e) => write!(f, "malformed token: {}", e), + OidcError::NoMatchingKey => write!(f, "no matching key in JWKS"), + } + } +} + +impl std::error::Error for OidcError {} + +/// Claims extracted from a validated JWT. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OidcClaims { + /// Subject (user identifier). + pub sub: String, + /// Issuer. + pub iss: String, + /// Expiration time (Unix timestamp). + pub exp: i64, + /// Issued at time (Unix timestamp). + #[serde(default)] + pub iat: Option, + /// Email claim (commonly used for username). + #[serde(default)] + pub email: Option, + /// Whether the email is verified. + #[serde(default)] + pub email_verified: Option, + /// Preferred username claim. + #[serde(default)] + pub preferred_username: Option, + /// Name claim. + #[serde(default)] + pub name: Option, +} + +impl OidcClaims { + /// Extract the username to use for the session. + /// + /// Priority: email > preferred_username > sub + pub fn username(&self) -> &str { + self.email + .as_deref() + .or(self.preferred_username.as_deref()) + .unwrap_or(&self.sub) + } +} + +/// OIDC Authenticator that validates JWTs using JWKS. +#[derive(Debug, Clone)] +pub struct OidcAuthenticator { + issuer: String, + jwks_uri: String, + http_client: HttpClient, +} + +impl OidcAuthenticator { + /// Create a new [`OidcAuthenticator`] from [`OidcConfig`]. + pub fn new(config: OidcConfig) -> Self { + Self { + issuer: config.oidc_issuer, + jwks_uri: config.oidc_jwks_uri, + http_client: HttpClient::new(), + } + } + + /// Validate a JWT token and return the claims. + /// + /// This performs the following validations: + /// 1. Decode the JWT header to get the key ID (kid) and algorithm + /// 2. Fetch JWKS and find the matching key + /// 3. Verify the signature + /// 4. Validate claims (exp, iss) + pub async fn validate_token(&self, token: &str) -> Result { + // 1. Decode header to get key ID (kid) and algorithm + let header = decode_header(token).map_err(|e| OidcError::MalformedToken(e.to_string()))?; + + // 2. Fetch JWKS and get the matching key + let decoding_key = self.fetch_decoding_key(&header.kid).await?; + + // 3. Set up validation + let mut validation = Validation::new(header.alg); + validation.set_issuer(&[&self.issuer]); + validation.validate_aud = false; + + // 4. Decode and validate the token + let token_data = decode::(token, &decoding_key, &validation).map_err(|e| { + use jsonwebtoken::errors::ErrorKind; + match e.kind() { + ErrorKind::ExpiredSignature => OidcError::TokenExpired, + ErrorKind::InvalidSignature => OidcError::InvalidSignature, + ErrorKind::InvalidIssuer => OidcError::InvalidIssuer, + _ => OidcError::MalformedToken(e.to_string()), + } + })?; + + Ok(token_data.claims) + } + + /// Fetch JWKS from the provider and return the decoding key. + async fn fetch_decoding_key(&self, kid: &Option) -> Result { + let response = self + .http_client + .get(&self.jwks_uri) + .timeout(Duration::from_secs(10)) + .send() + .await + .map_err(|e| OidcError::JwksFetchFailed(e.to_string()))?; + + if !response.status().is_success() { + return Err(OidcError::JwksFetchFailed(format!( + "HTTP {}", + response.status() + ))); + } + + let jwks: JwkSet = response + .json() + .await + .map_err(|e| OidcError::JwksFetchFailed(e.to_string()))?; + + // Find the matching key + for jwk in jwks.keys { + let jwk_kid = jwk.common.key_id.as_ref(); + + // Match by kid if provided, otherwise use the first key + let is_match = match kid { + Some(k) => jwk_kid == Some(k), + None => true, + }; + + if is_match { + match DecodingKey::from_jwk(&jwk) { + Ok(key) => return Ok(key), + Err(e) => { + warn!("Failed to parse JWK: {}", e); + continue; + } + } + } + } + + Err(OidcError::NoMatchingKey) + } +} diff --git a/src/environmentd/src/environmentd/main.rs b/src/environmentd/src/environmentd/main.rs index df2788af49963..db17a17ffe42f 100644 --- a/src/environmentd/src/environmentd/main.rs +++ b/src/environmentd/src/environmentd/main.rs @@ -36,6 +36,7 @@ use mz_adapter_types::bootstrap_builtin_cluster_config::{ SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR, }; use mz_auth::password::Password; +use mz_authenticator::{OidcAuthenticator, OidcConfig}; use mz_aws_secrets_controller::AwsSecretsController; use mz_build_info::BuildInfo; use mz_catalog::config::ClusterReplicaSizeMap; @@ -171,6 +172,13 @@ pub struct Args { /// Frontegg arguments. #[clap(flatten)] frontegg: FronteggCliArgs, + // === OIDC options. === + /// OIDC issuer URL (e.g., "https://accounts.google.com"). + #[clap(long, env = "MZ_OIDC_ISSUER", requires = "oidc_jwks_uri")] + oidc_issuer: Option, + /// JWKS URI for fetching public keys. + #[clap(long, env = "MZ_OIDC_JWKS_URI", requires = "oidc_issuer")] + oidc_jwks_uri: Option, // === Orchestrator options. === /// The service orchestrator implementation to use. #[structopt(long, value_enum, env = "ORCHESTRATOR")] @@ -743,6 +751,13 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> { // Configure connections. let tls = args.tls.into_config()?; let frontegg = FronteggAuthenticator::from_args(args.frontegg, &metrics_registry)?; + let oidc = match (args.oidc_issuer, args.oidc_jwks_uri) { + (Some(issuer), Some(jwks_uri)) => Some(OidcAuthenticator::new(OidcConfig { + oidc_issuer: issuer, + oidc_jwks_uri: jwks_uri, + })), + _ => None, + }; let listeners_config: ListenersConfig = { let f = File::open(args.listeners_config_path)?; serde_json::from_reader(f)? @@ -1081,6 +1096,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> { tls_reload_certs: mz_server_core::default_cert_reload_ticker(), external_login_password_mz_system: args.external_login_password_mz_system, frontegg, + oidc, cors_allowed_origin, egress_addresses: args.announce_egress_address, http_host_name: args.http_host_name, diff --git a/src/environmentd/src/http.rs b/src/environmentd/src/http.rs index 550e070596b96..64bc0e68954c5 100644 --- a/src/environmentd/src/http.rs +++ b/src/environmentd/src/http.rs @@ -1022,6 +1022,31 @@ async fn auth( include_www_authenticate_header, }); } + Authenticator::Oidc(oidc) => match creds { + Some(Credentials::Token { token }) => { + // Validate JWT token + let claims = oidc + .validate_token(&token) + .await + .map_err(|_| AuthError::InvalidCredentials)?; + let name = claims.username().to_string(); + (name, None, None) + } + Some(Credentials::Password { password, .. }) => { + // Allow JWT to be passed as password + let claims = oidc + .validate_token(&password.0) + .await + .map_err(|_| AuthError::InvalidCredentials)?; + let name = claims.username().to_string(); + (name, None, None) + } + None => { + return Err(AuthError::MissingHttpAuthentication { + include_www_authenticate_header, + }); + } + }, Authenticator::None => { // If no authentication, use whatever is in the HTTP auth // header (without checking the password), or fall back to the diff --git a/src/environmentd/src/lib.rs b/src/environmentd/src/lib.rs index 611821b59d3e5..a428db7c3cc2e 100644 --- a/src/environmentd/src/lib.rs +++ b/src/environmentd/src/lib.rs @@ -36,7 +36,7 @@ use mz_adapter_types::dyncfgs::{ WITH_0DT_DEPLOYMENT_MAX_WAIT, }; use mz_auth::password::Password; -use mz_authenticator::Authenticator; +use mz_authenticator::{Authenticator, OidcAuthenticator}; use mz_build_info::{BuildInfo, build_info}; use mz_catalog::config::ClusterReplicaSizeMap; use mz_catalog::durable::BootstrapArgs; @@ -106,6 +106,8 @@ pub struct Config { pub external_login_password_mz_system: Option, /// Frontegg JWT authentication configuration. pub frontegg: Option, + /// OIDC JWT authentication configuration. + pub oidc: Option, /// Origins for which cross-origin resource sharing (CORS) for HTTP requests /// is permitted. pub cors_allowed_origin: AllowOrigin, @@ -261,6 +263,7 @@ impl Listener { active_connection_counter: ConnectionCounter, tls_reloading_context: Option, frontegg: Option, + oidc: Option, adapter_client: AdapterClient, metrics: MetricsConfig, helm_chart_version: Option, @@ -280,6 +283,9 @@ impl Listener { ), AuthenticatorKind::Password => Authenticator::Password(adapter_client.clone()), AuthenticatorKind::Sasl => Authenticator::Sasl(adapter_client.clone()), + AuthenticatorKind::Oidc => Authenticator::Oidc( + oidc.expect("OIDC config is required with AuthenticatorKind::Oidc"), + ), AuthenticatorKind::None => Authenticator::None, }; @@ -380,16 +386,23 @@ impl Listeners { let authenticator_frontegg_rx = authenticator_frontegg_rx.shared(); let (authenticator_password_tx, authenticator_password_rx) = oneshot::channel(); let authenticator_password_rx = authenticator_password_rx.shared(); + let (authenticator_oidc_tx, authenticator_oidc_rx) = oneshot::channel(); + let authenticator_oidc_rx = authenticator_oidc_rx.shared(); let (authenticator_none_tx, authenticator_none_rx) = oneshot::channel(); let authenticator_none_rx = authenticator_none_rx.shared(); - // We can only send the Frontegg and None variants immediately. + // We can only send the Frontegg, OIDC, and None variants immediately. // The Password variant requires an adapter client. if let Some(frontegg) = &config.frontegg { authenticator_frontegg_tx .send(Arc::new(Authenticator::Frontegg(frontegg.clone()))) .expect("rx known to be live"); } + if let Some(oidc) = &config.oidc { + authenticator_oidc_tx + .send(Arc::new(Authenticator::Oidc(oidc.clone()))) + .expect("rx known to be live"); + } authenticator_none_tx .send(Arc::new(Authenticator::None)) .expect("rx known to be live"); @@ -406,6 +419,7 @@ impl Listeners { AuthenticatorKind::Frontegg => authenticator_frontegg_rx.clone(), AuthenticatorKind::Password => authenticator_password_rx.clone(), AuthenticatorKind::Sasl => authenticator_password_rx.clone(), + AuthenticatorKind::Oidc => authenticator_oidc_rx.clone(), AuthenticatorKind::None => authenticator_none_rx.clone(), }; let source: &'static str = Box::leak(name.clone().into_boxed_str()); @@ -826,6 +840,7 @@ impl Listeners { active_connection_counter.clone(), tls_reloading_context.clone(), config.frontegg.clone(), + config.oidc.clone(), adapter_client.clone(), metrics.clone(), config.helm_chart_version.clone(), diff --git a/src/materialized/ci/listener_configs/oidc.json b/src/materialized/ci/listener_configs/oidc.json new file mode 100644 index 0000000000000..28cc5ca0cc353 --- /dev/null +++ b/src/materialized/ci/listener_configs/oidc.json @@ -0,0 +1,38 @@ +{ + "sql": { + "external": { + "addr": "0.0.0.0:6875", + "authenticator_kind": "Oidc", + "allowed_roles": "NormalAndInternal", + "enable_tls": false + } + }, + "http": { + "external": { + "addr": "0.0.0.0:6876", + "authenticator_kind": "Oidc", + "allowed_roles": "NormalAndInternal", + "enable_tls": false, + "routes": { + "base": true, + "webhook": true, + "internal": true, + "metrics": false, + "profiling": true + } + }, + "metrics": { + "addr": "0.0.0.0:6878", + "authenticator_kind": "None", + "allowed_roles": "NormalAndInternal", + "enable_tls": false, + "routes": { + "base": false, + "webhook": false, + "internal": false, + "metrics": true, + "profiling": false + } + } + } +} diff --git a/src/pgwire/src/protocol.rs b/src/pgwire/src/protocol.rs index c6a3d68301a99..86230567d3307 100644 --- a/src/pgwire/src/protocol.rs +++ b/src/pgwire/src/protocol.rs @@ -480,6 +480,73 @@ where let auth_session = pending().right_future(); (session, auth_session) } + Authenticator::Oidc(oidc) => { + tracing::info!("OIDC authentication"); + // OIDC authentication: JWT sent as password in cleartext flow + conn.send(BackendMessage::AuthenticationCleartextPassword) + .await?; + conn.flush().await?; + + let jwt = match conn.recv().await? { + Some(FrontendMessage::RawAuthentication(data)) => { + match decode_password(Cursor::new(&data)).ok() { + Some(FrontendMessage::Password { password }) => password, + _ => { + return conn + .send(ErrorResponse::fatal( + SqlState::INVALID_AUTHORIZATION_SPECIFICATION, + "expected Password message", + )) + .await; + } + } + } + _ => { + return conn + .send(ErrorResponse::fatal( + SqlState::INVALID_AUTHORIZATION_SPECIFICATION, + "expected Password message", + )) + .await; + } + }; + + tracing::info!("JWT: {}", jwt); + + // Validate JWT + let claims = match oidc.validate_token(&jwt).await { + Ok(claims) => claims, + Err(e) => { + warn!(?e, "OIDC authentication failed"); + return conn + .send(ErrorResponse::fatal( + SqlState::INVALID_PASSWORD, + "invalid token", + )) + .await; + } + }; + + // Use username from claims, or fall back to the user provided in startup + let session_user = if user.is_empty() { + claims.username().to_string() + } else { + user + }; + + let session = adapter_client.new_session(SessionConfig { + conn_id: conn.conn_id().clone(), + uuid: conn_uuid, + user: session_user, + client_ip: conn.peer_addr().clone(), + external_metadata_rx: None, + internal_user_metadata: None, + helm_chart_version, + }); + // No session expiry for OIDC - session lasts until connection closes. + let auth_session = pending().right_future(); + (session, auth_session) + } Authenticator::None => { let session = adapter_client.new_session(SessionConfig { conn_id: conn.conn_id().clone(), diff --git a/src/server-core/src/listeners.rs b/src/server-core/src/listeners.rs index 0bdb1ce75ba4f..a10775e6e0d8a 100644 --- a/src/server-core/src/listeners.rs +++ b/src/server-core/src/listeners.rs @@ -22,6 +22,8 @@ pub enum AuthenticatorKind { Password, /// Authenticate users using SASL. Sasl, + /// Authenticate users using OIDC (JWT tokens). + Oidc, /// Do not authenticate users. Trust they are who they say they are without verification. #[default] None,