diff --git a/Cargo.lock b/Cargo.lock index 3c7eee4..b2d2e64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "aho-corasick" diff --git a/example.yaml b/example.yaml index c688120..a07d708 100644 --- a/example.yaml +++ b/example.yaml @@ -37,10 +37,10 @@ middlewares: # # aggregate_gauges: true - # Flush the aggregate buffer every `flush_interval` seconds. + # Flush the aggregate buffer every `flush_interval` milliseconds. # Defaults to 1 second. # - # flush_interval: 1 + # flush_interval: 1000 # Normally the times at which metrics are flushed are approximately aligned # with a multiple of `flush_interval`. For example, a `flush_interval` of 1 diff --git a/src/config.rs b/src/config.rs index d3a9f04..975d72f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,3 +1,9 @@ +use std::fmt::Formatter; +use std::time::Duration; +#[cfg(feature = "cli")] +use serde::de::Visitor; +#[cfg(feature = "cli")] +use serde::{Deserializer}; #[cfg(feature = "cli")] use {anyhow::Error, serde::Deserialize, std::fs::File}; @@ -79,8 +85,8 @@ fn default_true() -> bool { } #[cfg(feature = "cli")] -fn default_flush_interval() -> u64 { - 1 +fn default_flush_interval() -> Duration { + Duration::from_secs(1) } #[cfg(feature = "cli")] @@ -95,8 +101,8 @@ pub struct AggregateMetricsConfig { pub aggregate_counters: bool, #[cfg_attr(feature = "cli", serde(default = "default_true"))] pub aggregate_gauges: bool, - #[cfg_attr(feature = "cli", serde(default = "default_flush_interval"))] - pub flush_interval: u64, + #[cfg_attr(feature = "cli", serde(default = "default_flush_interval", deserialize_with="deserialize_duration"))] + pub flush_interval: Duration, #[cfg_attr(feature = "cli", serde(default = "default_flush_offset"))] pub flush_offset: i64, #[cfg_attr(feature = "cli", serde(default))] @@ -109,11 +115,58 @@ pub struct SampleConfig { pub sample_rate: f64, } +/// Deserializes a number or a time-string into a Duration struct. +/// Numbers without unit suffixes will be treated as seconds while suffixes will be +/// parsed using https://crates.io/crates/humantime +#[cfg(feature = "cli")] +fn deserialize_duration<'de, D>(deserializer: D) -> Result where D:Deserializer<'de> { + struct FlushIntervalVisitor; + + impl Visitor<'_> for FlushIntervalVisitor { + type Value = Duration; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("a non negative number") + } + + fn visit_u64(self, v: u64) -> Result + where + E: serde::de::Error, + { + Ok(Duration::from_millis(v)) + } + } + + deserializer.deserialize_any(FlushIntervalVisitor) +} + #[cfg(test)] #[cfg(feature = "cli")] mod tests { use super::*; + #[test] + fn flush_duration_milliseconds() { + let yaml = r#" + middlewares: + - type: aggregate-metrics + flush_interval: 125 + "#; + let config = serde_yaml::from_str::(yaml).unwrap(); + assert!(matches!(&config.middlewares[0], MiddlewareConfig::AggregateMetrics(c) if c.flush_interval == Duration::from_millis(125))); + } + + #[test] + fn flush_duration_negative_number() { + let yaml = r#" + middleware: + - type: aggregate-metrics + flush_interval: -1000 + "#; + let config = serde_yaml::from_str::(yaml); + assert!(config.is_err()); + } + #[test] fn config() { let config = Config::new("example.yaml").unwrap(); @@ -152,7 +205,7 @@ mod tests { AggregateMetricsConfig { aggregate_counters: true, aggregate_gauges: true, - flush_interval: 1, + flush_interval: 1s, flush_offset: 0, max_map_size: None, }, diff --git a/src/middleware/aggregate.rs b/src/middleware/aggregate.rs index 3736285..cd7598b 100644 --- a/src/middleware/aggregate.rs +++ b/src/middleware/aggregate.rs @@ -1,10 +1,7 @@ #[cfg(test)] use std::sync::Mutex; -use std::{ - collections::HashMap, - time::{SystemTime, UNIX_EPOCH}, -}; +use std::{collections::HashMap, time::{SystemTime, UNIX_EPOCH}}; use std::{fmt, str}; use crate::{config::AggregateMetricsConfig, middleware::Middleware, types::Metric}; @@ -142,16 +139,16 @@ where SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() - .as_secs() + .as_millis() as u64 }); let rounded_bucket = - i64::try_from((now / self.config.flush_interval) * self.config.flush_interval) + i64::try_from((now / self.config.flush_interval.as_millis() as u64) * self.config.flush_interval.as_millis() as u64) .expect("overflow when calculating with flush_interval"); let rounded_bucket = u64::try_from(rounded_bucket + self.config.flush_offset) .expect("overflow when calculating with flush_interval"); - if self.last_flushed_at + self.config.flush_interval <= rounded_bucket { + if self.last_flushed_at + self.config.flush_interval.as_millis() as u64 <= rounded_bucket { self.flush_metrics(); self.last_flushed_at = rounded_bucket; } @@ -173,7 +170,7 @@ where #[cfg(test)] mod tests { use std::cell::RefCell; - + use std::time::Duration; use super::*; use crate::testutils::FnStep; @@ -183,7 +180,52 @@ mod tests { let config = AggregateMetricsConfig { aggregate_counters: true, aggregate_gauges: true, - flush_interval: 10, + flush_interval: Duration::from_millis(100), + flush_offset: 0, + max_map_size: None, + }; + let results = RefCell::new(vec![]); + let next = FnStep(|metric: &mut Metric| { + results.borrow_mut().push(metric.clone()); + }); + let mut aggregator = AggregateMetrics::new(config, next); + + *CURRENT_TIME.lock().unwrap() = Some(0); + + aggregator.poll(); + + aggregator.submit(&mut Metric::new( + b"users.online:1|c|@0.5|#country:china".to_vec(), + )); + + *CURRENT_TIME.lock().unwrap() = Some(10); + + aggregator.poll(); + + aggregator.submit(&mut Metric::new( + b"users.online:1|c|@0.5|#country:china".to_vec(), + )); + + assert_eq!(results.borrow_mut().len(), 0); + + *CURRENT_TIME.lock().unwrap() = Some(110); + + aggregator.poll(); + + assert_eq!( + results.borrow_mut().as_slice(), + &[Metric::new( + b"users.online:2|c|@0.5|#country:china".to_vec() + )] + ); + } + + #[test] + fn counter_seconds() { + let config = AggregateMetricsConfig { + aggregate_counters: true, + aggregate_gauges: true, + flush_interval: Duration::from_secs(1), flush_offset: 0, max_map_size: None, }; @@ -201,7 +243,7 @@ mod tests { b"users.online:1|c|@0.5|#country:china".to_vec(), )); - *CURRENT_TIME.lock().unwrap() = Some(1); + *CURRENT_TIME.lock().unwrap() = Some(101); aggregator.poll(); @@ -211,7 +253,7 @@ mod tests { assert_eq!(results.borrow_mut().len(), 0); - *CURRENT_TIME.lock().unwrap() = Some(11); + *CURRENT_TIME.lock().unwrap() = Some(1001); aggregator.poll(); @@ -228,7 +270,7 @@ mod tests { let config = AggregateMetricsConfig { aggregate_counters: true, aggregate_gauges: true, - flush_interval: 10, + flush_interval: Duration::from_millis(100), flush_offset: 0, max_map_size: None, }; @@ -246,7 +288,7 @@ mod tests { b"users.online:3|g|@0.5|#country:china".to_vec(), )); - *CURRENT_TIME.lock().unwrap() = Some(1); + *CURRENT_TIME.lock().unwrap() = Some(10); aggregator.poll(); @@ -256,7 +298,7 @@ mod tests { assert_eq!(results.borrow_mut().len(), 0); - *CURRENT_TIME.lock().unwrap() = Some(11); + *CURRENT_TIME.lock().unwrap() = Some(110); aggregator.poll(); diff --git a/src/types.rs b/src/types.rs index fe260a8..91f33a6 100644 --- a/src/types.rs +++ b/src/types.rs @@ -39,7 +39,7 @@ pub struct MetricTag<'a> { pub name_value_sep_pos: Option, } -impl<'a> MetricTag<'a> { +impl MetricTag<'_> { pub fn new(bytes: &[u8]) -> MetricTag { MetricTag { raw: bytes, @@ -58,7 +58,7 @@ impl<'a> MetricTag<'a> { } } -impl<'a> fmt::Debug for MetricTag<'a> { +impl fmt::Debug for MetricTag<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if self.name_value_sep_pos.is_none() { f.debug_struct("MetricTag") @@ -85,7 +85,7 @@ impl<'a> Iterator for MetricTagIterator<'a> { let mut tag_pos_iter = remaining_tags.iter(); let next_tag_sep_pos = tag_pos_iter.position(|&b| b == b','); - return if let Some(tag_sep_pos) = next_tag_sep_pos { + if let Some(tag_sep_pos) = next_tag_sep_pos { // Got a tag and more tags remain let tag = MetricTag::new(&remaining_tags[..tag_sep_pos]); self.remaining_tags = Some(&remaining_tags[tag_sep_pos + 1..]); @@ -96,7 +96,7 @@ impl<'a> Iterator for MetricTagIterator<'a> { let tag = MetricTag::new(remaining_tags); self.remaining_tags = None; Some(tag) - }; + } } }