diff --git a/pgdog/src/frontend/router/parser/cache/ast.rs b/pgdog/src/frontend/router/parser/cache/ast.rs index 2be0e0fb..c80c560e 100644 --- a/pgdog/src/frontend/router/parser/cache/ast.rs +++ b/pgdog/src/frontend/router/parser/cache/ast.rs @@ -7,9 +7,7 @@ use std::{collections::HashSet, ops::Deref}; use parking_lot::Mutex; use std::sync::Arc; -use super::super::{ - comment::comment, Error, Route, Shard, StatementRewrite, StatementRewriteContext, Table, -}; +use super::super::{Error, Route, Shard, StatementRewrite, StatementRewriteContext, Table}; use super::{Fingerprint, Stats}; use crate::backend::schema::Schema; use crate::frontend::router::parser::rewrite::statement::RewritePlan; @@ -72,6 +70,8 @@ impl Ast { schema: &ShardingSchema, db_schema: &Schema, prepared_statements: &mut PreparedStatements, + comment_shard: Option, + comment_role: Option, user: &str, search_path: Option<&ParameterValue>, ) -> Result { @@ -81,7 +81,6 @@ impl Ast { QueryParserEngine::PgQueryRaw => parse_raw(query), } .map_err(Error::PgQuery)?; - let (comment_shard, comment_role) = comment(query, schema)?; let fingerprint = Fingerprint::new(query, schema.query_parser_engine).map_err(Error::PgQuery)?; @@ -125,12 +124,16 @@ impl Ast { query: &BufferedQuery, ctx: &super::AstContext<'_>, prepared_statements: &mut PreparedStatements, + shard: Option, + role: Option, ) -> Result { Self::new( query, &ctx.sharding_schema, &ctx.db_schema, prepared_statements, + shard, + role, ctx.user, ctx.search_path, ) diff --git a/pgdog/src/frontend/router/parser/cache/cache_impl.rs b/pgdog/src/frontend/router/parser/cache/cache_impl.rs index 4c50e61f..cd8aac8b 100644 --- a/pgdog/src/frontend/router/parser/cache/cache_impl.rs +++ b/pgdog/src/frontend/router/parser/cache/cache_impl.rs @@ -2,8 +2,10 @@ use lru::LruCache; use once_cell::sync::Lazy; use pg_query::normalize; use pgdog_config::QueryParserEngine; +use std::borrow::Cow; use std::collections::HashMap; use std::time::Duration; +use tracing_subscriber::filter::Filtered; use parking_lot::Mutex; use std::sync::Arc; @@ -11,6 +13,7 @@ use tracing::debug; use super::super::{Error, Route}; use super::{Ast, AstContext}; +use crate::frontend::router::parser::comment; use crate::frontend::{BufferedQuery, PreparedStatements}; static CACHE: Lazy = Lazy::new(Cache::new); @@ -97,6 +100,10 @@ impl Cache { /// Parse a statement by either getting it from cache /// or using pg_query parser. /// + /// In the event of cache miss, we retry after removing all comments except + /// for pgdog metadata. We retain it for correctness, since a query with + /// that metadata must not map to an identical query without it. + /// /// N.B. There is a race here that allows multiple threads to /// parse the same query. That's better imo than locking the data structure /// while we parse the query. @@ -118,12 +125,38 @@ impl Cache { } } + let (maybe_shard, maybe_role, maybe_filtered_query) = + comment::parse_comment(&query, &ctx.sharding_schema)?; + + let query_to_cache: Cow<'_, str>; + + if let Some(filtered_query) = maybe_filtered_query { + query_to_cache = Cow::Owned(filtered_query); + + // Check cache again after removing comments from query + let mut guard = self.inner.lock(); + + let ast = guard.queries.get_mut(&*query_to_cache).map(|entry| { + entry.stats.lock().hits += 1; + entry.clone() + }); + + if let Some(ast) = ast { + guard.stats.hits += 1; + return Ok(ast); + } + } else { + query_to_cache = Cow::Borrowed(query.query()); + } + // Parse query without holding lock. - let entry = Ast::with_context(query, ctx, prepared_statements)?; + let entry = Ast::with_context(query, ctx, prepared_statements, maybe_shard, maybe_role)?; let parse_time = entry.stats.lock().parse_time; let mut guard = self.inner.lock(); - guard.queries.put(query.query().to_string(), entry.clone()); + guard + .queries + .put(query_to_cache.into_owned(), entry.clone()); guard.stats.misses += 1; guard.stats.parse_time += parse_time; @@ -138,7 +171,10 @@ impl Cache { ctx: &AstContext<'_>, prepared_statements: &mut PreparedStatements, ) -> Result { - let mut entry = Ast::with_context(query, ctx, prepared_statements)?; + let (maybe_shard, maybe_role, _) = comment::parse_comment(&query, &ctx.sharding_schema)?; + + let mut entry = + Ast::with_context(query, ctx, prepared_statements, maybe_shard, maybe_role)?; entry.cached = false; let parse_time = entry.stats.lock().parse_time; diff --git a/pgdog/src/frontend/router/parser/comment.rs b/pgdog/src/frontend/router/parser/comment.rs index a87883ad..b65968d8 100644 --- a/pgdog/src/frontend/router/parser/comment.rs +++ b/pgdog/src/frontend/router/parser/comment.rs @@ -1,4 +1,5 @@ use once_cell::sync::Lazy; +use pg_query::protobuf::ScanToken; use pg_query::scan_raw; use pg_query::{protobuf::Token, scan}; use pgdog_config::QueryParserEngine; @@ -11,11 +12,12 @@ use crate::frontend::router::sharding::ContextBuilder; use super::super::parser::Shard; use super::Error; -static SHARD: Lazy = Lazy::new(|| Regex::new(r#"pgdog_shard: *([0-9]+)"#).unwrap()); -static SHARDING_KEY: Lazy = Lazy::new(|| { +pub static SHARD: Lazy = Lazy::new(|| Regex::new(r#"pgdog_shard: *([0-9]+)"#).unwrap()); +pub static SHARDING_KEY: Lazy = Lazy::new(|| { Regex::new(r#"pgdog_sharding_key: *(?:"([^"]*)"|'([^']*)'|([0-9a-zA-Z-]+))"#).unwrap() }); -static ROLE: Lazy = Lazy::new(|| Regex::new(r#"pgdog_role: *(primary|replica)"#).unwrap()); +pub static ROLE: Lazy = + Lazy::new(|| Regex::new(r#"pgdog_role: *(primary|replica)"#).unwrap()); fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> { caps.get(1) @@ -24,23 +26,26 @@ fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> { .map(|m| m.as_str()) } -/// Extract shard number from a comment. +/// Extract shard number from a comment. Additionally returns the entire +/// comment string if it exists. /// -/// Comment style uses the C-style comments (not SQL comments!) +/// Comment style for the shard metadata uses the C-style comments (not SQL comments!) /// as to allow the comment to appear anywhere in the query. /// /// See [`SHARD`] and [`SHARDING_KEY`] for the style of comment we expect. /// -pub fn comment( +pub fn parse_comment( query: &str, schema: &ShardingSchema, -) -> Result<(Option, Option), Error> { +) -> Result<(Option, Option, Option), Error> { let tokens = match schema.query_parser_engine { QueryParserEngine::PgQueryProtobuf => scan(query), QueryParserEngine::PgQueryRaw => scan_raw(query), } .map_err(Error::PgQuery)?; + let mut shard = None; let mut role = None; + let mut filtered_query = None; for token in tokens.tokens.iter() { if token.token == Token::CComment as i32 { @@ -57,33 +62,95 @@ pub fn comment( if let Some(cap) = SHARDING_KEY.captures(comment) { if let Some(sharding_key) = get_matched_value(&cap) { if let Some(schema) = schema.schemas.get(Some(sharding_key.into())) { - return Ok((Some(schema.shard().into()), role)); + shard = Some(schema.shard().into()); + } else { + let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)? + .shards(schema.shards) + .build()?; + shard = Some(ctx.apply()?); } - let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)? - .shards(schema.shards) - .build()?; - return Ok((Some(ctx.apply()?), role)); } } if let Some(cap) = SHARD.captures(comment) { - if let Some(shard) = cap.get(1) { - return Ok(( - Some( - shard - .as_str() - .parse::() - .ok() - .map(Shard::Direct) - .unwrap_or(Shard::All), - ), - role, - )); + if let Some(s) = cap.get(1) { + shard = Some( + s.as_str() + .parse::() + .ok() + .map(Shard::Direct) + .unwrap_or(Shard::All), + ); } } } } - Ok((None, role)) + if has_comments(&tokens.tokens) { + filtered_query = Some(remove_comments( + query, + &tokens.tokens, + Some(&[&SHARD, &*SHARDING_KEY, &ROLE]), + )?); + } + + Ok((shard, role, filtered_query)) +} + +pub fn has_comments(tokenized_query: &Vec) -> bool { + tokenized_query + .iter() + .any(|st| st.token == Token::CComment as i32 || st.token == Token::SqlComment as i32) +} + +pub fn remove_comments( + query: &str, + tokenized_query: &Vec, + except: Option<&[&Regex]>, +) -> Result { + let mut cursor = 0; + let mut out = String::with_capacity(query.len()); + + for st in tokenized_query { + let start = st.start as usize; + let end = st.end as usize; + + out.push_str(&query[cursor..start]); + + match st.token { + t if t == Token::CComment as i32 => { + let comment = &query[start..end]; + + if let Some(except) = except { + let rewritten = keep_only_matching(comment, except); + + out.push_str(&rewritten); + } + } + _ => { + out.push_str(&query[start..end]); + } + } + + cursor = end; + } + + if cursor < query.len() { + out.push_str(&query[cursor..]); + } + + Ok(out) +} + +fn keep_only_matching(comment: &str, regs: &[&Regex]) -> String { + let mut out = String::new(); + + for reg in regs { + for m in reg.find_iter(comment) { + out.push_str(m.as_str()); + } + } + + out } #[cfg(test)]