diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 604ab9c3a..fa15310c9 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -79,6 +79,12 @@ pub async fn delete(stream_name: Path) -> Result Result, MetastoreError>; async fn put_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; async fn delete_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; + async fn delete_zombie_filters(&self, stream_name: &str) -> Result; /// correlations async fn get_correlations(&self) -> Result, MetastoreError>; diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index d509bdfcc..5ef9dd67a 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -56,7 +56,7 @@ use crate::{ parseable_json_path, schema_path, stream_json_path, to_bytes, }, }, - users::filters::{Filter, migrate_v1_v2}, + users::filters::{FILTERS, Filter, migrate_v1_v2}, }; /// Using PARSEABLE's storage as a metastore (default) @@ -546,6 +546,38 @@ impl Metastore for ObjectStoreMetastore { .await?) } + // clear filters associated to a deleted stream + async fn delete_zombie_filters(&self, stream_name: &str) -> Result { + // stream should not exist in order to have zombie filters + if PARSEABLE.check_stream_exists(stream_name) { + warn!("no zombie filters cleared for [undeleted] stream {}", stream_name); + return Ok(false); + } + + let all_filters = match self.get_filters().await { + Ok(all_f) => all_f, + Err(e) => { + return Err(e); + } + }; + + // collect filters associated with the logstream being deleted + let filters_for_stream: Vec = all_filters + .into_iter() + .filter(|filter| filter.stream_name == stream_name) + .collect(); + + for filter in filters_for_stream.iter() { + self.delete_filter(filter).await?; + + if let Some(filter_id) = filter.filter_id.as_ref() { + FILTERS.delete_filter(filter_id).await; + } + } + + return Ok(true); + } + /// Get all correlations async fn get_correlations(&self) -> Result, MetastoreError> { let mut correlations = Vec::new(); diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index d8cba4aea..cc969a98c 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -235,6 +235,11 @@ impl Parseable { .unwrap_or_default() } + // check if a stream exists + pub fn check_stream_exists(&self, stream_name: &str) -> bool { + self.streams.contains(stream_name) + } + // validate the storage, if the proper path for staging directory is provided // if the proper data directory is provided, or s3 bucket is provided etc pub async fn validate_storage(&self) -> Result, ObjectStorageError> {