diff --git a/src/catalog/manifest.rs b/src/catalog/manifest.rs index ad5b32422..4999a6035 100644 --- a/src/catalog/manifest.rs +++ b/src/catalog/manifest.rs @@ -18,10 +18,11 @@ use std::collections::HashMap; +use chrono::{DateTime, Utc}; use itertools::Itertools; use parquet::{file::reader::FileReader, format::SortingColumn}; -use super::column::Column; +use super::{column::{Column, TypedStatistics}, ManifestFile}; #[derive( Debug, @@ -58,6 +59,29 @@ pub struct File { pub sort_order_id: Vec, } +impl File { + pub fn get_file_bounds( + &self, + partition_column: &str, + ) -> (DateTime, DateTime) { + match self + .columns() + .iter() + .find(|col| col.name == partition_column) + .unwrap() + .stats + .as_ref() + .unwrap() + { + TypedStatistics::Int(stats) => ( + DateTime::from_timestamp_millis(stats.min).unwrap(), + DateTime::from_timestamp_millis(stats.max).unwrap(), + ), + _ => unreachable!(), + } + } +} + /// A manifest file composed of multiple file entries. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct Manifest { diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 878568e34..f733672a0 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -16,31 +16,13 @@ * */ -use std::{io::ErrorKind, sync::Arc}; - use self::{column::Column, snapshot::ManifestItem}; -use crate::handlers; -use crate::handlers::http::base_path_without_preceding_slash; -use crate::metadata::STREAM_INFO; -use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; -use crate::option::{Mode, CONFIG}; -use crate::stats::{ - event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats, -}; -use crate::{ - catalog::manifest::Manifest, - event::DEFAULT_TIMESTAMP_KEY, - query::PartialTimeFilter, - storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError}, -}; -use chrono::{DateTime, Local, NaiveTime, Utc}; +use crate::query::PartialTimeFilter; +use chrono::{DateTime, Utc}; use relative_path::RelativePathBuf; -use std::io::Error as IOError; -use tracing::{error, info}; pub mod column; pub mod manifest; pub mod snapshot; -use crate::storage::ObjectStoreFormat; pub use manifest::create_from_parquet_file; pub trait Snapshot { fn manifests(&self, time_predicates: &[PartialTimeFilter]) -> Vec; @@ -79,361 +61,6 @@ impl ManifestFile for manifest::File { } } -fn get_file_bounds( - file: &manifest::File, - partition_column: String, -) -> (DateTime, DateTime) { - match file - .columns() - .iter() - .find(|col| col.name == partition_column) - .unwrap() - .stats - .as_ref() - .unwrap() - { - column::TypedStatistics::Int(stats) => ( - DateTime::from_timestamp_millis(stats.min).unwrap(), - DateTime::from_timestamp_millis(stats.max).unwrap(), - ), - _ => unreachable!(), - } -} - -pub async fn update_snapshot( - storage: Arc, - stream_name: &str, - change: manifest::File, -) -> Result<(), ObjectStorageError> { - let mut meta = storage.get_object_store_format(stream_name).await?; - let manifests = &mut meta.snapshot.manifest_list; - let time_partition = &meta.time_partition; - let lower_bound = match time_partition { - Some(time_partition) => { - let (lower_bound, _) = get_file_bounds(&change, time_partition.to_string()); - lower_bound - } - None => { - let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string()); - lower_bound - } - }; - let date = lower_bound.date_naive().format("%Y-%m-%d").to_string(); - let event_labels = event_labels_date(stream_name, "json", &date); - let storage_size_labels = storage_size_labels_date(stream_name, &date); - let events_ingested = EVENTS_INGESTED_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let ingestion_size = EVENTS_INGESTED_SIZE_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let storage_size = EVENTS_STORAGE_SIZE_DATE - .get_metric_with_label_values(&storage_size_labels) - .unwrap() - .get() as u64; - let pos = manifests.iter().position(|item| { - item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound - }); - - // if the mode in I.S. manifest needs to be created but it is not getting created because - // there is already a pos, to index into stream.json - - // We update the manifest referenced by this position - // This updates an existing file so there is no need to create a snapshot entry. - if let Some(pos) = pos { - let info = &mut manifests[pos]; - let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); - - let mut ch = false; - for m in manifests.iter_mut() { - let p = manifest_path("").to_string(); - if m.manifest_path.contains(&p) { - let date = m - .time_lower_bound - .date_naive() - .format("%Y-%m-%d") - .to_string(); - let event_labels = event_labels_date(stream_name, "json", &date); - let storage_size_labels = storage_size_labels_date(stream_name, &date); - let events_ingested = EVENTS_INGESTED_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let ingestion_size = EVENTS_INGESTED_SIZE_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let storage_size = EVENTS_STORAGE_SIZE_DATE - .get_metric_with_label_values(&storage_size_labels) - .unwrap() - .get() as u64; - ch = true; - m.events_ingested = events_ingested; - m.ingestion_size = ingestion_size; - m.storage_size = storage_size; - } - } - - if ch { - if let Some(mut manifest) = storage.get_manifest(&path).await? { - manifest.apply_change(change); - storage.put_manifest(&path, manifest).await?; - let stats = get_current_stats(stream_name, "json"); - if let Some(stats) = stats { - meta.stats = stats; - } - meta.snapshot.manifest_list = manifests.to_vec(); - - storage.put_stream_manifest(stream_name, &meta).await?; - } else { - //instead of returning an error, create a new manifest (otherwise local to storage sync fails) - //but don't update the snapshot - create_manifest( - lower_bound, - change, - storage.clone(), - stream_name, - false, - meta, - events_ingested, - ingestion_size, - storage_size, - ) - .await?; - } - } else { - create_manifest( - lower_bound, - change, - storage.clone(), - stream_name, - true, - meta, - events_ingested, - ingestion_size, - storage_size, - ) - .await?; - } - } else { - create_manifest( - lower_bound, - change, - storage.clone(), - stream_name, - true, - meta, - events_ingested, - ingestion_size, - storage_size, - ) - .await?; - } - - Ok(()) -} - -#[allow(clippy::too_many_arguments)] -async fn create_manifest( - lower_bound: DateTime, - change: manifest::File, - storage: Arc, - stream_name: &str, - update_snapshot: bool, - mut meta: ObjectStoreFormat, - events_ingested: u64, - ingestion_size: u64, - storage_size: u64, -) -> Result<(), ObjectStorageError> { - let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); - let upper_bound = lower_bound - .date_naive() - .and_time( - NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999) - .ok_or(IOError::new( - ErrorKind::Other, - "Failed to create upper bound for manifest", - ))?, - ) - .and_utc(); - - let manifest = Manifest { - files: vec![change], - ..Manifest::default() - }; - let mut first_event_at = STREAM_INFO.get_first_event(stream_name)?; - if first_event_at.is_none() { - if let Some(first_event) = manifest.files.first() { - let time_partition = &meta.time_partition; - let lower_bound = match time_partition { - Some(time_partition) => { - let (lower_bound, _) = get_file_bounds(first_event, time_partition.to_string()); - lower_bound - } - None => { - let (lower_bound, _) = - get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string()); - lower_bound - } - }; - first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339()); - if let Err(err) = - STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap()) - { - error!( - "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", - stream_name - ); - } - } - } - - let mainfest_file_name = manifest_path("").to_string(); - let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); - storage - .put_object(&path, serde_json::to_vec(&manifest)?.into()) - .await?; - if update_snapshot { - let mut manifests = meta.snapshot.manifest_list; - let path = storage.absolute_url(&path); - let new_snapshot_entry = snapshot::ManifestItem { - manifest_path: path.to_string(), - time_lower_bound: lower_bound, - time_upper_bound: upper_bound, - events_ingested, - ingestion_size, - storage_size, - }; - manifests.push(new_snapshot_entry); - meta.snapshot.manifest_list = manifests; - let stats = get_current_stats(stream_name, "json"); - if let Some(stats) = stats { - meta.stats = stats; - } - meta.first_event_at = first_event_at; - storage.put_stream_manifest(stream_name, &meta).await?; - } - - Ok(()) -} - -pub async fn remove_manifest_from_snapshot( - storage: Arc, - stream_name: &str, - dates: Vec, -) -> Result, ObjectStorageError> { - if !dates.is_empty() { - // get current snapshot - let mut meta = storage.get_object_store_format(stream_name).await?; - let meta_for_stats = meta.clone(); - update_deleted_stats(storage.clone(), stream_name, meta_for_stats, dates.clone()).await?; - let manifests = &mut meta.snapshot.manifest_list; - // Filter out items whose manifest_path contains any of the dates_to_delete - manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); - STREAM_INFO.reset_first_event_at(stream_name)?; - meta.first_event_at = None; - storage.put_snapshot(stream_name, meta.snapshot).await?; - } - match CONFIG.options.mode { - Mode::All | Mode::Ingest => { - Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?) - } - Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?), - } -} - -pub async fn get_first_event( - storage: Arc, - stream_name: &str, - dates: Vec, -) -> Result, ObjectStorageError> { - let mut first_event_at: String = String::default(); - match CONFIG.options.mode { - Mode::All | Mode::Ingest => { - // get current snapshot - let stream_first_event = STREAM_INFO.get_first_event(stream_name)?; - if stream_first_event.is_some() { - first_event_at = stream_first_event.unwrap(); - } else { - let mut meta = storage.get_object_store_format(stream_name).await?; - let meta_clone = meta.clone(); - let manifests = meta_clone.snapshot.manifest_list; - let time_partition = meta_clone.time_partition; - if manifests.is_empty() { - info!("No manifest found for stream {stream_name}"); - return Err(ObjectStorageError::Custom("No manifest found".to_string())); - } - let manifest = &manifests[0]; - let path = partition_path( - stream_name, - manifest.time_lower_bound, - manifest.time_upper_bound, - ); - let Some(manifest) = storage.get_manifest(&path).await? else { - return Err(ObjectStorageError::UnhandledError( - "Manifest found in snapshot but not in object-storage" - .to_string() - .into(), - )); - }; - if let Some(first_event) = manifest.files.first() { - let lower_bound = match time_partition { - Some(time_partition) => { - let (lower_bound, _) = get_file_bounds(first_event, time_partition); - lower_bound - } - None => { - let (lower_bound, _) = - get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string()); - lower_bound - } - }; - first_event_at = lower_bound.with_timezone(&Local).to_rfc3339(); - meta.first_event_at = Some(first_event_at.clone()); - storage.put_stream_manifest(stream_name, &meta).await?; - STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?; - } - } - } - Mode::Query => { - let ingestor_metadata = - handlers::http::cluster::get_ingestor_info() - .await - .map_err(|err| { - error!("Fatal: failed to get ingestor info: {:?}", err); - ObjectStorageError::from(err) - })?; - let mut ingestors_first_event_at: Vec = Vec::new(); - for ingestor in ingestor_metadata { - let url = format!( - "{}{}/logstream/{}/retention/cleanup", - ingestor.domain_name, - base_path_without_preceding_slash(), - stream_name - ); - let ingestor_first_event_at = - handlers::http::cluster::send_retention_cleanup_request( - &url, - ingestor.clone(), - &dates, - ) - .await?; - if !ingestor_first_event_at.is_empty() { - ingestors_first_event_at.push(ingestor_first_event_at); - } - } - if ingestors_first_event_at.is_empty() { - return Ok(None); - } - first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string(); - } - } - - Ok(Some(first_event_at)) -} - /// Partition the path to which this manifest belongs. /// Useful when uploading the manifest file. pub fn partition_path( diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 91bdf288c..a1954c290 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -38,7 +38,7 @@ use actix_web::http::header::{self, HeaderMap}; use actix_web::web::Path; use actix_web::Responder; use bytes::Bytes; -use chrono::Utc; +use chrono::{DateTime, Utc}; use http::{header as http_header, StatusCode}; use itertools::Itertools; use relative_path::RelativePathBuf; @@ -534,11 +534,11 @@ pub async fn send_retention_cleanup_request( url: &str, ingestor: IngestorMetadata, dates: &Vec, -) -> Result { - let mut first_event_at: String = String::default(); +) -> Result>, ObjectStorageError> { if !utils::check_liveness(&ingestor.domain_name).await { - return Ok(first_event_at); + return Ok(None); } + let resp = HTTP_CLIENT .post(url) .header(header::CONTENT_TYPE, "application/json") @@ -565,13 +565,13 @@ pub async fn send_retention_cleanup_request( ); } - let resp_data = resp.bytes().await.map_err(|err| { + let first_event_at: String = resp.json().await.map_err(|err| { error!("Fatal: failed to parse response to bytes: {:?}", err); ObjectStorageError::Custom(err.to_string()) })?; + let first_event_at = DateTime::parse_from_rfc3339(&first_event_at)?.to_utc(); - first_event_at = String::from_utf8_lossy(&resp_data).to_string(); - Ok(first_event_at) + Ok(Some(first_event_at)) } pub async fn get_cluster_info() -> Result { diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index a4c25eb45..105a3e536 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -21,7 +21,7 @@ use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, St use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}; use super::ingest::create_stream_if_not_exists; use super::modal::utils::logstream_utils::{ - create_stream_and_schema_from_storage, create_update_stream, update_first_event_at, + create_stream_and_schema_from_storage, create_update_stream, }; use super::query::update_schema_when_distributed; use crate::alerts::Alerts; @@ -56,7 +56,7 @@ use std::fs; use std::num::NonZeroU32; use std::str::FromStr; use std::sync::Arc; -use tracing::warn; +use tracing::{error, warn}; pub async fn delete(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); @@ -552,16 +552,33 @@ pub async fn get_stream_info(stream_name: Path) -> Result = res.unwrap_or_default(); + let first_event_at = storage + .remove_manifest_from_snapshot(&stream_name, date_list) + .await + .ok() + .flatten() + .map(|t| t.to_rfc3339()); Ok((first_event_at, StatusCode::OK)) } diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index ab7726488..cdc338ad8 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -36,7 +36,6 @@ use crate::{ storage::{LogStream, ObjectStoreFormat, StreamType}, validator, }; -use tracing::error; pub async fn create_update_stream( headers: &HeaderMap, @@ -509,56 +508,3 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< Ok(true) } - -/// Updates the first-event-at in storage and logstream metadata for the specified stream. -/// -/// This function updates the `first-event-at` in both the object store and the stream info metadata. -/// If either update fails, an error is logged, but the function will still return the `first-event-at`. -/// -/// # Arguments -/// -/// * `stream_name` - The name of the stream to update. -/// * `first_event_at` - The value of first-event-at. -/// -/// # Returns -/// -/// * `Option` - Returns `Some(String)` with the provided timestamp if the update is successful, -/// or `None` if an error occurs. -/// -/// # Errors -/// -/// This function logs an error if: -/// * The `first-event-at` cannot be updated in the object store. -/// * The `first-event-at` cannot be updated in the stream info. -/// -/// # Examples -///```ignore -/// ```rust -/// use parseable::handlers::http::modal::utils::logstream_utils::update_first_event_at; -/// let result = update_first_event_at("my_stream", "2023-01-01T00:00:00Z").await; -/// match result { -/// Some(timestamp) => println!("first-event-at: {}", timestamp), -/// None => eprintln!("Failed to update first-event-at"), -/// } -/// ``` -pub async fn update_first_event_at(stream_name: &str, first_event_at: &str) -> Option { - let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage - .update_first_event_in_stream(stream_name, first_event_at) - .await - { - error!( - "Failed to update first_event_at in storage for stream {:?}: {err:?}", - stream_name - ); - } - - if let Err(err) = metadata::STREAM_INFO.set_first_event_at(stream_name, first_event_at) { - error!( - "Failed to update first_event_at in stream info for stream {:?}: {err:?}", - stream_name - ); - } - - Some(first_event_at.to_string()) -} diff --git a/src/metadata.rs b/src/metadata.rs index 182bc610a..ce716afb3 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -18,7 +18,7 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; -use chrono::{Local, NaiveDateTime}; +use chrono::{DateTime, Local, NaiveDateTime, Utc}; use itertools::Itertools; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; @@ -69,7 +69,7 @@ pub struct LogStreamMetadata { pub alerts: Alerts, pub retention: Option, pub created_at: String, - pub first_event_at: Option, + pub first_event_at: Option>, pub time_partition: Option, pub time_partition_limit: Option, pub custom_partition: Option, @@ -115,11 +115,14 @@ impl StreamInfo { Ok(!self.schema(stream_name)?.fields.is_empty()) } - pub fn get_first_event(&self, stream_name: &str) -> Result, MetadataError> { + pub fn get_first_event( + &self, + stream_name: &str, + ) -> Result>, MetadataError> { let map = self.read().expect(LOCK_EXPECT); map.get(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| metadata.first_event_at.clone()) + .map(|metadata| metadata.first_event_at) } pub fn get_time_partition(&self, stream_name: &str) -> Result, MetadataError> { @@ -212,7 +215,7 @@ impl StreamInfo { pub fn set_first_event_at( &self, stream_name: &str, - first_event_at: &str, + first_event_at: DateTime, ) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); map.get_mut(stream_name) @@ -474,6 +477,12 @@ pub async fn load_stream_metadata_on_server_start( } else { ObjectStoreFormat::default() }; + let first_event_at = if let Some(time) = first_event_at { + let parsed = DateTime::parse_from_rfc3339(&time)?.to_utc(); + Some(parsed) + } else { + None + }; let schema = update_data_type_time_partition(storage, stream_name, schema, time_partition.as_ref()) .await?; @@ -558,6 +567,8 @@ pub mod error { ObjectStorage(#[from] ObjectStorageError), #[error(" Error: {0}")] Anyhow(#[from] anyhow::Error), + #[error("Error reading datetime: {0}")] + Chrono(#[from] chrono::ParseError), } } } diff --git a/src/stats.rs b/src/stats.rs index 0016dbdbe..dc11240dc 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,5 +1,3 @@ -use tracing::warn; - /* * Parseable Server (C) 2022 - 2024 Parseable, Inc. * @@ -23,8 +21,6 @@ use crate::metrics::{ EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE, }; -use crate::storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat}; -use std::sync::Arc; /// Helper struct type created by copying stats values from metadata #[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)] @@ -101,67 +97,6 @@ pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option, - stream_name: &str, - meta: ObjectStoreFormat, - dates: Vec, -) -> Result<(), ObjectStorageError> { - let mut num_row: i64 = 0; - let mut storage_size: i64 = 0; - let mut ingestion_size: i64 = 0; - - let mut manifests = meta.snapshot.manifest_list; - manifests.retain(|item| dates.iter().any(|date| item.manifest_path.contains(date))); - if !manifests.is_empty() { - for manifest in manifests { - let manifest_date = manifest.time_lower_bound.date_naive().to_string(); - let _ = - EVENTS_INGESTED_DATE.remove_label_values(&[stream_name, "json", &manifest_date]); - let _ = EVENTS_INGESTED_SIZE_DATE.remove_label_values(&[ - stream_name, - "json", - &manifest_date, - ]); - let _ = EVENTS_STORAGE_SIZE_DATE.remove_label_values(&[ - "data", - stream_name, - "parquet", - &manifest_date, - ]); - num_row += manifest.events_ingested as i64; - ingestion_size += manifest.ingestion_size as i64; - storage_size += manifest.storage_size as i64; - } - } - EVENTS_DELETED - .with_label_values(&[stream_name, "json"]) - .add(num_row); - EVENTS_DELETED_SIZE - .with_label_values(&[stream_name, "json"]) - .add(ingestion_size); - DELETED_EVENTS_STORAGE_SIZE - .with_label_values(&["data", stream_name, "parquet"]) - .add(storage_size); - EVENTS_INGESTED - .with_label_values(&[stream_name, "json"]) - .sub(num_row); - EVENTS_INGESTED_SIZE - .with_label_values(&[stream_name, "json"]) - .sub(ingestion_size); - STORAGE_SIZE - .with_label_values(&["data", stream_name, "parquet"]) - .sub(storage_size); - let stats = get_current_stats(stream_name, "json"); - if let Some(stats) = stats { - if let Err(e) = storage.put_stats(stream_name, &stats).await { - warn!("Error updating stats to objectstore due to error [{}]", e); - } - } - - Ok(()) -} - pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Result<()> { let event_labels = event_labels(stream_name, format); let storage_size_labels = storage_size_labels(stream_name); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index f86b55757..b543c5a10 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -258,4 +258,6 @@ pub enum ObjectStorageError { PathError(relative_path::FromPathError), #[error("Error: {0}")] MetadataError(#[from] MetadataError), + #[error("Error parsing datetime: {0}")] + Chrono(#[from] chrono::ParseError), } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 50cef439c..b67b08e56 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -25,12 +25,22 @@ use super::{ PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; +use crate::catalog::{manifest, partition_path, snapshot}; use crate::event::format::LogSource; +use crate::event::DEFAULT_TIMESTAMP_KEY; +use crate::handlers; +use crate::handlers::http::base_path_without_preceding_slash; +use crate::handlers::http::cluster::get_ingestor_info; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::{CORRELATION_DIR, DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metadata::SchemaVersion; -use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; +use crate::metrics::{ + DELETED_EVENTS_STORAGE_SIZE, EVENTS_DELETED, EVENTS_DELETED_SIZE, EVENTS_INGESTED, + EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE, + EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, +}; use crate::option::Mode; +use crate::stats::{event_labels_date, get_current_stats, storage_size_labels_date}; use crate::{ alerts::Alerts, catalog::{self, manifest::Manifest, snapshot::Snapshot}, @@ -44,15 +54,16 @@ use actix_web_prometheus::PrometheusMetrics; use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; -use chrono::{DateTime, Local, Utc}; +use chrono::{DateTime, Local, NaiveTime, Utc}; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; -use tracing::error; +use tracing::{error, info, warn}; use std::collections::BTreeMap; use std::fmt::Debug; +use std::io::ErrorKind; use std::num::NonZeroU32; use std::{ collections::HashMap, @@ -242,10 +253,10 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { async fn update_first_event_in_stream( &self, stream_name: &str, - first_event: &str, + first_event: DateTime, ) -> Result<(), ObjectStorageError> { let mut format = self.get_object_store_format(stream_name).await?; - format.first_event_at = Some(first_event.to_string()); + format.first_event_at = Some(first_event.to_rfc3339()); let format_json = to_bytes(&format); self.put_object(&stream_json_path(stream_name), format_json) .await?; @@ -650,7 +661,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let store = CONFIG.storage().get_object_store(); let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); - catalog::update_snapshot(store, stream, manifest).await?; + store.update_snapshot(stream, manifest).await?; let _ = fs::remove_file(file); } @@ -711,7 +722,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { async fn get_first_event_from_storage( &self, stream_name: &str, - ) -> Result, ObjectStorageError> { + ) -> Result>, ObjectStorageError> { let mut all_first_events = vec![]; let stream_metas = self.get_stream_meta_from_storage(stream_name).await; if let Ok(stream_metas) = stream_metas { @@ -727,12 +738,386 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { if all_first_events.is_empty() { return Ok(None); } - let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339(); - Ok(Some(first_event_at)) + let first_event_at = all_first_events.into_iter().min(); + Ok(first_event_at) } // pick a better name fn get_bucket_name(&self) -> String; + + async fn get_first_event( + &self, + stream_name: &str, + dates: Vec, + ) -> Result>, ObjectStorageError> { + let mut first_event_at: Option> = None; + match CONFIG.options.mode { + Mode::All | Mode::Ingest => { + // get current snapshot + let stream_first_event = STREAM_INFO.get_first_event(stream_name)?; + if let Some(stream_first_event) = stream_first_event { + first_event_at = Some(stream_first_event); + } else { + let mut meta = self.get_object_store_format(stream_name).await?; + let meta_clone = meta.clone(); + let manifests = meta_clone.snapshot.manifest_list; + let time_partition = meta_clone.time_partition; + if manifests.is_empty() { + info!("No manifest found for stream {stream_name}"); + return Err(ObjectStorageError::Custom("No manifest found".to_string())); + } + let manifest = &manifests[0]; + let path = partition_path( + stream_name, + manifest.time_lower_bound, + manifest.time_upper_bound, + ); + let Some(manifest) = self.get_manifest(&path).await? else { + return Err(ObjectStorageError::UnhandledError( + "Manifest found in snapshot but not in object-storage" + .to_string() + .into(), + )); + }; + if let Some(first_event) = manifest.files.first() { + let (lower_bound, _) = first_event.get_file_bounds( + time_partition + .as_ref() + .map_or(DEFAULT_TIMESTAMP_KEY, |t| t.as_str()), + ); + meta.first_event_at = Some(lower_bound.to_rfc3339()); + self.put_stream_manifest(stream_name, &meta).await?; + STREAM_INFO.set_first_event_at(stream_name, lower_bound)?; + } + } + } + Mode::Query => { + let ingestor_metadata = get_ingestor_info().await.map_err(|err| { + error!("Fatal: failed to get ingestor info: {:?}", err); + ObjectStorageError::from(err) + })?; + let mut ingestors_first_event_at: Vec> = Vec::new(); + for ingestor in ingestor_metadata { + let url = format!( + "{}{}/logstream/{}/retention/cleanup", + ingestor.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + let ingestor_first_event_at = + handlers::http::cluster::send_retention_cleanup_request( + &url, + ingestor.clone(), + &dates, + ) + .await?; + if let Some(ingestor_first_event_at) = ingestor_first_event_at { + ingestors_first_event_at.push(ingestor_first_event_at); + } + } + if ingestors_first_event_at.is_empty() { + return Ok(None); + } + first_event_at = ingestors_first_event_at.into_iter().min(); + } + } + + Ok(first_event_at) + } + + async fn update_snapshot( + &self, + stream_name: &str, + change: manifest::File, + ) -> Result<(), ObjectStorageError> { + let mut meta = self.get_object_store_format(stream_name).await?; + let manifests = &mut meta.snapshot.manifest_list; + let (lower_bound, _) = change.get_file_bounds( + meta.time_partition + .as_ref() + .map_or(DEFAULT_TIMESTAMP_KEY, |t| t.as_str()), + ); + let date = lower_bound.date_naive().format("%Y-%m-%d").to_string(); + let event_labels = event_labels_date(stream_name, "json", &date); + let storage_size_labels = storage_size_labels_date(stream_name, &date); + let events_ingested = EVENTS_INGESTED_DATE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let ingestion_size = EVENTS_INGESTED_SIZE_DATE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let storage_size = EVENTS_STORAGE_SIZE_DATE + .get_metric_with_label_values(&storage_size_labels) + .unwrap() + .get() as u64; + let pos = manifests.iter().position(|item| { + item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound + }); + + // if the mode in I.S. manifest needs to be created but it is not getting created because + // there is already a pos, to index into stream.json + + // We update the manifest referenced by this position + // This updates an existing file so there is no need to create a snapshot entry. + if let Some(pos) = pos { + let info = &mut manifests[pos]; + let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); + + let mut ch = false; + for m in manifests.iter_mut() { + let p = manifest_path("").to_string(); + if m.manifest_path.contains(&p) { + let date = m + .time_lower_bound + .date_naive() + .format("%Y-%m-%d") + .to_string(); + let event_labels = event_labels_date(stream_name, "json", &date); + let storage_size_labels = storage_size_labels_date(stream_name, &date); + let events_ingested = EVENTS_INGESTED_DATE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let ingestion_size = EVENTS_INGESTED_SIZE_DATE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let storage_size = EVENTS_STORAGE_SIZE_DATE + .get_metric_with_label_values(&storage_size_labels) + .unwrap() + .get() as u64; + ch = true; + m.events_ingested = events_ingested; + m.ingestion_size = ingestion_size; + m.storage_size = storage_size; + } + } + + if ch { + if let Some(mut manifest) = self.get_manifest(&path).await? { + manifest.apply_change(change); + self.put_manifest(&path, manifest).await?; + let stats = get_current_stats(stream_name, "json"); + if let Some(stats) = stats { + meta.stats = stats; + } + meta.snapshot.manifest_list = manifests.to_vec(); + + self.put_stream_manifest(stream_name, &meta).await?; + } else { + //instead of returning an error, create a new manifest (otherwise local to storage sync fails) + //but don't update the snapshot + self.create_manifest( + lower_bound, + change, + stream_name, + false, + meta, + events_ingested, + ingestion_size, + storage_size, + ) + .await?; + } + } else { + self.create_manifest( + lower_bound, + change, + stream_name, + true, + meta, + events_ingested, + ingestion_size, + storage_size, + ) + .await?; + } + } else { + self.create_manifest( + lower_bound, + change, + stream_name, + true, + meta, + events_ingested, + ingestion_size, + storage_size, + ) + .await?; + } + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn create_manifest( + &self, + lower_bound: DateTime, + change: manifest::File, + stream_name: &str, + update_snapshot: bool, + mut meta: ObjectStoreFormat, + events_ingested: u64, + ingestion_size: u64, + storage_size: u64, + ) -> Result<(), ObjectStorageError> { + let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); + let upper_bound = lower_bound + .date_naive() + .and_time( + NaiveTime::from_num_seconds_from_midnight_opt( + 23 * 3600 + 59 * 60 + 59, + 999_999_999, + ) + .ok_or(std::io::Error::new( + ErrorKind::Other, + "Failed to create upper bound for manifest", + ))?, + ) + .and_utc(); + + let manifest = Manifest { + files: vec![change], + ..Manifest::default() + }; + let mut first_event_at = STREAM_INFO.get_first_event(stream_name)?; + if first_event_at.is_none() { + if let Some(first_event) = manifest.files.first() { + let time_partition = &meta.time_partition; + let (lower_bound, _) = first_event.get_file_bounds( + time_partition + .as_ref() + .map_or(DEFAULT_TIMESTAMP_KEY, |t| t.as_str()), + ); + first_event_at = Some(lower_bound); + if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, lower_bound) { + error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); + } + } + } + + let mainfest_file_name = manifest_path("").to_string(); + let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); + self.put_object(&path, serde_json::to_vec(&manifest)?.into()) + .await?; + if update_snapshot { + let mut manifests = meta.snapshot.manifest_list; + let path = self.absolute_url(&path); + let new_snapshot_entry = snapshot::ManifestItem { + manifest_path: path.to_string(), + time_lower_bound: lower_bound, + time_upper_bound: upper_bound, + events_ingested, + ingestion_size, + storage_size, + }; + manifests.push(new_snapshot_entry); + meta.snapshot.manifest_list = manifests; + let stats = get_current_stats(stream_name, "json"); + if let Some(stats) = stats { + meta.stats = stats; + } + meta.first_event_at = first_event_at.map(|t| t.to_rfc3339()); + self.put_stream_manifest(stream_name, &meta).await?; + } + + Ok(()) + } + + async fn remove_manifest_from_snapshot( + &self, + stream_name: &str, + dates: Vec, + ) -> Result>, ObjectStorageError> { + if !dates.is_empty() { + // get current snapshot + let mut meta = self.get_object_store_format(stream_name).await?; + let meta_for_stats = meta.clone(); + self.update_deleted_stats(stream_name, meta_for_stats, dates.clone()) + .await?; + let manifests = &mut meta.snapshot.manifest_list; + // Filter out items whose manifest_path contains any of the dates_to_delete + manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); + STREAM_INFO.reset_first_event_at(stream_name)?; + meta.first_event_at = None; + self.put_snapshot(stream_name, meta.snapshot).await?; + } + let dates = match CONFIG.options.mode { + Mode::All | Mode::Ingest => Vec::new(), + Mode::Query => dates, + }; + + self.get_first_event(stream_name, dates).await + } + + async fn update_deleted_stats( + &self, + stream_name: &str, + meta: ObjectStoreFormat, + dates: Vec, + ) -> Result<(), ObjectStorageError> { + let mut num_row: i64 = 0; + let mut storage_size: i64 = 0; + let mut ingestion_size: i64 = 0; + + let mut manifests = meta.snapshot.manifest_list; + manifests.retain(|item| dates.iter().any(|date| item.manifest_path.contains(date))); + if !manifests.is_empty() { + for manifest in manifests { + let manifest_date = manifest.time_lower_bound.date_naive().to_string(); + let _ = EVENTS_INGESTED_DATE.remove_label_values(&[ + stream_name, + "json", + &manifest_date, + ]); + let _ = EVENTS_INGESTED_SIZE_DATE.remove_label_values(&[ + stream_name, + "json", + &manifest_date, + ]); + let _ = EVENTS_STORAGE_SIZE_DATE.remove_label_values(&[ + "data", + stream_name, + "parquet", + &manifest_date, + ]); + num_row += manifest.events_ingested as i64; + ingestion_size += manifest.ingestion_size as i64; + storage_size += manifest.storage_size as i64; + } + } + EVENTS_DELETED + .with_label_values(&[stream_name, "json"]) + .add(num_row); + EVENTS_DELETED_SIZE + .with_label_values(&[stream_name, "json"]) + .add(ingestion_size); + DELETED_EVENTS_STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .add(storage_size); + EVENTS_INGESTED + .with_label_values(&[stream_name, "json"]) + .sub(num_row); + EVENTS_INGESTED_SIZE + .with_label_values(&[stream_name, "json"]) + .sub(ingestion_size); + STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .sub(storage_size); + let stats = get_current_stats(stream_name, "json"); + if let Some(stats) = stats { + if let Err(e) = self.put_stats(stream_name, &stats).await { + warn!("Error updating stats to objectstore due to error [{}]", e); + } + } + + Ok(()) + } } pub async fn commit_schema_to_storage( diff --git a/src/storage/retention.rs b/src/storage/retention.rs index 2cbee48c7..f16f20cef 100644 --- a/src/storage/retention.rs +++ b/src/storage/retention.rs @@ -171,7 +171,6 @@ impl From for Vec { } mod action { - use crate::catalog::remove_manifest_from_snapshot; use crate::{metadata, option::CONFIG}; use chrono::{Days, NaiveDate, Utc}; use futures::{stream::FuturesUnordered, StreamExt}; @@ -197,7 +196,7 @@ mod action { if !dates.is_empty() { let delete_tasks = FuturesUnordered::new(); let res_remove_manifest = - remove_manifest_from_snapshot(store.clone(), &stream_name, dates.clone()).await; + store.remove_manifest_from_snapshot( &stream_name, dates.clone()).await; for date in dates_to_delete { let path = RelativePathBuf::from_iter([&stream_name, &date]); @@ -220,7 +219,7 @@ mod action { } if let Ok(Some(first_event_at)) = res_remove_manifest { if let Err(err) = - metadata::STREAM_INFO.set_first_event_at(&stream_name, &first_event_at) + metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at) { error!( "Failed to update first_event_at in streaminfo for stream {:?} {err:?}",