Merged
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions example.yaml
@@ -37,10 +37,10 @@ middlewares:
#
# aggregate_gauges: true

- # Flush the aggregate buffer every `flush_interval` seconds.
+ # Flush the aggregate buffer every `flush_interval` milliseconds.
# Defaults to 1 second.
#
- # flush_interval: 1
+ # flush_interval: 1000

# Normally the times at which metrics are flushed are approximately aligned
# with a multiple of `flush_interval`. For example, a `flush_interval` of 1
63 changes: 58 additions & 5 deletions src/config.rs
@@ -1,3 +1,9 @@
+ use std::fmt::Formatter;
+ use std::time::Duration;
+ #[cfg(feature = "cli")]
+ use serde::de::Visitor;
+ #[cfg(feature = "cli")]
+ use serde::{Deserializer};
#[cfg(feature = "cli")]
use {anyhow::Error, serde::Deserialize, std::fs::File};

@@ -79,8 +85,8 @@ fn default_true() -> bool {
}

#[cfg(feature = "cli")]
- fn default_flush_interval() -> u64 {
- 1
+ fn default_flush_interval() -> Duration {
+ Duration::from_secs(1)
}

#[cfg(feature = "cli")]
@@ -95,8 +101,8 @@ pub struct AggregateMetricsConfig {
pub aggregate_counters: bool,
#[cfg_attr(feature = "cli", serde(default = "default_true"))]
pub aggregate_gauges: bool,
- #[cfg_attr(feature = "cli", serde(default = "default_flush_interval"))]
- pub flush_interval: u64,
+ #[cfg_attr(feature = "cli", serde(default = "default_flush_interval", deserialize_with="deserialize_duration"))]
+ pub flush_interval: Duration,
Member

Should this have a custom deserializer so we can still parse e.g. 1 into seconds?

Contributor Author

Yes, that makes a lot of sense. I added a custom deserializer that treats plain numbers as seconds (for backwards compatibility) and parses numbers with unit suffixes using https://docs.rs/humantime/latest/humantime/index.html

Member

I think humantime is too complicated here. why do we want it? does it just happen to align with how Relay represents duration in its config format? we use plain numbers in snuba, for example.

Contributor Author

The idea is that we have backwards compatibility by assuming that plain numbers are seconds while providing a possibility to have sub-second intervals.
I added humantime because I didn't want to roll time string parsing myself, but I can also simplify it by allowing only seconds or milliseconds and writing the parsing code myself. What do you think?

Contributor Author

Relay configures this only in code, so the main concern was backwards compatibility

Contributor Author
@Litarnus Litarnus Jan 23, 2025

I guess we could get rid of the units entirely, although I'm not too happy about using floats when the value can be represented with integers.

Member
@untitaker untitaker Jan 23, 2025

We don't use "humantime" for any configuration parameters at all, in any YAML file in {snuba, sentry}. rather we use a numeric value and specify the unit as part of the name of the configuration parameter, such as flush_interval_ms: 1000.

I think preserving backwards compat can be done purely with serde primitives and custom deserializers + alias, but I would also be OK with just making that breaking change. Snuba does not make this kind of thing configurable in the first place, so there are no config files for us to migrate. I don't know about Relay.

Contributor Author
@Litarnus Litarnus Jan 23, 2025

> so can we just make this a float then? instead of pulling in a new dep

Fair point about the new dep. I would still suggest allowing plain numbers (= seconds) or an `ms` suffix to treat the value as milliseconds. This should give us enough flexibility without adding dependencies while keeping the parsing code fairly simple. What do you think?
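
For illustration, the "plain number or `ms` suffix" idea could be hand-rolled with the standard library alone. This is only a sketch under assumptions: `parse_flush_interval` is a hypothetical name, it works on a string rather than through serde, and it is not the code that ended up in this PR.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of this PR): parses `"250ms"` as
/// milliseconds and a bare integer such as `"2"` as seconds.
/// Returns `None` for negative numbers, floats, or any other suffix.
fn parse_flush_interval(raw: &str) -> Option<Duration> {
    let raw = raw.trim();
    match raw.strip_suffix("ms") {
        // Explicit millisecond suffix, e.g. "250ms".
        Some(millis) => millis
            .trim_end()
            .parse::<u64>()
            .ok()
            .map(Duration::from_millis),
        // No suffix: keep the old meaning of "seconds".
        None => raw.parse::<u64>().ok().map(Duration::from_secs),
    }
}

fn main() {
    assert_eq!(parse_flush_interval("2"), Some(Duration::from_secs(2)));
    assert_eq!(parse_flush_interval("250ms"), Some(Duration::from_millis(250)));
    assert_eq!(parse_flush_interval("-1"), None);
    assert_eq!(parse_flush_interval("1.5"), None);
}
```

Parsing into `u64` keeps everything in integers, which sidesteps the float concern raised earlier in the thread.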

Contributor Author

> We don't use "humantime" for any configuration parameters at all, in any YAML file in {snuba, sentry}. rather we use a numeric value and specify the unit as part of the name of the configuration parameter, such as flush_interval_ms: 1000.
>
> I think preserving backwards compat can be done purely with serde primitives and custom deserializers + alias, but I would also be OK with just making that breaking change. Snuba does not make this kind of thing configurable in the first place, so there are no config files for us to migrate. I don't know about Relay.

Thanks for the write-up, I will look into adding support for an additional parameter that represents ms.

Contributor Author

After giving it a bit more thought, I decided to introduce the breaking change for the sake of simplicity.

#[cfg_attr(feature = "cli", serde(default = "default_flush_offset"))]
pub flush_offset: i64,
#[cfg_attr(feature = "cli", serde(default))]
@@ -109,11 +115,58 @@ pub struct SampleConfig {
pub sample_rate: f64,
}

+ /// Deserializes a non-negative integer into a `Duration`.
+ /// The number is interpreted as milliseconds; unit suffixes are not supported.
+ #[cfg(feature = "cli")]
+ fn deserialize_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error> where D:Deserializer<'de> {
+ struct FlushIntervalVisitor;
+
+ impl Visitor<'_> for FlushIntervalVisitor {
+ type Value = Duration;
+
+ fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
+ formatter.write_str("a non negative number")
+ }
+
+ fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ Ok(Duration::from_millis(v))
+ }
+ }
+
+ deserializer.deserialize_any(FlushIntervalVisitor)
+ }

#[cfg(test)]
#[cfg(feature = "cli")]
mod tests {
use super::*;

+ #[test]
+ fn flush_duration_milliseconds() {
+ let yaml = r#"
+ middlewares:
+   - type: aggregate-metrics
+     flush_interval: 125
+ "#;
+ let config = serde_yaml::from_str::<Config>(yaml).unwrap();
+ assert!(matches!(&config.middlewares[0], MiddlewareConfig::AggregateMetrics(c) if c.flush_interval == Duration::from_millis(125)));
+ }
+
+ #[test]
+ fn flush_duration_negative_number() {
+ let yaml = r#"
+ middlewares:
+   - type: aggregate-metrics
+     flush_interval: -1000
+ "#;
+ let config = serde_yaml::from_str::<Config>(yaml);
+ assert!(config.is_err());
+ }

#[test]
fn config() {
let config = Config::new("example.yaml").unwrap();
@@ -152,7 +205,7 @@ mod tests {
AggregateMetricsConfig {
aggregate_counters: true,
aggregate_gauges: true,
- flush_interval: 1,
+ flush_interval: 1s,
flush_offset: 0,
max_map_size: None,
},
70 changes: 56 additions & 14 deletions src/middleware/aggregate.rs
@@ -1,10 +1,7 @@
#[cfg(test)]
use std::sync::Mutex;

- use std::{
- collections::HashMap,
- time::{SystemTime, UNIX_EPOCH},
- };
+ use std::{collections::HashMap, time::{SystemTime, UNIX_EPOCH}};
use std::{fmt, str};

use crate::{config::AggregateMetricsConfig, middleware::Middleware, types::Metric};
@@ -142,16 +139,16 @@ where
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
- .as_secs()
+ .as_millis() as u64
});

let rounded_bucket =
- i64::try_from((now / self.config.flush_interval) * self.config.flush_interval)
+ i64::try_from((now / self.config.flush_interval.as_millis() as u64) * self.config.flush_interval.as_millis() as u64)
.expect("overflow when calculating with flush_interval");
let rounded_bucket = u64::try_from(rounded_bucket + self.config.flush_offset)
.expect("overflow when calculating with flush_interval");

- if self.last_flushed_at + self.config.flush_interval <= rounded_bucket {
+ if self.last_flushed_at + self.config.flush_interval.as_millis() as u64 <= rounded_bucket {
self.flush_metrics();
self.last_flushed_at = rounded_bucket;
}
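
A standalone sketch of the bucket arithmetic in this hunk, assuming `now` is milliseconds since the Unix epoch and that `flush_offset` is in the same unit as the bucket. `rounded_bucket` and `should_flush` are hypothetical free functions, not part of the crate. Unlike the hunk, they return `None` or `false` instead of panicking on overflow, and they reject a zero interval.

```rust
use std::convert::TryFrom;
use std::time::Duration;

/// Hypothetical helper: rounds `now_ms` down to a multiple of the flush
/// interval, then shifts it by `flush_offset`. Returns `None` on a zero
/// interval, on overflow, or if the shifted value would be negative.
fn rounded_bucket(now_ms: u64, flush_interval: Duration, flush_offset: i64) -> Option<u64> {
    // Checked conversion instead of `as u64`, which would silently truncate.
    let interval_ms = u64::try_from(flush_interval.as_millis()).ok()?;
    if interval_ms == 0 {
        return None; // avoid division by zero
    }
    // Integer division floors, so this is the start of the current bucket.
    let bucket = (now_ms / interval_ms) * interval_ms;
    let shifted = i64::try_from(bucket).ok()?.checked_add(flush_offset)?;
    u64::try_from(shifted).ok()
}

/// Hypothetical helper: true once at least one full interval lies between
/// the last flush and the current bucket.
fn should_flush(last_flushed_at: u64, flush_interval: Duration, bucket: u64) -> bool {
    match u64::try_from(flush_interval.as_millis()) {
        Ok(interval_ms) => last_flushed_at.saturating_add(interval_ms) <= bucket,
        Err(_) => false,
    }
}

fn main() {
    let interval = Duration::from_millis(100);
    // Same timestamps as the millisecond test in this file: 0, 10, 110.
    assert_eq!(rounded_bucket(0, interval, 0), Some(0));
    assert_eq!(rounded_bucket(10, interval, 0), Some(0));
    assert_eq!(rounded_bucket(110, interval, 0), Some(100));
    assert!(!should_flush(0, interval, 0));
    assert!(should_flush(0, interval, 100));
}
```

With a 100 ms interval, timestamps 0 and 10 fall into the same bucket, so nothing is flushed until the clock reaches the next multiple of the interval.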
@@ -173,7 +170,7 @@ where
#[cfg(test)]
mod tests {
use std::cell::RefCell;
-
+ use std::time::Duration;
use super::*;

use crate::testutils::FnStep;
@@ -183,7 +180,52 @@ mod tests {
let config = AggregateMetricsConfig {
aggregate_counters: true,
aggregate_gauges: true,
- flush_interval: 10,
+ flush_interval: Duration::from_millis(100),
+ flush_offset: 0,
+ max_map_size: None,
+ };
+ let results = RefCell::new(vec![]);
+ let next = FnStep(|metric: &mut Metric| {
+ results.borrow_mut().push(metric.clone());
+ });
+ let mut aggregator = AggregateMetrics::new(config, next);
+
+ *CURRENT_TIME.lock().unwrap() = Some(0);
+
+ aggregator.poll();
+
+ aggregator.submit(&mut Metric::new(
+ b"users.online:1|c|@0.5|#country:china".to_vec(),
+ ));
+
+ *CURRENT_TIME.lock().unwrap() = Some(10);
+
+ aggregator.poll();
+
+ aggregator.submit(&mut Metric::new(
+ b"users.online:1|c|@0.5|#country:china".to_vec(),
+ ));
+
+ assert_eq!(results.borrow_mut().len(), 0);
+
+ *CURRENT_TIME.lock().unwrap() = Some(110);
+
+ aggregator.poll();
+
+ assert_eq!(
+ results.borrow_mut().as_slice(),
+ &[Metric::new(
+ b"users.online:2|c|@0.5|#country:china".to_vec()
+ )]
+ );
+ }
+
+ #[test]
+ fn counter_seconds() {
+ let config = AggregateMetricsConfig {
+ aggregate_counters: true,
+ aggregate_gauges: true,
+ flush_interval: Duration::from_secs(1),
flush_offset: 0,
max_map_size: None,
};
@@ -201,7 +243,7 @@ mod tests {
b"users.online:1|c|@0.5|#country:china".to_vec(),
));

- *CURRENT_TIME.lock().unwrap() = Some(1);
+ *CURRENT_TIME.lock().unwrap() = Some(101);

aggregator.poll();

@@ -211,7 +253,7 @@ mod tests {

assert_eq!(results.borrow_mut().len(), 0);

- *CURRENT_TIME.lock().unwrap() = Some(11);
+ *CURRENT_TIME.lock().unwrap() = Some(1001);

aggregator.poll();

@@ -228,7 +270,7 @@ mod tests {
let config = AggregateMetricsConfig {
aggregate_counters: true,
aggregate_gauges: true,
- flush_interval: 10,
+ flush_interval: Duration::from_millis(100),
flush_offset: 0,
max_map_size: None,
};
@@ -246,7 +288,7 @@ mod tests {
b"users.online:3|g|@0.5|#country:china".to_vec(),
));

- *CURRENT_TIME.lock().unwrap() = Some(1);
+ *CURRENT_TIME.lock().unwrap() = Some(10);

aggregator.poll();

@@ -256,7 +298,7 @@ mod tests {

assert_eq!(results.borrow_mut().len(), 0);

- *CURRENT_TIME.lock().unwrap() = Some(11);
+ *CURRENT_TIME.lock().unwrap() = Some(110);

aggregator.poll();

8 changes: 4 additions & 4 deletions src/types.rs
@@ -39,7 +39,7 @@ pub struct MetricTag<'a> {
pub name_value_sep_pos: Option<usize>,
}

- impl<'a> MetricTag<'a> {
+ impl MetricTag<'_> {
pub fn new(bytes: &[u8]) -> MetricTag {
MetricTag {
raw: bytes,
@@ -58,7 +58,7 @@ impl<'a> MetricTag<'a> {
}
}

- impl<'a> fmt::Debug for MetricTag<'a> {
+ impl fmt::Debug for MetricTag<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.name_value_sep_pos.is_none() {
f.debug_struct("MetricTag")
@@ -85,7 +85,7 @@ impl<'a> Iterator for MetricTagIterator<'a> {
let mut tag_pos_iter = remaining_tags.iter();
let next_tag_sep_pos = tag_pos_iter.position(|&b| b == b',');

- return if let Some(tag_sep_pos) = next_tag_sep_pos {
+ if let Some(tag_sep_pos) = next_tag_sep_pos {
// Got a tag and more tags remain
let tag = MetricTag::new(&remaining_tags[..tag_sep_pos]);
self.remaining_tags = Some(&remaining_tags[tag_sep_pos + 1..]);
@@ -96,7 +96,7 @@ impl<'a> Iterator for MetricTagIterator<'a> {
let tag = MetricTag::new(remaining_tags);
self.remaining_tags = None;
Some(tag)
- };
+ }
}
}
