From e260bc4e629ddde60302068aeff0a8999bef5e21 Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Sun, 28 Dec 2025 12:33:35 +0530 Subject: [PATCH 1/7] init: fetch affected --- src/handlers/http/logstream.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 604ab9c3a..2eab3b5af 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -494,6 +494,19 @@ pub async fn delete_stream_hot_tier( )) } +pub async fn get_affected(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); + + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage + if !PARSEABLE.check_or_load_stream(&stream_name).await { + return Err(StreamNotFound(stream_name.clone()).into()); + } + + Ok((web::Json({}), StatusCode::OK)) +} + #[allow(unused)] fn classify_json_error(kind: serde_json::error::Category) -> StatusCode { match kind { From ef3b4a64e3451c71269104f0ed693f3e32b10e86 Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Mon, 29 Dec 2025 01:01:31 +0530 Subject: [PATCH 2/7] feat: get logstream affected resources {filters, dashboards (+ tile_ids)} --- src/handlers/http/logstream.rs | 13 +- src/handlers/http/modal/ingest_server.rs | 8 + src/handlers/http/modal/query_server.rs | 8 + src/handlers/http/modal/server.rs | 8 + .../http/modal/utils/logstream_utils.rs | 155 +++++++++++++++++- src/rbac/role.rs | 2 + 6 files changed, 186 insertions(+), 8 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 2eab3b5af..b18adb7c5 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -20,6 +20,7 @@ use self::error::StreamError; use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats}; use super::query::update_schema_when_distributed; use crate::event::format::override_data_type; +use crate::handlers::http::modal::utils::logstream_utils::LogstreamAffectedResources; use crate::hottier::{CURRENT_HOT_TIER_VERSION, HotTierManager, StreamHotTier}; use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; @@ -494,17 +495,21 @@ pub async fn delete_stream_hot_tier( )) } -pub async fn get_affected(stream_name: Path) -> Result { +pub async fn get_logstream_affected_resources( + stream_name: Path +) -> Result { let stream_name = stream_name.into_inner(); // For query mode, if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage + // check if it exists in the storage + // create stream and schema from storage if !PARSEABLE.check_or_load_stream(&stream_name).await { return Err(StreamNotFound(stream_name.clone()).into()); } - Ok((web::Json({}), StatusCode::OK)) + let affected_resources = LogstreamAffectedResources::load(&stream_name).await; + + Ok((web::Json(affected_resources), StatusCode::OK)) } #[allow(unused)] diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 0440e857c..e3e68d1e7 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -272,6 +272,14 @@ impl IngestServer { .authorize_for_resource(Action::GetStats), ), ) + .service( + // GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream + web::resource("/affected-resources").route( + web::get() + .to(logstream::get_logstream_affected_resources) + .authorize_for_resource(Action::GetLogstreamAffectedResources), + ), + ) .service( web::scope("/retention").service( web::resource("/cleanup").route( diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index c345d3112..d2c3cb636 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -313,6 +313,14 @@ impl QueryServer { .authorize_for_resource(Action::GetStats), ), ) + .service( + // GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream + web::resource("/affected-resources").route( + web::get() + .to(logstream::get_logstream_affected_resources) + .authorize_for_resource(Action::GetLogstreamAffectedResources), + ), + ) .service( web::resource("/retention") // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 7b145ebb1..fea083b13 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -488,6 +488,14 @@ impl Server { .authorize_for_resource(Action::GetStats), ), ) + .service( + // GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream + web::resource("/affected-resources").route( + web::get() + .to(logstream::get_logstream_affected_resources) + .authorize_for_resource(Action::GetLogstreamAffectedResources), + ), + ) .service( web::resource("/retention") // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 533d5d86f..6f65de73b 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -17,17 +17,29 @@ */ use actix_web::http::header::HeaderMap; +use datafusion::common::HashSet; +use ulid::Ulid; use crate::{ - event::format::LogSource, - handlers::{ + event::format::LogSource, handlers::{ CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY, - }, - storage::StreamType, + }, + metastore::MetastoreError, + parseable::{ + PARSEABLE, StreamNotFound + }, + storage::StreamType, + users::{ + dashboards::{Dashboard}, + filters::Filter + } }; +/// Field in a dashboard's tile that should contain the logstream name +const TILE_FIELD_REFERRING_TO_STREAM: &str = "chartQuery"; + #[derive(Debug, Default)] pub struct PutStreamHeaders { pub time_partition: String, @@ -74,3 +86,138 @@ impl From<&HeaderMap> for PutStreamHeaders { } } } + +/// Resources that rely on a specific logstream and will be affected if it gets deleted +#[derive(Debug, Default, serde::Serialize)] +pub struct LogstreamAffectedResources { + pub filters: Vec, + pub dashboards: Vec +} + +#[derive(Debug, Default, serde::Serialize)] +pub struct LogstreamAffectedDashboard { + pub dashboard: Dashboard, + pub affected_tile_ids: Vec +} + +#[derive(thiserror::Error, Debug)] +pub enum LogstreamAffectedResourcesError { + #[error("Stream not found: {0}")] + NoSuchStream(#[from] StreamNotFound), + + #[error("Metastore error: {0}")] + FromMetastoreError(#[from] MetastoreError), +} + +impl LogstreamAffectedResources { + pub async fn load(stream_name: &str) -> Self { + Self { + filters: LogstreamAffectedResources::fetch_affected_filters(stream_name) + .await + .unwrap_or_else(|e| { + tracing::warn!("failed to fetch filters: {}", e); + Vec::new() + }), + + dashboards: LogstreamAffectedResources::fetch_affected_dashboards(stream_name) + .await + .unwrap_or_else(|e| { + tracing::warn!("failed to fetch dashboards: {}", e); + Vec::new() + }), + } + } + + pub async fn fetch_affected_filters( + stream_name: &str + ) -> Result, LogstreamAffectedResourcesError> { + if !PARSEABLE.streams.contains(stream_name) { + return Err(LogstreamAffectedResourcesError::NoSuchStream( + StreamNotFound(stream_name.to_string()) + )); + } + + Ok(PARSEABLE.metastore.get_filters().await? + .into_iter() + .filter(|filter| filter.stream_name == stream_name) + .collect()) + } + + pub async fn fetch_affected_dashboards( + stream_name: &str + ) -> Result, LogstreamAffectedResourcesError> { + if !PARSEABLE.streams.contains(stream_name) { + return Err(LogstreamAffectedResourcesError::NoSuchStream( + StreamNotFound(stream_name.to_string()) + )); + } + + let all_dashboards = PARSEABLE.metastore.get_dashboards().await?; + let mut parsed_dashboards = Vec::::new(); + + for dashboard in all_dashboards { + if dashboard.is_empty() { + continue; + } + + let dashboard_value = match serde_json::from_slice::(&dashboard) { + Ok(value) => value, + Err(err) => { + tracing::warn!("Failed to parse dashboard JSON: {}", err); + continue; + } + }; + + if let Ok(dashboard) = serde_json::from_value::(dashboard_value.clone()) { + parsed_dashboards.retain(|d: &Dashboard| { + d.dashboard_id != dashboard.dashboard_id + }); + + parsed_dashboards.push(dashboard); + } else { + tracing::warn!("Failed to deserialize dashboard: {:?}", dashboard_value); + } + } + + let mut affected_dashboards: Vec = vec![]; + + for dashboard in parsed_dashboards { + let Some(tiles) = dashboard.tiles.as_ref() else { + continue; + }; + + println!("here"); + + let mut affected_tile_ids = HashSet::::new(); + + for tile in tiles { + let Some(tile_fields) = tile.other_fields.as_ref() else { + continue; + }; + + let Some(tile_value) = tile_fields.get(TILE_FIELD_REFERRING_TO_STREAM) else { + continue; + }; + + + + if let Some(chart_query) = tile_value.as_str() { + println!("{}", chart_query); + if chart_query.contains(stream_name) && !affected_tile_ids.contains(&tile.tile_id) { + affected_tile_ids.insert(tile.tile_id); + } + } + } + + if !affected_tile_ids.is_empty() { + affected_dashboards.push(LogstreamAffectedDashboard { + dashboard, + affected_tile_ids: affected_tile_ids.into_iter().collect() + }); + } + } + + println!("h2: {}", affected_dashboards.len()); + Ok(affected_dashboards) + } +} \ No newline at end of file diff --git a/src/rbac/role.rs b/src/rbac/role.rs index 9e54bb96a..9dd0e2a91 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -32,6 +32,7 @@ pub enum Action { DetectSchema, GetSchema, GetStats, + GetLogstreamAffectedResources, DeleteStream, GetRetention, PutRetention, @@ -164,6 +165,7 @@ impl RoleBuilder { | Action::GetSchema | Action::DetectSchema | Action::GetStats + | Action::GetLogstreamAffectedResources | Action::GetRetention | Action::PutRetention | Action::All => Permission::Resource(action, self.resource_type.clone().unwrap()), From 99bc6cd636828aba583853943bb26ab049b3690a Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Mon, 29 Dec 2025 02:34:40 +0530 Subject: [PATCH 3/7] chore: use correct (std) hashset --- Cargo.lock | 170 ++++++++---------- .../http/modal/utils/logstream_utils.rs | 2 +- 2 files changed, 71 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43cda6fc4..fb0d3373f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,7 +81,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -218,7 +218,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -281,7 +281,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dd80fa0bd6217e482112d9d87a05af8e0f8dec9e3aa51f34816f761c5cf7da7" dependencies = [ "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -747,7 +747,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -900,7 +900,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.108", + "syn", ] [[package]] @@ -1231,7 +1231,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -1510,7 +1510,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.108", + "syn", ] [[package]] @@ -1521,7 +1521,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2019,7 +2019,7 @@ checksum = "1063ad4c9e094b3f798acee16d9a47bd7372d9699be2de21b05c3bd3f34ab848" dependencies = [ "datafusion-doc", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2209,7 +2209,7 @@ checksum = "30542c1ad912e0e3d22a1935c290e12e8a29d704a420177a31faad4a601a0800" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2230,7 +2230,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2240,7 +2240,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" dependencies = [ "derive_builder_core", - "syn 2.0.108", + "syn", ] [[package]] @@ -2253,7 +2253,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.108", + "syn", ] [[package]] @@ -2283,7 +2283,7 @@ dependencies = [ "convert_case 0.6.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn", "unicode-xid", ] @@ -2297,7 +2297,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.108", + "syn", "unicode-xid", ] @@ -2320,7 +2320,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2527,7 +2527,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2706,6 +2706,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac-sha256" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad6880c8d4a9ebf39c6e8b77007ce223f646a4d21ce29d99f70cb16420545425" + [[package]] name = "hostname" version = "0.4.0" @@ -3064,7 +3070,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -3073,16 +3079,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" -[[package]] -name = "idna" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" -dependencies = [ - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "1.1.0" @@ -3585,7 +3581,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -3652,14 +3648,15 @@ checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "openid" -version = "0.15.0" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "627898ab5b3fff5e5f1dc0e404bafdbb87a4337d815e86149f53640380946ccc" +checksum = "c0a9d93c04da2d5e11578af6207f163c0816698b24c25a7aefae06a71e2d07bb" dependencies = [ "base64 0.22.1", "biscuit", "chrono", - "lazy_static", + "getrandom 0.3.1", + "hmac-sha256", "mime", "reqwest 0.12.12", "serde", @@ -4041,7 +4038,7 @@ checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4084,7 +4081,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.108", + "syn", ] [[package]] @@ -4097,27 +4094,25 @@ dependencies = [ ] [[package]] -name = "proc-macro-error" -version = "1.0.4" +name = "proc-macro-error-attr2" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" dependencies = [ - "proc-macro-error-attr", "proc-macro2", "quote", - "syn 1.0.109", - "version_check", ] [[package]] -name = "proc-macro-error-attr" -version = "1.0.4" +name = "proc-macro-error2" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" dependencies = [ + "proc-macro-error-attr2", "proc-macro2", "quote", - "version_check", + "syn", ] [[package]] @@ -4227,7 +4222,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4240,7 +4235,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4502,7 +4497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" dependencies = [ "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4700,7 +4695,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.108", + "syn", "unicode-ident", ] @@ -4718,7 +4713,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.108", + "syn", "unicode-ident", ] @@ -4995,7 +4990,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5043,7 +5038,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5228,7 +5223,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5289,7 +5284,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.108", + "syn", ] [[package]] @@ -5298,16 +5293,6 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.108" @@ -5342,7 +5327,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5425,7 +5410,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5436,7 +5421,7 @@ checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5552,7 +5537,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5771,7 +5756,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5855,27 +5840,12 @@ version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" -[[package]] -name = "unicode-bidi" -version = "0.3.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" - [[package]] name = "unicode-ident" version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034" -[[package]] -name = "unicode-normalization" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" -dependencies = [ - "tinyvec", -] - [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -5934,7 +5904,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" dependencies = [ "form_urlencoded", - "idna 1.1.0", + "idna", "percent-encoding", "serde", ] @@ -5970,11 +5940,11 @@ dependencies = [ [[package]] name = "validator" -version = "0.18.1" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db79c75af171630a3148bd3e6d7c4f42b6a9a014c2945bc5ed0020cbb8d9478e" +checksum = "d0b4a29d8709210980a09379f27ee31549b73292c87ab9899beee1c0d3be6303" dependencies = [ - "idna 0.5.0", + "idna", "once_cell", "regex", "serde", @@ -5986,16 +5956,16 @@ dependencies = [ [[package]] name = "validator_derive" -version = "0.18.2" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df0bcf92720c40105ac4b2dda2a4ea3aa717d4d6a862cc217da653a4bd5c6b10" +checksum = "bac855a2ce6f843beb229757e6e570a42e837bcb15e5f449dd48d5747d41bf77" dependencies = [ "darling", "once_cell", - "proc-macro-error", + "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6126,7 +6096,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.108", + "syn", "wasm-bindgen-shared", ] @@ -6161,7 +6131,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6303,7 +6273,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6314,7 +6284,7 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6668,7 +6638,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", "synstructure", ] @@ -6699,7 +6669,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6710,7 +6680,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6730,7 +6700,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", "synstructure", ] @@ -6759,7 +6729,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 6f65de73b..6b6b0dc86 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -16,8 +16,8 @@ * */ +use std::collections::HashSet; use actix_web::http::header::HeaderMap; -use datafusion::common::HashSet; use ulid::Ulid; use crate::{ From eeb085f7cf6c88a793ca4c0d748f75f458074a95 Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Mon, 29 Dec 2025 21:04:42 +0530 Subject: [PATCH 4/7] feat: fetch affected alerts in LogstreamAffectedResources --- .../http/modal/utils/logstream_utils.rs | 115 ++++++++++++------ 1 file changed, 81 insertions(+), 34 deletions(-) diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 6b6b0dc86..1e5e32095 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -18,10 +18,11 @@ use std::collections::HashSet; use actix_web::http::header::HeaderMap; +use bytes::Bytes; use ulid::Ulid; use crate::{ - event::format::LogSource, handlers::{ + alerts::AlertConfig, event::format::LogSource, handlers::{ CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY, @@ -31,10 +32,7 @@ use crate::{ PARSEABLE, StreamNotFound }, storage::StreamType, - users::{ - dashboards::{Dashboard}, - filters::Filter - } + users::dashboards::Dashboard }; /// Field in a dashboard's tile that should contain the logstream name @@ -90,13 +88,13 @@ impl From<&HeaderMap> for PutStreamHeaders { /// Resources that rely on a specific logstream and will be affected if it gets deleted #[derive(Debug, Default, serde::Serialize)] pub struct LogstreamAffectedResources { - pub filters: Vec, + pub filters: Vec, pub dashboards: Vec } #[derive(Debug, Default, serde::Serialize)] pub struct LogstreamAffectedDashboard { - pub dashboard: Dashboard, + pub dashboard_id: Ulid, pub affected_tile_ids: Vec } @@ -130,7 +128,7 @@ impl LogstreamAffectedResources { pub async fn fetch_affected_filters( stream_name: &str - ) -> Result, LogstreamAffectedResourcesError> { + ) -> Result, LogstreamAffectedResourcesError> { if !PARSEABLE.streams.contains(stream_name) { return Err(LogstreamAffectedResourcesError::NoSuchStream( StreamNotFound(stream_name.to_string()) @@ -139,8 +137,12 @@ impl LogstreamAffectedResources { Ok(PARSEABLE.metastore.get_filters().await? .into_iter() - .filter(|filter| filter.stream_name == stream_name) - .collect()) + .filter_map(|filter| { + if filter.stream_name == stream_name && + let Some(f_id) = filter.filter_id { + Some(f_id) + } else { None } + }).collect()) } pub async fn fetch_affected_dashboards( @@ -155,39 +157,27 @@ impl LogstreamAffectedResources { let all_dashboards = PARSEABLE.metastore.get_dashboards().await?; let mut parsed_dashboards = Vec::::new(); - for dashboard in all_dashboards { - if dashboard.is_empty() { - continue; - } - - let dashboard_value = match serde_json::from_slice::(&dashboard) { - Ok(value) => value, - Err(err) => { - tracing::warn!("Failed to parse dashboard JSON: {}", err); + for dashboard_bytes in all_dashboards { + let dashboard = match self::bytes_to_json::(dashboard_bytes) { + Ok(d) => d, + Err(e) => { + tracing::warn!("{}", e.to_string()); continue; } }; - if let Ok(dashboard) = serde_json::from_value::(dashboard_value.clone()) { - parsed_dashboards.retain(|d: &Dashboard| { - d.dashboard_id != dashboard.dashboard_id - }); - + if !parsed_dashboards.iter().any(|d| d.dashboard_id == dashboard.dashboard_id) { parsed_dashboards.push(dashboard); - } else { - tracing::warn!("Failed to deserialize dashboard: {:?}", dashboard_value); } } let mut affected_dashboards: Vec = vec![]; - for dashboard in parsed_dashboards { + for (dash_i, dashboard) in parsed_dashboards.iter().enumerate() { let Some(tiles) = dashboard.tiles.as_ref() else { continue; }; - println!("here"); - let mut affected_tile_ids = HashSet::::new(); for tile in tiles { @@ -199,10 +189,7 @@ impl LogstreamAffectedResources { continue; }; - - if let Some(chart_query) = tile_value.as_str() { - println!("{}", chart_query); if chart_query.contains(stream_name) && !affected_tile_ids.contains(&tile.tile_id) { affected_tile_ids.insert(tile.tile_id); } @@ -211,13 +198,73 @@ impl LogstreamAffectedResources { if !affected_tile_ids.is_empty() { affected_dashboards.push(LogstreamAffectedDashboard { - dashboard, + dashboard_id: dashboard.dashboard_id.unwrap_or_else(|| { + tracing::warn!("dashboard {}: [id] is missing -- for logstream {}", dash_i, stream_name); + Ulid::new() // default to a new ULID if missing -- what else? + }), affected_tile_ids: affected_tile_ids.into_iter().collect() }); } } - println!("h2: {}", affected_dashboards.len()); Ok(affected_dashboards) } + + pub async fn fetch_affected_alerts( + stream_name: &str + ) -> Result, LogstreamAffectedResourcesError> { + if !PARSEABLE.streams.contains(stream_name) { + return Err(LogstreamAffectedResourcesError::NoSuchStream( + StreamNotFound(stream_name.to_string()) + )); + } + + let all_alerts = PARSEABLE.metastore.get_alerts().await?; + + let mut stream_alerts = HashSet::::new(); + for alert_bytes in all_alerts { + let alert = match self::bytes_to_json::(alert_bytes) { + Ok(alert_val) => alert_val, + Err(e) => { + tracing::warn!("{}", e.to_string()); + continue; + } + }; + + if !alert.datasets.contains(&stream_name.to_string()) { continue }; + stream_alerts.insert(alert.id); + } + + Ok(stream_alerts.into_iter().collect()) + } +} + + +// utility funcs: + +#[derive(Debug, thiserror::Error)] +pub enum Bytes2JSONError { + #[error("zero sized Bytes")] + ZeroSizedBytes, + + #[error("failed to parse bytes to JSON: {0}")] + FailedToParse(String) +} + +fn bytes_to_json(json_bytes: Bytes) -> Result { + if json_bytes.is_empty() { + return Err(Bytes2JSONError::ZeroSizedBytes); + } + + let json_bytes_value = match serde_json::from_slice::(&json_bytes) { + Ok(value) => value, + Err(err) => { + return Err(Bytes2JSONError::FailedToParse(format!("{:#?}", err))) + } + }; + + return match serde_json::from_value::(json_bytes_value.clone()) { + Ok(parsed_object) => Ok(parsed_object), + Err(e) => Err(Bytes2JSONError::FailedToParse(format!("deserialization failed: {:#?}", e))) + }; } \ No newline at end of file From 7def6f79aa6e269d069e26d07d0dd284ffe2fa68 Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Tue, 30 Dec 2025 09:17:31 +0530 Subject: [PATCH 5/7] feat: fetch affected - alerts and roles --- src/handlers/http/logstream.rs | 21 ++- src/handlers/http/modal/ingest_server.rs | 2 +- src/handlers/http/modal/query_server.rs | 2 +- src/handlers/http/modal/server.rs | 2 +- .../http/modal/utils/logstream_utils.rs | 177 +++++++++++++----- src/rbac/role.rs | 3 + 6 files changed, 150 insertions(+), 57 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index b18adb7c5..5e830fdb5 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -495,7 +495,7 @@ pub async fn delete_stream_hot_tier( )) } -pub async fn get_logstream_affected_resources( +pub async fn get_affected_resources( stream_name: Path ) -> Result { let stream_name = stream_name.into_inner(); @@ -507,9 +507,12 @@ pub async fn get_logstream_affected_resources( return Err(StreamNotFound(stream_name.clone()).into()); } - let affected_resources = LogstreamAffectedResources::load(&stream_name).await; - - Ok((web::Json(affected_resources), StatusCode::OK)) + match LogstreamAffectedResources::load(&stream_name).await { + Ok(affecred_resources) + => Ok((web::Json(affecred_resources), StatusCode::OK)), + Err(err) + => Err(err.into()) + } } #[allow(unused)] @@ -528,13 +531,13 @@ pub mod error { use http::StatusCode; use crate::{ - hottier::HotTierError, - metastore::MetastoreError, - parseable::StreamNotFound, - storage::ObjectStorageError, + hottier::HotTierError, + metastore::MetastoreError, + parseable::StreamNotFound, + storage::ObjectStorageError, validator::error::{ AlertValidationError, HotTierValidationError, StreamNameValidationError, - }, + } }; #[allow(unused)] diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index e3e68d1e7..088599701 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -276,7 +276,7 @@ impl IngestServer { // GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream web::resource("/affected-resources").route( web::get() - .to(logstream::get_logstream_affected_resources) + .to(logstream::get_affected_resources) .authorize_for_resource(Action::GetLogstreamAffectedResources), ), ) diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index d2c3cb636..1e534b432 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -317,7 +317,7 @@ impl QueryServer { // GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream web::resource("/affected-resources").route( web::get() - .to(logstream::get_logstream_affected_resources) + .to(logstream::get_affected_resources) .authorize_for_resource(Action::GetLogstreamAffectedResources), ), ) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index fea083b13..a425e4e58 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -492,7 +492,7 @@ impl Server { // GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream web::resource("/affected-resources").route( web::get() - .to(logstream::get_logstream_affected_resources) + .to(logstream::get_affected_resources) .authorize_for_resource(Action::GetLogstreamAffectedResources), ), ) diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 1e5e32095..ef1b273db 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -16,22 +16,25 @@ * */ -use std::collections::HashSet; use actix_web::http::header::HeaderMap; use bytes::Bytes; use ulid::Ulid; use crate::{ - alerts::AlertConfig, event::format::LogSource, handlers::{ + alerts::AlertConfig, + event::format::LogSource, + handlers::{ CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, - UPDATE_STREAM_KEY, + UPDATE_STREAM_KEY, http::logstream::error::StreamError, }, metastore::MetastoreError, - parseable::{ - PARSEABLE, StreamNotFound + parseable::{PARSEABLE, StreamNotFound}, + rbac::role::{ + ParseableResourceType, + model::DefaultPrivilege }, - storage::StreamType, + storage::{ObjectStorageError, StorageMetadata, StreamType}, users::dashboards::Dashboard }; @@ -89,7 +92,9 @@ impl From<&HeaderMap> for PutStreamHeaders { #[derive(Debug, Default, serde::Serialize)] pub struct LogstreamAffectedResources { pub filters: Vec, - pub dashboards: Vec + pub dashboards: Vec, + pub alerts: Vec, + pub roles: Vec, } #[derive(Debug, Default, serde::Serialize)] @@ -100,39 +105,44 @@ pub struct LogstreamAffectedDashboard { #[derive(thiserror::Error, Debug)] pub enum LogstreamAffectedResourcesError { - #[error("Stream not found: {0}")] - NoSuchStream(#[from] StreamNotFound), + #[error("(to fetch affected resources) logstream not found: {0}")] + StreamNotFound(#[from] StreamNotFound), - #[error("Metastore error: {0}")] - FromMetastoreError(#[from] MetastoreError), + #[error("(get affected resources) metastore error: {0}")] + MetastoreError(#[from] MetastoreError), + + #[error("(get affected resources) objectstore error: {0}")] + ObjectStorageError(#[from] ObjectStorageError), + + #[error("(get affected resources) could not parse JSON: {0}")] + Bytes2JSONError(#[from] Bytes2JSONError) } impl LogstreamAffectedResources { - pub async fn load(stream_name: &str) -> Self { - Self { - filters: LogstreamAffectedResources::fetch_affected_filters(stream_name) - .await - .unwrap_or_else(|e| { - tracing::warn!("failed to fetch filters: {}", e); - Vec::new() - }), - - dashboards: LogstreamAffectedResources::fetch_affected_dashboards(stream_name) - .await - .unwrap_or_else(|e| { - tracing::warn!("failed to fetch dashboards: {}", e); - Vec::new() - }), - } + /// Load all resources that will be affected if the given logstream is deleted. + /// + /// ### Arguments + /// - `stream_name` - The name of the logstream to check for dependencies + /// + /// ### Returns + /// A tuple where: + /// - First element: `true` if no resources are affected (empty loaded struct), `false` otherwise + /// - Second element: The populated `LogstreamAffectedResources` struct + + pub async fn load(stream_name: &str) -> Result { + Ok(Self { + filters: Self::fetch_affected_filters(stream_name).await?, + dashboards: Self::fetch_affected_dashboards(stream_name).await?, + alerts: Self::fetch_affected_alerts(stream_name).await?, + roles: Self::fetch_affected_roles(stream_name).await?, + }) } pub async fn fetch_affected_filters( stream_name: &str ) -> Result, LogstreamAffectedResourcesError> { if !PARSEABLE.streams.contains(stream_name) { - return Err(LogstreamAffectedResourcesError::NoSuchStream( - StreamNotFound(stream_name.to_string()) - )); + return Err(StreamNotFound(stream_name.to_string()).into()); } Ok(PARSEABLE.metastore.get_filters().await? @@ -149,9 +159,7 @@ impl LogstreamAffectedResources { stream_name: &str ) -> Result, LogstreamAffectedResourcesError> { if !PARSEABLE.streams.contains(stream_name) { - return Err(LogstreamAffectedResourcesError::NoSuchStream( - StreamNotFound(stream_name.to_string()) - )); + return Err(StreamNotFound(stream_name.to_string()).into()); } let all_dashboards = PARSEABLE.metastore.get_dashboards().await?; @@ -178,8 +186,7 @@ impl LogstreamAffectedResources { continue; }; - let mut affected_tile_ids = HashSet::::new(); - + let mut affected_tile_ids = Vec::::new(); for tile in tiles { let Some(tile_fields) = tile.other_fields.as_ref() else { continue; @@ -191,7 +198,7 @@ impl LogstreamAffectedResources { if let Some(chart_query) = tile_value.as_str() { if chart_query.contains(stream_name) && !affected_tile_ids.contains(&tile.tile_id) { - affected_tile_ids.insert(tile.tile_id); + affected_tile_ids.push(tile.tile_id); } } } @@ -202,7 +209,7 @@ impl LogstreamAffectedResources { tracing::warn!("dashboard {}: [id] is missing -- for logstream {}", dash_i, stream_name); Ulid::new() // default to a new ULID if missing -- what else? }), - affected_tile_ids: affected_tile_ids.into_iter().collect() + affected_tile_ids }); } } @@ -214,14 +221,12 @@ impl LogstreamAffectedResources { stream_name: &str ) -> Result, LogstreamAffectedResourcesError> { if !PARSEABLE.streams.contains(stream_name) { - return Err(LogstreamAffectedResourcesError::NoSuchStream( - StreamNotFound(stream_name.to_string()) - )); + return Err(StreamNotFound(stream_name.to_string()).into()); } let all_alerts = PARSEABLE.metastore.get_alerts().await?; - let mut stream_alerts = HashSet::::new(); + let mut stream_alerts = Vec::::new(); for alert_bytes in all_alerts { let alert = match self::bytes_to_json::(alert_bytes) { Ok(alert_val) => alert_val, @@ -231,16 +236,98 @@ impl LogstreamAffectedResources { } }; - if !alert.datasets.contains(&stream_name.to_string()) { continue }; - stream_alerts.insert(alert.id); + if !alert.datasets.iter().any(|s| s == stream_name) { + continue + }; + + if !stream_alerts.contains(&alert.id) { + stream_alerts.push(alert.id); + } } - Ok(stream_alerts.into_iter().collect()) + Ok(stream_alerts) + } + + + pub async fn fetch_affected_roles( + stream_name: &str + ) -> Result, LogstreamAffectedResourcesError> { + if !PARSEABLE.streams.contains(stream_name) { + return Err(StreamNotFound(stream_name.to_string()).into()); + } + + let metadata_bytes = PARSEABLE + .metastore + .get_parseable_metadata() + .await + .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))? + .ok_or_else(|| ObjectStorageError::Custom("parseable metadata not initialized".into()))?; + + let metadata = self::bytes_to_json::(metadata_bytes)?; + + let mut stream_associated_roles = Vec::::new(); + for (role_name, privileges) in &metadata.roles { + for privilege in privileges { + + let associated_stream = match privilege { + DefaultPrivilege::Ingestor { resource } => { + match resource { + ParseableResourceType::Stream(stream) => stream, + _ => continue + } + }, + + DefaultPrivilege::Reader { resource } => { + match resource { + ParseableResourceType::Stream(stream) => stream, + _ => continue + } + }, + + DefaultPrivilege::Writer { resource } => { + match resource { + ParseableResourceType::Stream(stream) => stream, + _ => continue + } + }, + + _ => continue + }; + + if associated_stream == stream_name && !stream_associated_roles.contains(role_name) { + stream_associated_roles.push(role_name.to_string()); + + // if any role privilege matches the input stream, + // add the role to the set and break + break; + } + + } + } + + Ok(stream_associated_roles) + } +} + + +impl From for StreamError { + fn from(err: LogstreamAffectedResourcesError) -> Self { + match err { + LogstreamAffectedResourcesError::StreamNotFound(e) => { + StreamError::StreamNotFound(e) + } + LogstreamAffectedResourcesError::MetastoreError(e) => { + StreamError::MetastoreError(e) + } + other => { + StreamError::Anyhow(anyhow::anyhow!(other.to_string())) + } + } } } -// utility funcs: +// utility: #[derive(Debug, thiserror::Error)] pub enum Bytes2JSONError { diff --git a/src/rbac/role.rs b/src/rbac/role.rs index 9dd0e2a91..cc9679e8b 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -239,6 +239,7 @@ pub mod model { Action::DetectSchema, Action::GetSchema, Action::GetStats, + Action::GetLogstreamAffectedResources, Action::GetRetention, Action::PutRetention, Action::PutHotTierEnabled, @@ -275,6 +276,7 @@ pub mod model { Action::ListStream, Action::GetSchema, Action::GetStats, + Action::GetLogstreamAffectedResources, Action::PutRetention, Action::PutAlert, Action::GetAlert, @@ -315,6 +317,7 @@ pub mod model { Action::ListStream, Action::GetSchema, Action::GetStats, + Action::GetLogstreamAffectedResources, Action::GetLLM, Action::QueryLLM, Action::ListLLM, From cfb0ef199bbe29db607d7584e2bfa1f98edaea0b Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Tue, 30 Dec 2025 09:29:21 +0530 Subject: [PATCH 6/7] chore: old docstring cleanup and typo --- src/handlers/http/logstream.rs | 4 ++-- src/handlers/http/modal/utils/logstream_utils.rs | 10 ---------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 5e830fdb5..eaaca9254 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -508,8 +508,8 @@ pub async fn get_affected_resources( } match LogstreamAffectedResources::load(&stream_name).await { - Ok(affecred_resources) - => Ok((web::Json(affecred_resources), StatusCode::OK)), + Ok(affected_resources) + => Ok((web::Json(affected_resources), StatusCode::OK)), Err(err) => Err(err.into()) } diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index ef1b273db..fda117fa4 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -119,16 +119,6 @@ pub enum LogstreamAffectedResourcesError { } impl LogstreamAffectedResources { - /// Load all resources that will be affected if the given logstream is deleted. - /// - /// ### Arguments - /// - `stream_name` - The name of the logstream to check for dependencies - /// - /// ### Returns - /// A tuple where: - /// - First element: `true` if no resources are affected (empty loaded struct), `false` otherwise - /// - Second element: The populated `LogstreamAffectedResources` struct - pub async fn load(stream_name: &str) -> Result { Ok(Self { filters: Self::fetch_affected_filters(stream_name).await?, From e6c993dc68a127c07bff1d206810cbce6e9ddf15 Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Tue, 30 Dec 2025 11:55:59 +0530 Subject: [PATCH 7/7] fix: look stream name in dbName of tiles --- .../http/modal/utils/logstream_utils.rs | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index fda117fa4..0dc046720 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -39,7 +39,7 @@ use crate::{ }; /// Field in a dashboard's tile that should contain the logstream name -const TILE_FIELD_REFERRING_TO_STREAM: &str = "chartQuery"; +const TILE_FIELD_REFERRING_TO_STREAM: &str = "dbName"; #[derive(Debug, Default)] pub struct PutStreamHeaders { @@ -186,21 +186,28 @@ impl LogstreamAffectedResources { continue; }; - if let Some(chart_query) = tile_value.as_str() { - if chart_query.contains(stream_name) && !affected_tile_ids.contains(&tile.tile_id) { + if let Some(db_names) = tile_value.as_array() { + let dbs_have_stream = db_names + .iter() + .any(|db| { + if let Some(db_str) = db.as_str() { + return db_str == stream_name + } else { false } + }); + + if dbs_have_stream && !affected_tile_ids.contains(&tile.tile_id) { affected_tile_ids.push(tile.tile_id); } } } - if !affected_tile_ids.is_empty() { + if !affected_tile_ids.is_empty() && dashboard.dashboard_id.is_some() { affected_dashboards.push(LogstreamAffectedDashboard { - dashboard_id: dashboard.dashboard_id.unwrap_or_else(|| { - tracing::warn!("dashboard {}: [id] is missing -- for logstream {}", dash_i, stream_name); - Ulid::new() // default to a new ULID if missing -- what else? - }), + dashboard_id: dashboard.dashboard_id.unwrap(), affected_tile_ids }); + } else if !affected_tile_ids.is_empty() { + tracing::warn!("dashboard {}: [id] is missing, skipping -- for logstream {}", dash_i, stream_name); } }