Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
stats::delete_stats(&stream_name, "json")
.unwrap_or_else(|e| warn!("failed to delete stats for stream {}: {:?}", stream_name, e));


// clear filters associated to the deleted logstream
if let Err(e) = PARSEABLE.metastore.delete_zombie_filters(&stream_name).await {
warn!("failed to delete zombie filters associated to stream {}: {:?}", stream_name, e);
}

Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
}

Expand Down
1 change: 1 addition & 0 deletions src/metastore/metastore_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
async fn get_filters(&self) -> Result<Vec<Filter>, MetastoreError>;
async fn put_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
async fn delete_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
async fn delete_zombie_filters(&self, stream_name: &str) -> Result<bool, MetastoreError>;

/// correlations
async fn get_correlations(&self) -> Result<Vec<Bytes>, MetastoreError>;
Expand Down
34 changes: 33 additions & 1 deletion src/metastore/metastores/object_store_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use crate::{
parseable_json_path, schema_path, stream_json_path, to_bytes,
},
},
users::filters::{Filter, migrate_v1_v2},
users::filters::{FILTERS, Filter, migrate_v1_v2},
};

/// Using PARSEABLE's storage as a metastore (default)
Expand Down Expand Up @@ -546,6 +546,38 @@ impl Metastore for ObjectStoreMetastore {
.await?)
}

// clear filters associated to a deleted stream
async fn delete_zombie_filters(&self, stream_name: &str) -> Result<bool, MetastoreError> {
// stream should not exist in order to have zombie filters
if PARSEABLE.check_stream_exists(stream_name) {
warn!("no zombie filters cleared for [undeleted] stream {}", stream_name);
return Ok(false);
}

let all_filters = match self.get_filters().await {
Ok(all_f) => all_f,
Err(e) => {
return Err(e);
}
};

// collect filters associated with the logstream being deleted
let filters_for_stream: Vec<Filter> = all_filters
.into_iter()
.filter(|filter| filter.stream_name == stream_name)
.collect();

for filter in filters_for_stream.iter() {
self.delete_filter(filter).await?;

if let Some(filter_id) = filter.filter_id.as_ref() {
FILTERS.delete_filter(filter_id).await;
}
}

return Ok(true);
}

/// Get all correlations
async fn get_correlations(&self) -> Result<Vec<Bytes>, MetastoreError> {
let mut correlations = Vec::new();
Expand Down
5 changes: 5 additions & 0 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ impl Parseable {
.unwrap_or_default()
}

// check if a stream exists
pub fn check_stream_exists(&self, stream_name: &str) -> bool {
self.streams.contains(stream_name)
}

// validate the storage, if the proper path for staging directory is provided
// if the proper data directory is provided, or s3 bucket is provided etc
pub async fn validate_storage(&self) -> Result<Option<Bytes>, ObjectStorageError> {
Expand Down
Loading