Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions pgdog/src/frontend/router/parser/cache/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use std::{collections::HashSet, ops::Deref};
use parking_lot::Mutex;
use std::sync::Arc;

use super::super::{
comment::comment, Error, Route, Shard, StatementRewrite, StatementRewriteContext, Table,
};
use super::super::{Error, Route, Shard, StatementRewrite, StatementRewriteContext, Table};
use super::{Fingerprint, Stats};
use crate::backend::schema::Schema;
use crate::frontend::router::parser::rewrite::statement::RewritePlan;
Expand Down Expand Up @@ -72,6 +70,8 @@ impl Ast {
schema: &ShardingSchema,
db_schema: &Schema,
prepared_statements: &mut PreparedStatements,
comment_shard: Option<Shard>,
comment_role: Option<Role>,
user: &str,
search_path: Option<&ParameterValue>,
) -> Result<Self, Error> {
Expand All @@ -81,7 +81,6 @@ impl Ast {
QueryParserEngine::PgQueryRaw => parse_raw(query),
}
.map_err(Error::PgQuery)?;
let (comment_shard, comment_role) = comment(query, schema)?;
let fingerprint =
Fingerprint::new(query, schema.query_parser_engine).map_err(Error::PgQuery)?;

Expand Down Expand Up @@ -125,12 +124,16 @@ impl Ast {
query: &BufferedQuery,
ctx: &super::AstContext<'_>,
prepared_statements: &mut PreparedStatements,
shard: Option<Shard>,
role: Option<Role>,
) -> Result<Self, Error> {
Self::new(
query,
&ctx.sharding_schema,
&ctx.db_schema,
prepared_statements,
shard,
role,
ctx.user,
ctx.search_path,
)
Expand Down
42 changes: 39 additions & 3 deletions pgdog/src/frontend/router/parser/cache/cache_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ use lru::LruCache;
use once_cell::sync::Lazy;
use pg_query::normalize;
use pgdog_config::QueryParserEngine;
use std::borrow::Cow;
use std::collections::HashMap;
use std::time::Duration;
use tracing_subscriber::filter::Filtered;

use parking_lot::Mutex;
use std::sync::Arc;
use tracing::debug;

use super::super::{Error, Route};
use super::{Ast, AstContext};
use crate::frontend::router::parser::comment;
use crate::frontend::{BufferedQuery, PreparedStatements};

static CACHE: Lazy<Cache> = Lazy::new(Cache::new);
Expand Down Expand Up @@ -97,6 +100,10 @@ impl Cache {
/// Parse a statement by either getting it from cache
/// or using pg_query parser.
///
/// On a cache miss, we retry the lookup after removing all comments except
/// for pgdog metadata. The metadata is retained for correctness: a query
/// carrying that metadata must not map to an identical query without it.
///
/// N.B. There is a race here that allows multiple threads to
/// parse the same query. That's better imo than locking the data structure
/// while we parse the query.
Expand All @@ -118,12 +125,38 @@ impl Cache {
}
}

let (maybe_shard, maybe_role, maybe_filtered_query) =
comment::parse_comment(&query, &ctx.sharding_schema)?;

let query_to_cache: Cow<'_, str>;

if let Some(filtered_query) = maybe_filtered_query {
query_to_cache = Cow::Owned(filtered_query);

// Check cache again after removing comments from query
let mut guard = self.inner.lock();

let ast = guard.queries.get_mut(&*query_to_cache).map(|entry| {
entry.stats.lock().hits += 1;
entry.clone()
});

if let Some(ast) = ast {
guard.stats.hits += 1;
return Ok(ast);
}
} else {
query_to_cache = Cow::Borrowed(query.query());
}

// Parse query without holding lock.
let entry = Ast::with_context(query, ctx, prepared_statements)?;
let entry = Ast::with_context(query, ctx, prepared_statements, maybe_shard, maybe_role)?;
let parse_time = entry.stats.lock().parse_time;

let mut guard = self.inner.lock();
guard.queries.put(query.query().to_string(), entry.clone());
guard
.queries
.put(query_to_cache.into_owned(), entry.clone());
guard.stats.misses += 1;
guard.stats.parse_time += parse_time;

Expand All @@ -138,7 +171,10 @@ impl Cache {
ctx: &AstContext<'_>,
prepared_statements: &mut PreparedStatements,
) -> Result<Ast, Error> {
let mut entry = Ast::with_context(query, ctx, prepared_statements)?;
let (maybe_shard, maybe_role, _) = comment::parse_comment(&query, &ctx.sharding_schema)?;

let mut entry =
Ast::with_context(query, ctx, prepared_statements, maybe_shard, maybe_role)?;
entry.cached = false;

let parse_time = entry.stats.lock().parse_time;
Expand Down
117 changes: 92 additions & 25 deletions pgdog/src/frontend/router/parser/comment.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use once_cell::sync::Lazy;
use pg_query::protobuf::ScanToken;
use pg_query::scan_raw;
use pg_query::{protobuf::Token, scan};
use pgdog_config::QueryParserEngine;
Expand All @@ -11,11 +12,12 @@ use crate::frontend::router::sharding::ContextBuilder;
use super::super::parser::Shard;
use super::Error;

static SHARD: Lazy<Regex> = Lazy::new(|| Regex::new(r#"pgdog_shard: *([0-9]+)"#).unwrap());
static SHARDING_KEY: Lazy<Regex> = Lazy::new(|| {
pub static SHARD: Lazy<Regex> = Lazy::new(|| Regex::new(r#"pgdog_shard: *([0-9]+)"#).unwrap());
pub static SHARDING_KEY: Lazy<Regex> = Lazy::new(|| {
Regex::new(r#"pgdog_sharding_key: *(?:"([^"]*)"|'([^']*)'|([0-9a-zA-Z-]+))"#).unwrap()
});
static ROLE: Lazy<Regex> = Lazy::new(|| Regex::new(r#"pgdog_role: *(primary|replica)"#).unwrap());
pub static ROLE: Lazy<Regex> =
Lazy::new(|| Regex::new(r#"pgdog_role: *(primary|replica)"#).unwrap());

fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> {
caps.get(1)
Expand All @@ -24,23 +26,26 @@ fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> {
.map(|m| m.as_str())
}

/// Extract shard number from a comment.
/// Extract the shard number (and role) from a comment. Additionally returns
/// the query with all comments stripped — except pgdog metadata — if the
/// query contained any comments.
///
/// Comment style uses the C-style comments (not SQL comments!)
/// Comment style for the shard metadata uses the C-style comments (not SQL comments!)
/// as to allow the comment to appear anywhere in the query.
///
/// See [`SHARD`] and [`SHARDING_KEY`] for the style of comment we expect.
///
pub fn comment(
pub fn parse_comment(
query: &str,
schema: &ShardingSchema,
) -> Result<(Option<Shard>, Option<Role>), Error> {
) -> Result<(Option<Shard>, Option<Role>, Option<String>), Error> {
let tokens = match schema.query_parser_engine {
QueryParserEngine::PgQueryProtobuf => scan(query),
QueryParserEngine::PgQueryRaw => scan_raw(query),
}
.map_err(Error::PgQuery)?;
let mut shard = None;
let mut role = None;
let mut filtered_query = None;

for token in tokens.tokens.iter() {
if token.token == Token::CComment as i32 {
Expand All @@ -57,33 +62,95 @@ pub fn comment(
if let Some(cap) = SHARDING_KEY.captures(comment) {
if let Some(sharding_key) = get_matched_value(&cap) {
if let Some(schema) = schema.schemas.get(Some(sharding_key.into())) {
return Ok((Some(schema.shard().into()), role));
shard = Some(schema.shard().into());
} else {
let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)?
.shards(schema.shards)
.build()?;
shard = Some(ctx.apply()?);
}
let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)?
.shards(schema.shards)
.build()?;
return Ok((Some(ctx.apply()?), role));
}
}
if let Some(cap) = SHARD.captures(comment) {
if let Some(shard) = cap.get(1) {
return Ok((
Some(
shard
.as_str()
.parse::<usize>()
.ok()
.map(Shard::Direct)
.unwrap_or(Shard::All),
),
role,
));
if let Some(s) = cap.get(1) {
shard = Some(
s.as_str()
.parse::<usize>()
.ok()
.map(Shard::Direct)
.unwrap_or(Shard::All),
);
}
}
}
}

Ok((None, role))
if has_comments(&tokens.tokens) {
filtered_query = Some(remove_comments(
query,
&tokens.tokens,
Some(&[&SHARD, &*SHARDING_KEY, &ROLE]),
)?);
}

Ok((shard, role, filtered_query))
}

/// Returns `true` if the scanned query contains any C-style (`/* ... */`)
/// or SQL-style (`--`) comment tokens.
///
/// Takes a slice instead of `&Vec` (Clippy `ptr_arg`); callers passing
/// `&tokens.tokens` still work via deref coercion.
pub fn has_comments(tokenized_query: &[ScanToken]) -> bool {
    tokenized_query
        .iter()
        .any(|st| st.token == Token::CComment as i32 || st.token == Token::SqlComment as i32)
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that this function is over-generalized and may not need the except parameter.

pub fn remove_comments(
query: &str,
tokenized_query: &Vec<ScanToken>,
except: Option<&[&Regex]>,
) -> Result<String, Error> {
let mut cursor = 0;
let mut out = String::with_capacity(query.len());

for st in tokenized_query {
let start = st.start as usize;
let end = st.end as usize;

out.push_str(&query[cursor..start]);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code involving cursor keeps non token characters (between and at the end of all tokens) like spaces to preserve the original query. We don't want to do anything extra like normalize it or something.


match st.token {
t if t == Token::CComment as i32 => {
let comment = &query[start..end];

if let Some(except) = except {
let rewritten = keep_only_matching(comment, except);

out.push_str(&rewritten);
}
}
_ => {
out.push_str(&query[start..end]);
}
}

cursor = end;
}

if cursor < query.len() {
out.push_str(&query[cursor..]);
}

Ok(out)
}

/// Concatenate every substring of `comment` that matches one of `regs`.
///
/// Matches are emitted regex-by-regex (in the order given), then in order
/// of appearance within the comment for each regex.
fn keep_only_matching(comment: &str, regs: &[&Regex]) -> String {
    regs.iter()
        .flat_map(|reg| reg.find_iter(comment))
        .map(|m| m.as_str())
        .collect()
}

#[cfg(test)]
Expand Down