From ec38ca8c673e9150910a7e6c66a9eac180ae289a Mon Sep 17 00:00:00 2001 From: Martin Linzmayer Date: Mon, 20 Jan 2025 11:27:36 +0100 Subject: [PATCH 1/8] ref: use Duration for flush_interval --- src/config.rs | 9 ++--- src/middleware/aggregate.rs | 71 +++++++++++++++++++++++++++++-------- 2 files changed, 62 insertions(+), 18 deletions(-) diff --git a/src/config.rs b/src/config.rs index d3a9f04..6a15944 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,3 +1,4 @@ +use std::time::Duration; #[cfg(feature = "cli")] use {anyhow::Error, serde::Deserialize, std::fs::File}; @@ -79,8 +80,8 @@ fn default_true() -> bool { } #[cfg(feature = "cli")] -fn default_flush_interval() -> u64 { - 1 +fn default_flush_interval() -> Duration { + Duration::from_secs(1) } #[cfg(feature = "cli")] @@ -96,7 +97,7 @@ pub struct AggregateMetricsConfig { #[cfg_attr(feature = "cli", serde(default = "default_true"))] pub aggregate_gauges: bool, #[cfg_attr(feature = "cli", serde(default = "default_flush_interval"))] - pub flush_interval: u64, + pub flush_interval: Duration, #[cfg_attr(feature = "cli", serde(default = "default_flush_offset"))] pub flush_offset: i64, #[cfg_attr(feature = "cli", serde(default))] @@ -152,7 +153,7 @@ mod tests { AggregateMetricsConfig { aggregate_counters: true, aggregate_gauges: true, - flush_interval: 1, + flush_interval: 1s, flush_offset: 0, max_map_size: None, }, diff --git a/src/middleware/aggregate.rs b/src/middleware/aggregate.rs index 3736285..f649c6c 100644 --- a/src/middleware/aggregate.rs +++ b/src/middleware/aggregate.rs @@ -1,14 +1,12 @@ #[cfg(test)] use std::sync::Mutex; -use std::{ - collections::HashMap, - time::{SystemTime, UNIX_EPOCH}, -}; +use std::{collections::HashMap, i64, time::{SystemTime, UNIX_EPOCH}}; use std::{fmt, str}; use crate::{config::AggregateMetricsConfig, middleware::Middleware, types::Metric}; + #[derive(Hash, Eq, PartialEq)] struct BucketKey { // contains the raw metric bytes with the value stripped out @@ -142,16 +140,16 @@ where SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() - .as_secs() + .as_millis() as u64 }); let rounded_bucket = - i64::try_from((now / self.config.flush_interval) * self.config.flush_interval) + i64::try_from((now / self.config.flush_interval.as_millis() as u64) * self.config.flush_interval.as_millis() as u64) .expect("overflow when calculating with flush_interval"); let rounded_bucket = u64::try_from(rounded_bucket + self.config.flush_offset) .expect("overflow when calculating with flush_interval"); - if self.last_flushed_at + self.config.flush_interval <= rounded_bucket { + if self.last_flushed_at + self.config.flush_interval.as_millis() as u64 <= rounded_bucket { self.flush_metrics(); self.last_flushed_at = rounded_bucket; } @@ -173,7 +171,7 @@ where #[cfg(test)] mod tests { use std::cell::RefCell; - + use std::time::Duration; use super::*; use crate::testutils::FnStep; @@ -183,7 +181,52 @@ mod tests { let config = AggregateMetricsConfig { aggregate_counters: true, aggregate_gauges: true, - flush_interval: 10, + flush_interval: Duration::from_millis(100), + flush_offset: 0, + max_map_size: None, + }; + let results = RefCell::new(vec![]); + let next = FnStep(|metric: &mut Metric| { + results.borrow_mut().push(metric.clone()); + }); + let mut aggregator = AggregateMetrics::new(config, next); + + *CURRENT_TIME.lock().unwrap() = Some(0); + + aggregator.poll(); + + aggregator.submit(&mut Metric::new( + b"users.online:1|c|@0.5|#country:china".to_vec(), + )); + + *CURRENT_TIME.lock().unwrap() = Some(10); + + aggregator.poll(); + + aggregator.submit(&mut Metric::new( + b"users.online:1|c|@0.5|#country:china".to_vec(), + )); + + assert_eq!(results.borrow_mut().len(), 0); + + *CURRENT_TIME.lock().unwrap() = Some(110); + + aggregator.poll(); + + assert_eq!( + results.borrow_mut().as_slice(), + &[Metric::new( + b"users.online:2|c|@0.5|#country:china".to_vec() + )] + ); + } + + #[test] + fn counter_seconds() { + let config = AggregateMetricsConfig { + aggregate_counters: true, + aggregate_gauges: true, + flush_interval: Duration::from_secs(1), flush_offset: 0, max_map_size: None, }; @@ -201,7 +244,7 @@ mod tests { b"users.online:1|c|@0.5|#country:china".to_vec(), )); - *CURRENT_TIME.lock().unwrap() = Some(1); + *CURRENT_TIME.lock().unwrap() = Some(101); aggregator.poll(); @@ -211,7 +254,7 @@ mod tests { assert_eq!(results.borrow_mut().len(), 0); - *CURRENT_TIME.lock().unwrap() = Some(11); + *CURRENT_TIME.lock().unwrap() = Some(1001); aggregator.poll(); @@ -228,7 +271,7 @@ mod tests { let config = AggregateMetricsConfig { aggregate_counters: true, aggregate_gauges: true, - flush_interval: 10, + flush_interval: Duration::from_millis(100), flush_offset: 0, max_map_size: None, }; @@ -246,7 +289,7 @@ mod tests { b"users.online:3|g|@0.5|#country:china".to_vec(), )); - *CURRENT_TIME.lock().unwrap() = Some(1); + *CURRENT_TIME.lock().unwrap() = Some(10); aggregator.poll(); @@ -256,7 +299,7 @@ mod tests { assert_eq!(results.borrow_mut().len(), 0); - *CURRENT_TIME.lock().unwrap() = Some(11); + *CURRENT_TIME.lock().unwrap() = Some(110); aggregator.poll(); From 50155c5c5dc103fb1557e5ccde6b2cf6dfe82b9f Mon Sep 17 00:00:00 2001 From: Martin Linzmayer Date: Mon, 20 Jan 2025 12:27:09 +0100 Subject: [PATCH 2/8] lint --- src/middleware/aggregate.rs | 2 +- src/types.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/middleware/aggregate.rs b/src/middleware/aggregate.rs index f649c6c..06eb3f9 100644 --- a/src/middleware/aggregate.rs +++ b/src/middleware/aggregate.rs @@ -1,7 +1,7 @@ #[cfg(test)] use std::sync::Mutex; -use std::{collections::HashMap, i64, time::{SystemTime, UNIX_EPOCH}}; +use std::{collections::HashMap, time::{SystemTime, UNIX_EPOCH}}; use std::{fmt, str}; use crate::{config::AggregateMetricsConfig, middleware::Middleware, types::Metric}; diff --git a/src/types.rs b/src/types.rs index fe260a8..91f33a6 100644 --- a/src/types.rs +++ b/src/types.rs @@ -39,7 +39,7 @@ pub struct MetricTag<'a> { pub name_value_sep_pos: Option, } -impl<'a> MetricTag<'a> { +impl MetricTag<'_> { pub fn new(bytes: &[u8]) -> MetricTag { MetricTag { raw: bytes, @@ -58,7 +58,7 @@ impl<'a> MetricTag<'a> { } } -impl<'a> fmt::Debug for MetricTag<'a> { +impl fmt::Debug for MetricTag<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if self.name_value_sep_pos.is_none() { f.debug_struct("MetricTag") @@ -85,7 +85,7 @@ impl<'a> Iterator for MetricTagIterator<'a> { let mut tag_pos_iter = remaining_tags.iter(); let next_tag_sep_pos = tag_pos_iter.position(|&b| b == b','); - return if let Some(tag_sep_pos) = next_tag_sep_pos { + if let Some(tag_sep_pos) = next_tag_sep_pos { // Got a tag and more tags remain let tag = MetricTag::new(&remaining_tags[..tag_sep_pos]); self.remaining_tags = Some(&remaining_tags[tag_sep_pos + 1..]); @@ -96,7 +96,7 @@ impl<'a> Iterator for MetricTagIterator<'a> { let tag = MetricTag::new(remaining_tags); self.remaining_tags = None; Some(tag) - }; + } } } From d8bf663197438c136fa1b865407a6139c474bca3 Mon Sep 17 00:00:00 2001 From: Martin Linzmayer Date: Wed, 22 Jan 2025 12:16:15 +0100 Subject: [PATCH 3/8] add custom deserializer using humantime --- Cargo.lock | 3 +- Cargo.toml | 2 ++ example.yaml | 4 ++- src/config.rs | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 86 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c7eee4..e885fcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "aho-corasick" @@ -463,6 +463,7 @@ dependencies = [ "clap", "crc32fast", "env_logger", + "humantime", "insta", "log", "rand", diff --git a/Cargo.toml b/Cargo.toml index 9bdd8ee..ec2b1a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ log = "0.4" signal-hook = { version = "0.3.17", optional = true } thread_local = { version = "1.1.7", optional = true } rand = { version = "0.8.5", features = ["small_rng"] } +humantime = { version = "2.1.0", optional = true } [features] default = ["cli"] @@ -31,6 +32,7 @@ cli = [ "dep:serde_yaml", "dep:signal-hook", "dep:env_logger", + "dep:humantime" ] # opt into cadence feature to enable cadence adapter diff --git a/example.yaml b/example.yaml index c688120..33ef053 100644 --- a/example.yaml +++ b/example.yaml @@ -37,7 +37,9 @@ middlewares: # # aggregate_gauges: true - # Flush the aggregate buffer every `flush_interval` seconds. + # Flush the aggregate buffer every `flush_interval` time units. + # By default, numbers without time unit suffixes are treated as seconds. + # Supported formats are defined here: https://docs.rs/humantime/latest/humantime/fn.parse_duration.html. # Defaults to 1 second. # # flush_interval: 1 diff --git a/src/config.rs b/src/config.rs index 6a15944..0a36824 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,7 @@ +use std::fmt::Formatter; use std::time::Duration; +use serde::de::Visitor; +use serde::{Deserializer}; #[cfg(feature = "cli")] use {anyhow::Error, serde::Deserialize, std::fs::File}; @@ -96,7 +99,7 @@ pub struct AggregateMetricsConfig { pub aggregate_counters: bool, #[cfg_attr(feature = "cli", serde(default = "default_true"))] pub aggregate_gauges: bool, - #[cfg_attr(feature = "cli", serde(default = "default_flush_interval"))] + #[cfg_attr(feature = "cli", serde(default = "default_flush_interval", deserialize_with="deserialize_duration"))] pub flush_interval: Duration, #[cfg_attr(feature = "cli", serde(default = "default_flush_offset"))] pub flush_offset: i64, @@ -110,11 +113,86 @@ pub struct SampleConfig { pub sample_rate: f64, } +/// Deserializes a number or a time-string into a Duration struct. +/// Numbers without unit suffixes will be treated as seconds while suffixes will be +/// parsed using https://crates.io/crates/humantime +fn deserialize_duration<'de, D>(deserializer: D) -> Result where D:Deserializer<'de> { + struct FlushDurationVisitor; + + impl Visitor<'_> for FlushDurationVisitor { + type Value = Duration; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("a non negative number with optional unit suffix") + } + + fn visit_u64(self, v: u64) -> Result + where + E: serde::de::Error, + { + Ok(Duration::from_secs(v)) + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + humantime::parse_duration(v).map_err(serde::de::Error::custom) + } + } + + deserializer.deserialize_any(FlushDurationVisitor) +} + #[cfg(test)] #[cfg(feature = "cli")] mod tests { use super::*; + #[test] + fn flush_duration_without_suffix() { + let yaml = r#" + middlewares: + - type: aggregate-metrics + flush_interval: 10 + "#; + let config = serde_yaml::from_str::(yaml).unwrap(); + assert!(matches!(&config.middlewares[0], MiddlewareConfig::AggregateMetrics(c) if c.flush_interval == Duration::from_secs(10))); + } + + #[test] + fn flush_duration_ms_suffix() { + let yaml = r#" + middlewares: + - type: aggregate-metrics + flush_interval: 125ms + "#; + let config = serde_yaml::from_str::(yaml).unwrap(); + assert!(matches!(&config.middlewares[0], MiddlewareConfig::AggregateMetrics(c) if c.flush_interval == Duration::from_millis(125))); + } + + #[test] + fn flush_duration_negative_number() { + let yaml = r#" + middleware: + - type: aggregate-metrics + flush_interval: -1000 + "#; + let config = serde_yaml::from_str::(yaml); + assert!(config.is_err()); + } + + #[test] + fn flush_duration_negative_number_with_suffix() { + let yaml = r#" + middleware: + - type: aggregate-metrics + flush_interval: -125ms + "#; + let config = serde_yaml::from_str::(yaml); + assert!(config.is_err()); + } + #[test] fn config() { let config = Config::new("example.yaml").unwrap(); From 12a745fccc5c8e4bc00994448e6fe237dc198124 Mon Sep 17 00:00:00 2001 From: Martin Linzmayer Date: Wed, 22 Jan 2025 12:19:01 +0100 Subject: [PATCH 4/8] add feature flag guard --- src/config.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/config.rs b/src/config.rs index 0a36824..e61ee9c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,8 @@ use std::fmt::Formatter; use std::time::Duration; +#[cfg(feature = "cli")] use serde::de::Visitor; +#[cfg(feature = "cli")] use serde::{Deserializer}; #[cfg(feature = "cli")] use {anyhow::Error, serde::Deserialize, std::fs::File}; @@ -116,6 +118,7 @@ pub struct SampleConfig { /// Deserializes a number or a time-string into a Duration struct. /// Numbers without unit suffixes will be treated as seconds while suffixes will be /// parsed using https://crates.io/crates/humantime +#[cfg(feature = "cli")] fn deserialize_duration<'de, D>(deserializer: D) -> Result where D:Deserializer<'de> { struct FlushDurationVisitor; From caef14c80413493fb9187c201ec8d34414489d4c Mon Sep 17 00:00:00 2001 From: Martin Linzmayer Date: Wed, 22 Jan 2025 12:26:41 +0100 Subject: [PATCH 5/8] rename visitor struct --- src/config.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/config.rs b/src/config.rs index e61ee9c..11611f1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -120,9 +120,9 @@ pub struct SampleConfig { /// parsed using https://crates.io/crates/humantime #[cfg(feature = "cli")] fn deserialize_duration<'de, D>(deserializer: D) -> Result where D:Deserializer<'de> { - struct FlushDurationVisitor; + struct FLushIntervalVisitor; - impl Visitor<'_> for FlushDurationVisitor { + impl Visitor<'_> for FLushIntervalVisitor { type Value = Duration; fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { @@ -144,7 +144,7 @@ fn deserialize_duration<'de, D>(deserializer: D) -> Result w } } - deserializer.deserialize_any(FlushDurationVisitor) + deserializer.deserialize_any(FLushIntervalVisitor) } #[cfg(test)] From 75a7cf57bf40fb1d2d3e5f65b0720e4706881ced Mon Sep 17 00:00:00 2001 From: Martin Linzmayer Date: Wed, 22 Jan 2025 12:27:03 +0100 Subject: [PATCH 6/8] fix typo --- src/config.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/config.rs b/src/config.rs index 11611f1..e2e52c3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -120,9 +120,9 @@ pub struct SampleConfig { /// parsed using https://crates.io/crates/humantime #[cfg(feature = "cli")] fn deserialize_duration<'de, D>(deserializer: D) -> Result where D:Deserializer<'de> { - struct FLushIntervalVisitor; + struct FlushIntervalVisitor; - impl Visitor<'_> for FLushIntervalVisitor { + impl Visitor<'_> for FlushIntervalVisitor { type Value = Duration; fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { @@ -144,7 +144,7 @@ fn deserialize_duration<'de, D>(deserializer: D) -> Result w } } - deserializer.deserialize_any(FLushIntervalVisitor) + deserializer.deserialize_any(FlushIntervalVisitor) } #[cfg(test)] From c26c1b6187106e7f974de27bb7099d6dcf4a0141 Mon Sep 17 00:00:00 2001 From: Martin Linzmayer Date: Fri, 24 Jan 2025 11:02:57 +0100 Subject: [PATCH 7/8] interpret flush interval as milliseconds --- Cargo.lock | 1 - Cargo.toml | 4 +--- example.yaml | 6 ++---- src/config.rs | 37 ++++--------------------------------- src/middleware/aggregate.rs | 1 - 5 files changed, 7 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e885fcc..b2d2e64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -463,7 +463,6 @@ dependencies = [ "clap", "crc32fast", "env_logger", - "humantime", "insta", "log", "rand", diff --git a/Cargo.toml b/Cargo.toml index ec2b1a2..26ad8e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,6 @@ log = "0.4" signal-hook = { version = "0.3.17", optional = true } thread_local = { version = "1.1.7", optional = true } rand = { version = "0.8.5", features = ["small_rng"] } -humantime = { version = "2.1.0", optional = true } [features] default = ["cli"] @@ -31,8 +30,7 @@ cli = [ "dep:serde", "dep:serde_yaml", "dep:signal-hook", - "dep:env_logger", - "dep:humantime" + "dep:env_logger" ] # opt into cadence feature to enable cadence adapter diff --git a/example.yaml b/example.yaml index 33ef053..a07d708 100644 --- a/example.yaml +++ b/example.yaml @@ -37,12 +37,10 @@ middlewares: # # aggregate_gauges: true - # Flush the aggregate buffer every `flush_interval` time units. - # By default, numbers without time unit suffixes are treated as seconds. - # Supported formats are defined here: https://docs.rs/humantime/latest/humantime/fn.parse_duration.html. + # Flush the aggregate buffer every `flush_interval` milliseconds. # Defaults to 1 second. # - # flush_interval: 1 + # flush_interval: 1000 # Normally the times at which metrics are flushed are approximately aligned # with a multiple of `flush_interval`. For example, a `flush_interval` of 1 diff --git a/src/config.rs b/src/config.rs index e2e52c3..975d72f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -126,21 +126,14 @@ fn deserialize_duration<'de, D>(deserializer: D) -> Result w type Value = Duration; fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { - formatter.write_str("a non negative number with optional unit suffix") + formatter.write_str("a non negative number") } fn visit_u64(self, v: u64) -> Result where E: serde::de::Error, { - Ok(Duration::from_secs(v)) - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - humantime::parse_duration(v).map_err(serde::de::Error::custom) + Ok(Duration::from_millis(v)) } } @@ -153,22 +146,11 @@ mod tests { use super::*; #[test] - fn flush_duration_without_suffix() { - let yaml = r#" - middlewares: - - type: aggregate-metrics - flush_interval: 10 - "#; - let config = serde_yaml::from_str::(yaml).unwrap(); - assert!(matches!(&config.middlewares[0], MiddlewareConfig::AggregateMetrics(c) if c.flush_interval == Duration::from_secs(10))); - } - - #[test] - fn flush_duration_ms_suffix() { + fn flush_duration_milliseconds() { let yaml = r#" middlewares: - type: aggregate-metrics - flush_interval: 125ms + flush_interval: 125 "#; let config = serde_yaml::from_str::(yaml).unwrap(); assert!(matches!(&config.middlewares[0], MiddlewareConfig::AggregateMetrics(c) if c.flush_interval == Duration::from_millis(125))); @@ -185,17 +167,6 @@ mod tests { assert!(config.is_err()); } - #[test] - fn flush_duration_negative_number_with_suffix() { - let yaml = r#" - middleware: - - type: aggregate-metrics - flush_interval: -125ms - "#; - let config = serde_yaml::from_str::(yaml); - assert!(config.is_err()); - } - #[test] fn config() { let config = Config::new("example.yaml").unwrap(); diff --git a/src/middleware/aggregate.rs b/src/middleware/aggregate.rs index 06eb3f9..cd7598b 100644 --- a/src/middleware/aggregate.rs +++ b/src/middleware/aggregate.rs @@ -6,7 +6,6 @@ use std::{fmt, str}; use crate::{config::AggregateMetricsConfig, middleware::Middleware, types::Metric}; - #[derive(Hash, Eq, PartialEq)] struct BucketKey { // contains the raw metric bytes with the value stripped out From d024b19ce8d4038ae4eb1abe1c09b4a7bb359e9c Mon Sep 17 00:00:00 2001 From: Martin Linzmayer Date: Fri, 24 Jan 2025 11:27:15 +0100 Subject: [PATCH 8/8] trailing comma --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 26ad8e4..9bdd8ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ cli = [ "dep:serde", "dep:serde_yaml", "dep:signal-hook", - "dep:env_logger" + "dep:env_logger", ] # opt into cadence feature to enable cadence adapter