Skip to content

Commit 846468a

Browse files
Added a metric to track the number of running pipelines per index. (#5880)
1 parent 1f14542 commit 846468a

File tree

2 files changed

+15
-0
lines changed

2 files changed

+15
-0
lines changed

quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use quickwit_actors::{
2323
QueueCapacity, Supervisable,
2424
};
2525
use quickwit_common::KillSwitch;
26+
use quickwit_common::metrics::OwnedGaugeGuard;
2627
use quickwit_common::pubsub::EventBroker;
2728
use quickwit_common::temp_dir::TempDirectory;
2829
use quickwit_config::{IndexingSettings, RetentionPolicy, SourceConfig};
@@ -120,6 +121,7 @@ pub struct IndexingPipeline {
120121
// requiring a respawn of the pipeline.
121122
// We keep the list of shards here however, to reassign them after a respawn.
122123
shard_ids: BTreeSet<ShardId>,
124+
_indexing_pipelines_gauge_guard: OwnedGaugeGuard,
123125
}
124126

125127
#[async_trait]
@@ -154,6 +156,10 @@ impl Actor for IndexingPipeline {
154156

155157
impl IndexingPipeline {
156158
pub fn new(params: IndexingPipelineParams) -> Self {
159+
let indexing_pipelines_gauge = crate::metrics::INDEXER_METRICS
160+
.indexing_pipelines
161+
.with_label_values([&params.pipeline_id.index_uid.index_id]);
162+
let indexing_pipelines_gauge_guard = OwnedGaugeGuard::from_gauge(indexing_pipelines_gauge);
157163
let params_fingerprint = params.params_fingerprint;
158164
IndexingPipeline {
159165
params,
@@ -165,6 +171,7 @@ impl IndexingPipeline {
165171
..Default::default()
166172
},
167173
shard_ids: Default::default(),
174+
_indexing_pipelines_gauge_guard: indexing_pipelines_gauge_guard,
168175
}
169176
}
170177

quickwit/quickwit-indexing/src/metrics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use quickwit_common::metrics::{
2121
pub struct IndexerMetrics {
2222
pub processed_docs_total: IntCounterVec<2>,
2323
pub processed_bytes: IntCounterVec<2>,
24+
pub indexing_pipelines: IntGaugeVec<1>,
2425
pub backpressure_micros: IntCounterVec<1>,
2526
pub available_concurrent_upload_permits: IntGaugeVec<1>,
2627
pub split_builders: IntGauge,
@@ -51,6 +52,13 @@ impl Default for IndexerMetrics {
5152
&[],
5253
["index", "docs_processed_status"],
5354
),
55+
indexing_pipelines: new_gauge_vec(
56+
"indexing_pipelines",
57+
"Number of running indexing pipelines",
58+
"indexing",
59+
&[],
60+
["index"],
61+
),
5462
backpressure_micros: new_counter_vec(
5563
"backpressure_micros",
5664
"Amount of time spent in backpressure (in micros). This time only includes the \

0 commit comments

Comments
 (0)