diff --git a/apps/labrinth/src/database/models/categories.rs b/apps/labrinth/src/database/models/categories.rs index 05666793c3..18a18b83e0 100644 --- a/apps/labrinth/src/database/models/categories.rs +++ b/apps/labrinth/src/database/models/categories.rs @@ -93,14 +93,16 @@ impl Category { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let res: Option> = redis - .get_deserialized_from_json(TAGS_NAMESPACE, "category") - .await?; + let res: Option> = redis + .get_deserialized_from_json(TAGS_NAMESPACE, "category") + .await?; - if let Some(res) = res { - return Ok(res); + if let Some(res) = res { + return Ok(res); + } } let result = sqlx::query!( @@ -122,6 +124,8 @@ impl Category { .try_collect::>() .await?; + let mut redis = redis.connect().await?; + redis .set_serialized_to_json(TAGS_NAMESPACE, "category", &result, None) .await?; @@ -158,14 +162,16 @@ impl LinkPlatform { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let res: Option> = redis - .get_deserialized_from_json(TAGS_NAMESPACE, "link_platform") - .await?; + let res: Option> = redis + .get_deserialized_from_json(TAGS_NAMESPACE, "link_platform") + .await?; - if let Some(res) = res { - return Ok(res); + if let Some(res) = res { + return Ok(res); + } } let result = sqlx::query!( @@ -182,6 +188,8 @@ impl LinkPlatform { .try_collect::>() .await?; + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( TAGS_NAMESPACE, @@ -223,14 +231,16 @@ impl ReportType { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let res: Option> = redis - .get_deserialized_from_json(TAGS_NAMESPACE, "report_type") - .await?; + let res: Option> = redis + .get_deserialized_from_json(TAGS_NAMESPACE, "report_type") + .await?; - if let Some(res) = res { - return Ok(res); + if let Some(res) = res { + return Ok(res); + } } let result = sqlx::query!( @@ -243,6 +253,8 @@ impl ReportType { .try_collect::>() .await?; + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( TAGS_NAMESPACE, @@ -284,14 +296,16 @@ impl ProjectType { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let res: Option> = redis - .get_deserialized_from_json(TAGS_NAMESPACE, "project_type") - .await?; + let res: Option> = redis + .get_deserialized_from_json(TAGS_NAMESPACE, "project_type") + .await?; - if let Some(res) = res { - return Ok(res); + if let Some(res) = res { + return Ok(res); + } } let result = sqlx::query!( @@ -304,6 +318,8 @@ impl ProjectType { .try_collect::>() .await?; + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( TAGS_NAMESPACE, diff --git a/apps/labrinth/src/database/models/loader_fields.rs b/apps/labrinth/src/database/models/loader_fields.rs index 32f836c886..db84cb757e 100644 --- a/apps/labrinth/src/database/models/loader_fields.rs +++ b/apps/labrinth/src/database/models/loader_fields.rs @@ -50,12 +50,14 @@ impl Game { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; - let cached_games: Option> = redis - .get_deserialized_from_json(GAMES_LIST_NAMESPACE, "games") - .await?; - if let Some(cached_games) = cached_games { - return Ok(cached_games); + { + let mut redis = redis.connect().await?; + let cached_games: Option> = redis + .get_deserialized_from_json(GAMES_LIST_NAMESPACE, "games") + .await?; + if let Some(cached_games) = cached_games { + return Ok(cached_games); + } } let result = sqlx::query!( @@ -74,6 +76,8 @@ impl Game { .try_collect::>() .await?; + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( GAMES_LIST_NAMESPACE, @@ -106,11 +110,13 @@ impl Loader { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; - let cached_id: Option = - redis.get_deserialized_from_json(LOADER_ID, name).await?; - if let Some(cached_id) = cached_id { - return Ok(Some(LoaderId(cached_id))); + { + let mut redis = redis.connect().await?; + let cached_id: Option = + redis.get_deserialized_from_json(LOADER_ID, name).await?; + if let Some(cached_id) = cached_id { + return Ok(Some(LoaderId(cached_id))); + } } let result = sqlx::query!( @@ -125,6 +131,7 @@ impl Loader { .map(|r| LoaderId(r.id)); if let Some(result) = result { + let mut redis = redis.connect().await?; redis .set_serialized_to_json(LOADER_ID, name, &result.0, None) .await?; @@ -140,12 +147,14 @@ impl Loader { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; - let cached_loaders: Option> = redis - .get_deserialized_from_json(LOADERS_LIST_NAMESPACE, "all") - .await?; - if let Some(cached_loaders) = cached_loaders { - return Ok(cached_loaders); + { + let mut redis = redis.connect().await?; + let cached_loaders: Option> = redis + .get_deserialized_from_json(LOADERS_LIST_NAMESPACE, "all") + .await?; + if let Some(cached_loaders) = cached_loaders { + return Ok(cached_loaders); + } } let result = sqlx::query!( @@ -180,6 +189,8 @@ impl Loader { .try_collect::>() .await?; + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( LOADERS_LIST_NAMESPACE, @@ -455,15 +466,17 @@ impl LoaderField { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let cached_fields: Option> = redis - .get(LOADER_FIELDS_NAMESPACE_ALL, "") - .await? - .and_then(|x| serde_json::from_str::>(&x).ok()); + let cached_fields: Option> = + redis.get(LOADER_FIELDS_NAMESPACE_ALL, "").await?.and_then( + |x| serde_json::from_str::>(&x).ok(), + ); - if let Some(cached_fields) = cached_fields { - return Ok(cached_fields); + if let Some(cached_fields) = cached_fields { + return Ok(cached_fields); + } } let result = sqlx::query!( @@ -489,6 +502,8 @@ impl LoaderField { .flatten() .collect(); + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( LOADER_FIELDS_NAMESPACE_ALL, @@ -510,16 +525,18 @@ impl LoaderFieldEnum { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let cached_enum = redis - .get_deserialized_from_json( - LOADER_FIELD_ENUMS_ID_NAMESPACE, - enum_name, - ) - .await?; - if let Some(cached_enum) = cached_enum { - return Ok(cached_enum); + let cached_enum = redis + .get_deserialized_from_json( + LOADER_FIELD_ENUMS_ID_NAMESPACE, + enum_name, + ) + .await?; + if let Some(cached_enum) = cached_enum { + return Ok(cached_enum); + } } let result = sqlx::query!( @@ -540,6 +557,8 @@ impl LoaderFieldEnum { hidable: l.hidable, }); + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( LOADER_FIELD_ENUMS_ID_NAMESPACE, diff --git a/apps/labrinth/src/database/models/mod.rs b/apps/labrinth/src/database/models/mod.rs index 6315fc45cf..5e86a67333 100644 --- a/apps/labrinth/src/database/models/mod.rs +++ b/apps/labrinth/src/database/models/mod.rs @@ -67,6 +67,13 @@ pub enum DatabaseError { SerdeCacheError(#[from] serde_json::Error), #[error("Schema error: {0}")] SchemaError(String), - #[error("Timeout when waiting for cache subscriber")] - CacheTimeout, + #[error( + "Timeout waiting on Redis cache lock ({locks_released}/{locks_waiting} released, spent {time_spent_pool_wait_ms}ms/{time_spent_total_ms}ms waiting on connections from pool)" + )] + CacheTimeout { + locks_released: usize, + locks_waiting: usize, + time_spent_pool_wait_ms: u64, + time_spent_total_ms: u64, + }, } diff --git a/apps/labrinth/src/database/models/notification_item.rs b/apps/labrinth/src/database/models/notification_item.rs index ce3d627d51..f3cedf4771 100644 --- a/apps/labrinth/src/database/models/notification_item.rs +++ b/apps/labrinth/src/database/models/notification_item.rs @@ -380,17 +380,19 @@ impl DBNotification { where E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, { - let mut redis = redis.connect().await?; - - let cached_notifications: Option> = redis - .get_deserialized_from_json( - USER_NOTIFICATIONS_NAMESPACE, - &user_id.0.to_string(), - ) - .await?; + { + let mut redis = redis.connect().await?; + + let cached_notifications: Option> = redis + .get_deserialized_from_json( + USER_NOTIFICATIONS_NAMESPACE, + &user_id.0.to_string(), + ) + .await?; - if let Some(notifications) = cached_notifications { - return Ok(notifications); + if let Some(notifications) = cached_notifications { + return Ok(notifications); + } } let db_notifications = sqlx::query!( @@ -437,6 +439,8 @@ impl DBNotification { .try_collect::>() .await?; + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( USER_NOTIFICATIONS_NAMESPACE, diff --git a/apps/labrinth/src/database/models/notifications_template_item.rs b/apps/labrinth/src/database/models/notifications_template_item.rs index b75fe770d9..4dafb395c1 100644 --- a/apps/labrinth/src/database/models/notifications_template_item.rs +++ b/apps/labrinth/src/database/models/notifications_template_item.rs @@ -52,14 +52,19 @@ impl NotificationTemplate { exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, redis: &RedisPool, ) -> Result, DatabaseError> { - let mut redis = redis.connect().await?; - - let maybe_cached_templates = redis - .get_deserialized_from_json(TEMPLATES_NAMESPACE, channel.as_str()) - .await?; - - if let Some(cached) = maybe_cached_templates { - return Ok(cached); + { + let mut redis = redis.connect().await?; + + let maybe_cached_templates = redis + .get_deserialized_from_json( + TEMPLATES_NAMESPACE, + channel.as_str(), + ) + .await?; + + if let Some(cached) = maybe_cached_templates { + return Ok(cached); + } } let results = sqlx::query_as!( @@ -74,6 +79,8 @@ impl NotificationTemplate { let templates = results.into_iter().map(Into::into).collect(); + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( TEMPLATES_NAMESPACE, diff --git a/apps/labrinth/src/database/models/notifications_type_item.rs b/apps/labrinth/src/database/models/notifications_type_item.rs index 4602fc0ef3..44528e796d 100644 --- a/apps/labrinth/src/database/models/notifications_type_item.rs +++ b/apps/labrinth/src/database/models/notifications_type_item.rs @@ -39,14 +39,16 @@ impl NotificationTypeItem { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let cached_types = redis - .get_deserialized_from_json(NOTIFICATION_TYPES_NAMESPACE, "all") - .await?; + let cached_types = redis + .get_deserialized_from_json(NOTIFICATION_TYPES_NAMESPACE, "all") + .await?; - if let Some(types) = cached_types { - return Ok(types); + if let Some(types) = cached_types { + return Ok(types); + } } let results = sqlx::query_as!( @@ -58,6 +60,8 @@ impl NotificationTypeItem { let types = results.into_iter().map(Into::into).collect(); + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( NOTIFICATION_TYPES_NAMESPACE, diff --git a/apps/labrinth/src/database/models/pat_item.rs b/apps/labrinth/src/database/models/pat_item.rs index 5f760d579d..735f7efd76 100644 --- a/apps/labrinth/src/database/models/pat_item.rs +++ b/apps/labrinth/src/database/models/pat_item.rs @@ -156,17 +156,19 @@ impl DBPersonalAccessToken { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; - - let res = redis - .get_deserialized_from_json::>( - PATS_USERS_NAMESPACE, - &user_id.0.to_string(), - ) - .await?; - - if let Some(res) = res { - return Ok(res.into_iter().map(DBPatId).collect()); + { + let mut redis = redis.connect().await?; + + let res = redis + .get_deserialized_from_json::>( + PATS_USERS_NAMESPACE, + &user_id.0.to_string(), + ) + .await?; + + if let Some(res) = res { + return Ok(res.into_iter().map(DBPatId).collect()); + } } let db_pats: Vec = sqlx::query!( @@ -183,6 +185,8 @@ impl DBPersonalAccessToken { .try_collect::>() .await?; + let mut redis = redis.connect().await?; + redis .set( PATS_USERS_NAMESPACE, diff --git a/apps/labrinth/src/database/models/product_item.rs b/apps/labrinth/src/database/models/product_item.rs index 0936ebe51f..e3c8a620ef 100644 --- a/apps/labrinth/src/database/models/product_item.rs +++ b/apps/labrinth/src/database/models/product_item.rs @@ -150,14 +150,16 @@ impl QueryProductWithPrices { where E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, { - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let res: Option> = redis - .get_deserialized_from_json(PRODUCTS_NAMESPACE, "all") - .await?; + let res: Option> = redis + .get_deserialized_from_json(PRODUCTS_NAMESPACE, "all") + .await?; - if let Some(res) = res { - return Ok(res); + if let Some(res) = res { + return Ok(res); + } } let all_products = product_item::DBProduct::get_all(exec).await?; @@ -191,6 +193,8 @@ impl QueryProductWithPrices { }) .collect::>(); + let mut redis = redis.connect().await?; + redis .set_serialized_to_json(PRODUCTS_NAMESPACE, "all", &products, None) .await?; diff --git a/apps/labrinth/src/database/models/project_item.rs b/apps/labrinth/src/database/models/project_item.rs index 3ed30b1741..717ebf7e47 100644 --- a/apps/labrinth/src/database/models/project_item.rs +++ b/apps/labrinth/src/database/models/project_item.rs @@ -893,16 +893,18 @@ impl DBProject { Option, )>; - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let dependencies = redis - .get_deserialized_from_json::( - PROJECTS_DEPENDENCIES_NAMESPACE, - &id.0.to_string(), - ) - .await?; - if let Some(dependencies) = dependencies { - return Ok(dependencies); + let dependencies = redis + .get_deserialized_from_json::( + PROJECTS_DEPENDENCIES_NAMESPACE, + &id.0.to_string(), + ) + .await?; + if let Some(dependencies) = dependencies { + return Ok(dependencies); + } } let dependencies: Dependencies = sqlx::query!( @@ -930,6 +932,8 @@ impl DBProject { .try_collect::() .await?; + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( PROJECTS_DEPENDENCIES_NAMESPACE, diff --git a/apps/labrinth/src/database/models/session_item.rs b/apps/labrinth/src/database/models/session_item.rs index 0203d52e6e..2b1bac1cb7 100644 --- a/apps/labrinth/src/database/models/session_item.rs +++ b/apps/labrinth/src/database/models/session_item.rs @@ -209,17 +209,19 @@ impl DBSession { where E: sqlx::Executor<'a, Database = sqlx::Postgres>, { - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let res = redis - .get_deserialized_from_json::>( - SESSIONS_USERS_NAMESPACE, - &user_id.0.to_string(), - ) - .await?; + let res = redis + .get_deserialized_from_json::>( + SESSIONS_USERS_NAMESPACE, + &user_id.0.to_string(), + ) + .await?; - if let Some(res) = res { - return Ok(res.into_iter().map(DBSessionId).collect()); + if let Some(res) = res { + return Ok(res.into_iter().map(DBSessionId).collect()); + } } use futures::TryStreamExt; @@ -237,6 +239,8 @@ impl DBSession { .try_collect::>() .await?; + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( SESSIONS_USERS_NAMESPACE, diff --git a/apps/labrinth/src/database/models/user_item.rs b/apps/labrinth/src/database/models/user_item.rs index 18e4e7f7dd..39336b6657 100644 --- a/apps/labrinth/src/database/models/user_item.rs +++ b/apps/labrinth/src/database/models/user_item.rs @@ -299,17 +299,19 @@ impl DBUser { { use futures::stream::TryStreamExt; - let mut redis = redis.connect().await?; + { + let mut redis = redis.connect().await?; - let cached_projects = redis - .get_deserialized_from_json::>( - USERS_PROJECTS_NAMESPACE, - &user_id.0.to_string(), - ) - .await?; + let cached_projects = redis + .get_deserialized_from_json::>( + USERS_PROJECTS_NAMESPACE, + &user_id.0.to_string(), + ) + .await?; - if let Some(projects) = cached_projects { - return Ok(projects); + if let Some(projects) = cached_projects { + return Ok(projects); + } } let db_projects = sqlx::query!( @@ -326,6 +328,8 @@ impl DBUser { .try_collect::>() .await?; + let mut redis = redis.connect().await?; + redis .set_serialized_to_json( USERS_PROJECTS_NAMESPACE, diff --git a/apps/labrinth/src/database/postgres_database.rs b/apps/labrinth/src/database/postgres_database.rs index 5be981315a..0dd86d4cde 100644 --- a/apps/labrinth/src/database/postgres_database.rs +++ b/apps/labrinth/src/database/postgres_database.rs @@ -35,7 +35,21 @@ pub async fn connect_all() -> Result<(PgPool, ReadOnlyPgPool), sqlx::Error> { info!("Initializing database connection"); let database_url = dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` not in .env"); + + let acquire_timeout = + dotenvy::var("DATABASE_ACQUIRE_TIMEOUT_MS") + .ok() + .map_or_else( + || Duration::from_millis(30000), + |x| { + Duration::from_millis(x.parse::().expect( + "DATABASE_ACQUIRE_TIMEOUT_MS must be a valid u64", + )) + }, + ); + let pool = PgPoolOptions::new() + .acquire_timeout(acquire_timeout) .min_connections( dotenvy::var("DATABASE_MIN_CONNECTIONS") .ok() @@ -54,6 +68,7 @@ pub async fn connect_all() -> Result<(PgPool, ReadOnlyPgPool), sqlx::Error> { if let Ok(url) = dotenvy::var("READONLY_DATABASE_URL") { let ro_pool = PgPoolOptions::new() + .acquire_timeout(acquire_timeout) .min_connections( dotenvy::var("READONLY_DATABASE_MIN_CONNECTIONS") .ok() diff --git a/apps/labrinth/src/database/redis.rs b/apps/labrinth/src/database/redis.rs index 5d299f8a10..4dd570f1b9 100644 --- a/apps/labrinth/src/database/redis.rs +++ b/apps/labrinth/src/database/redis.rs @@ -16,6 +16,7 @@ use std::fmt::{Debug, Display}; use std::future::Future; use std::hash::Hash; use std::time::Duration; +use std::time::Instant; const DEFAULT_EXPIRY: i64 = 60 * 60 * 12; // 12 hours const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes @@ -37,6 +38,18 @@ impl RedisPool { // testing pool uses a hashmap to mimic redis behaviour for very small data sizes (ie: tests) // PANICS: production pool will panic if redis url is not set pub fn new(meta_namespace: Option) -> Self { + let wait_timeout = + dotenvy::var("REDIS_WAIT_TIMEOUT_MS").ok().map_or_else( + || Duration::from_millis(15000), + |x| { + Duration::from_millis( + x.parse::().expect( + "REDIS_WAIT_TIMEOUT_MS must be a valid u64", + ), + ) + }, + ); + let url = dotenvy::var("REDIS_URL").expect("Redis URL not set"); let pool = Config::from_url(url.clone()) .builder() @@ -47,15 +60,30 @@ impl RedisPool { .and_then(|x| x.parse().ok()) .unwrap_or(10000), ) + .wait_timeout(Some(wait_timeout)) .runtime(Runtime::Tokio1) .build() .expect("Redis connection failed"); - RedisPool { + let pool = RedisPool { url, pool, meta_namespace: meta_namespace.unwrap_or("".to_string()), - } + }; + + let interval = Duration::from_secs(30); + let max_age = Duration::from_secs(5 * 60); // 5 minutes + let pool_ref = pool.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(interval).await; + pool_ref + .pool + .retain(|_, metrics| metrics.last_used() < max_age); + } + }); + + pool } pub async fn register_and_set_metrics( @@ -483,12 +511,15 @@ impl RedisPool { if !subscribe_ids.is_empty() { fetch_tasks.push(Either::Right(async { - let mut interval = - tokio::time::interval(Duration::from_millis(100)); + let mut wait_time_ms = 50; let start = Utc::now(); + let mut redis_budget = Duration::ZERO; + loop { let results = { + let acquire_start = Instant::now(); let mut connection = self.pool.get().await?; + redis_budget += acquire_start.elapsed(); cmd("MGET") .arg( subscribe_ids @@ -507,15 +538,31 @@ impl RedisPool { .await? }; - if results.into_iter().all(|x| x.is_none()) { + let exist_count = + results.into_iter().filter(|x| x.is_some()).count(); + + // None of the locks exist anymore, we can continue + if exist_count == 0 { break; } - if (Utc::now() - start) > chrono::Duration::seconds(5) { - return Err(DatabaseError::CacheTimeout); + let spinning = Utc::now() - start; + if spinning > chrono::Duration::seconds(5) { + return Err(DatabaseError::CacheTimeout { + locks_released: subscribe_ids.len() - exist_count, + locks_waiting: subscribe_ids.len(), + time_spent_pool_wait_ms: redis_budget.as_millis() + as u64, + time_spent_total_ms: spinning + .num_milliseconds() + .max(0) + as u64, + }); } - interval.tick().await; + tokio::time::sleep(Duration::from_millis(wait_time_ms)) + .await; + wait_time_ms *= 2; // 50, 100, 200, 400, 800, 1600, 3200 } let (return_values, _) =