From aca26716a5f23ccf96701050568e2ba3dfe1c74f Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Tue, 23 Dec 2025 20:07:53 +0530 Subject: [PATCH 1/4] fix: remove associated filters during stream deletion --- src/handlers/http/logstream.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 604ab9c3a..c8e59c6f9 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -29,6 +29,7 @@ use crate::rbac::role::Action; use crate::stats::{Stats, event_labels_date, storage_size_labels_date}; use crate::storage::retention::Retention; use crate::storage::{ObjectStoreFormat, StreamInfo, StreamType}; +use crate::users::filters::{FILTERS, Filter}; use crate::utils::actix::extract_session_key_from_req; use crate::utils::json::flatten::{ self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels, @@ -56,6 +57,21 @@ pub async fn delete(stream_name: Path) -> Result = all_filters + .into_iter() + .filter(|filter| filter.stream_name == stream_name) + .collect(); + + for filter in filters_for_stream.iter() { + PARSEABLE.metastore.delete_filter(filter).await?; + + if let Some(filter_id) = filter.filter_id.as_ref() { + FILTERS.delete_filter(filter_id).await; + } + } + // Delete from storage objectstore.delete_stream(&stream_name).await?; // Delete from staging From 19704d9e4a374c4ffdc3a2545ca9d09bc313a9da Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Tue, 23 Dec 2025 20:56:34 +0530 Subject: [PATCH 2/4] chore: safe await --- src/handlers/http/logstream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index c8e59c6f9..28d1870f5 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -57,7 +57,7 @@ pub async fn delete(stream_name: Path) -> Result = all_filters .into_iter() From d812994ec62ecd30d1cda15fb029c2489799155f Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Wed, 24 Dec 2025 19:03:14 +0530 Subject: [PATCH 3/4] chore: refactor logic for deleting zombie filters --- src/handlers/http/logstream.rs | 22 ++++-------- src/metastore/metastore_traits.rs | 1 + .../metastores/object_store_metastore.rs | 34 ++++++++++++++++++- src/parseable/mod.rs | 9 +++++ 4 files changed, 49 insertions(+), 17 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 28d1870f5..fa15310c9 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -29,7 +29,6 @@ use crate::rbac::role::Action; use crate::stats::{Stats, event_labels_date, storage_size_labels_date}; use crate::storage::retention::Retention; use crate::storage::{ObjectStoreFormat, StreamInfo, StreamType}; -use crate::users::filters::{FILTERS, Filter}; use crate::utils::actix::extract_session_key_from_req; use crate::utils::json::flatten::{ self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels, @@ -57,21 +56,6 @@ pub async fn delete(stream_name: Path) -> Result = all_filters - .into_iter() - .filter(|filter| filter.stream_name == stream_name) - .collect(); - - for filter in filters_for_stream.iter() { - PARSEABLE.metastore.delete_filter(filter).await?; - - if let Some(filter_id) = filter.filter_id.as_ref() { - FILTERS.delete_filter(filter_id).await; - } - } - // Delete from storage objectstore.delete_stream(&stream_name).await?; // Delete from staging @@ -95,6 +79,12 @@ pub async fn delete(stream_name: Path) -> Result Result, MetastoreError>; async fn put_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; async fn delete_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; + async fn delete_zombie_filters(&self, stream_name: &str) -> Result; /// correlations async fn get_correlations(&self) -> Result, MetastoreError>; diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index d509bdfcc..79e408c12 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -56,7 +56,7 @@ use crate::{ parseable_json_path, schema_path, stream_json_path, to_bytes, }, }, - users::filters::{Filter, migrate_v1_v2}, + users::filters::{FILTERS, Filter, migrate_v1_v2}, }; /// Using PARSEABLE's storage as a metastore (default) @@ -546,6 +546,38 @@ impl Metastore for ObjectStoreMetastore { .await?) } + // clear filters associated to a deleted stream + async fn delete_zombie_filters(&self, stream_name: &str) -> Result { + // stream should not exist in order to have zombie filters + if PARSEABLE.check_stream_exists(stream_name) { + warn!("no zombie filters cleared for [undeleted] stream {}", stream_name); + return Ok(false); + } + + let all_filters = match PARSEABLE.metastore.get_filters().await { + Ok(all_f) => all_f, + Err(e) => { + return Err(e); + } + }; + + // collect filters associated with the logstream being deleted + let filters_for_stream: Vec = all_filters + .into_iter() + .filter(|filter| filter.stream_name == stream_name) + .collect(); + + for filter in filters_for_stream.iter() { + PARSEABLE.metastore.delete_filter(filter).await?; + + if let Some(filter_id) = filter.filter_id.as_ref() { + FILTERS.delete_filter(filter_id).await; + } + } + + return Ok(true); + } + /// Get all correlations async fn get_correlations(&self) -> Result, MetastoreError> { let mut correlations = Vec::new(); diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index d8cba4aea..80713c443 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -235,6 +235,15 @@ impl Parseable { .unwrap_or_default() } + // check if a stream exists + pub fn check_stream_exists(&self, stream_name: &str) -> bool { + if self.streams.contains(stream_name) { + return true; + } else { + return false; + } + } + // validate the storage, if the proper path for staging directory is provided // if the proper data directory is provided, or s3 bucket is provided etc pub async fn validate_storage(&self) -> Result, ObjectStorageError> { From 6e96d09de32080784d5cfbd3a37d8c365dba04a0 Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Wed, 24 Dec 2025 20:19:47 +0530 Subject: [PATCH 4/4] chore: simplification --- src/metastore/metastores/object_store_metastore.rs | 4 ++-- src/parseable/mod.rs | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index 79e408c12..5ef9dd67a 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -554,7 +554,7 @@ impl Metastore for ObjectStoreMetastore { return Ok(false); } - let all_filters = match PARSEABLE.metastore.get_filters().await { + let all_filters = match self.get_filters().await { Ok(all_f) => all_f, Err(e) => { return Err(e); @@ -568,7 +568,7 @@ impl Metastore for ObjectStoreMetastore { .collect(); for filter in filters_for_stream.iter() { - PARSEABLE.metastore.delete_filter(filter).await?; + self.delete_filter(filter).await?; if let Some(filter_id) = filter.filter_id.as_ref() { FILTERS.delete_filter(filter_id).await; diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 80713c443..cc969a98c 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -237,11 +237,7 @@ impl Parseable { // check if a stream exists pub fn check_stream_exists(&self, stream_name: &str) -> bool { - if self.streams.contains(stream_name) { - return true; - } else { - return false; - } + self.streams.contains(stream_name) } // validate the storage, if the proper path for staging directory is provided