diff --git a/src/correlation/mod.rs b/src/correlation.rs similarity index 94% rename from src/correlation/mod.rs rename to src/correlation.rs index 9df9c900b..cf2c7bd82 100644 --- a/src/correlation/mod.rs +++ b/src/correlation.rs @@ -54,17 +54,11 @@ impl Correlation { .into_iter() .flat_map(|(_, correlations_bytes)| correlations_bytes) .filter_map(|correlation| { - if correlation.is_empty() { - None - } else { - match serde_json::from_slice(&correlation) { - Ok(correlation_config) => Some(correlation_config), - Err(e) => { - error!("Unable to load correlation: {e}"); - None - } - } - } + serde_json::from_slice(&correlation) + .inspect_err(|e| { + error!("Unable to load correlation: {e}"); + }) + .ok() }) .collect(); @@ -107,13 +101,11 @@ impl Correlation { .find(|c| c.id == correlation_id && c.user_id == user_id) .cloned(); - if let Some(c) = correlation { - Ok(c) - } else { - Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + correlation.ok_or_else(|| { + CorrelationError::AnyhowError(anyhow::Error::msg(format!( "Unable to find correlation with ID- {correlation_id}" - )))) - } + ))) + }) } pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index f65484259..6d75c2e5b 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -678,10 +678,8 @@ impl ObjectStorage for BlobStore { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_dashboard_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "dashboards" - )); + let user_dashboard_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards")); let dashboards_path = RelativePathBuf::from(&user_dashboard_path); let dashboard_bytes = self .get_objects( @@ -716,10 +714,8 @@ impl ObjectStorage for BlobStore { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_filters_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "filters" - )); + let user_filters_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",)); let resp = self .client .list_with_delimiter(Some(&user_filters_path)) @@ -767,10 +763,8 @@ impl ObjectStorage for BlobStore { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_correlation_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "correlations" - )); + let user_correlation_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations")); let correlations_path = RelativePathBuf::from(&user_correlation_path); let correlation_bytes = self .get_objects( diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 0bb2ec12b..5d8041fc2 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -27,7 +27,7 @@ use async_trait::async_trait; use bytes::Bytes; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; use fs_extra::file::CopyOptions; -use futures::{stream::FuturesUnordered, TryStreamExt}; +use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; use relative_path::{RelativePath, RelativePathBuf}; use tokio::fs::{self, DirEntry}; use tokio_stream::wrappers::ReadDirStream; @@ -430,17 +430,16 @@ impl ObjectStorage for LocalFS { ) -> Result>, ObjectStorageError> { let mut correlations: HashMap> = HashMap::new(); let users_root_path = self.root.join(USERS_ROOT_DIR); - let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?); - let users: Vec = directories.try_collect().await?; - for user in users { + let mut directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?); + while let Some(user) = directories.next().await { + let user = user?; if !user.path().is_dir() { continue; } let correlations_path = users_root_path.join(user.path()).join("correlations"); - let directories = ReadDirStream::new(fs::read_dir(&correlations_path).await?); - let correlations_files: Vec = directories.try_collect().await?; - for correlation in correlations_files { - let correlation_absolute_path = correlation.path(); + let mut files = ReadDirStream::new(fs::read_dir(&correlations_path).await?); + while let Some(correlation) = files.next().await { + let correlation_absolute_path = correlation?.path(); let file = fs::read(correlation_absolute_path.clone()).await?; let correlation_relative_path = correlation_absolute_path .strip_prefix(self.root.as_path()) diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 886685667..e115dd0bf 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -811,10 +811,8 @@ impl ObjectStorage for S3 { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_dashboard_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "dashboards" - )); + let user_dashboard_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards")); let dashboards_path = RelativePathBuf::from(&user_dashboard_path); let dashboard_bytes = self .get_objects( @@ -849,10 +847,8 @@ impl ObjectStorage for S3 { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_filters_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "filters" - )); + let user_filters_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",)); let resp = self .client .list_with_delimiter(Some(&user_filters_path)) @@ -900,10 +896,8 @@ impl ObjectStorage for S3 { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_correlation_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "correlations" - )); + let user_correlation_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",)); let correlations_path = RelativePathBuf::from(&user_correlation_path); let correlation_bytes = self .get_objects(