Skip to content

Commit ba49b92

Browse files
committed
Support MultiProcessCollector in RestrictedRegistry.
This change makes it so that the RestrictedRegistry will always attempt to collect metrics from a collector for which it couldn’t find any metrics name. Although this can be used generally, this is meant to be used with MultiProcessCollector. This changes the current behavior of the code but should be somehow safe as it enables filtering in case where it was not working previously. If this is an issue, an alternative approach with an explicit flag could be used (set either in the MultiProcessCollector or in the registry). The intent here is to allow collecting a subset of metrics from production fastapi servers (running in multiprocess mode). So not having to change the library usage in these servers is advantageous to have filtering work out-of-the-box with this change.
1 parent e8f8bae commit ba49b92

File tree

5 files changed

+73
-2
lines changed

5 files changed

+73
-2
lines changed

docs/content/multiprocess/_index.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ it's common to have processes rather than threads to handle large workloads.
1010
To handle this the client library can be put in multiprocess mode.
1111
This comes with a number of limitations:
1212

13-
- Registries can not be used as normal, all instantiated metrics are exported
13+
- Registries can not be used as normal
1414
- Registering metrics to a registry later used by a `MultiProcessCollector`
1515
may cause duplicate metrics to be exported
16+
- Filtering on metrics works but is inefficient
1617
- Custom collectors do not work (e.g. cpu and memory metrics)
1718
- Gauges cannot use `set_function`
1819
- Info and Enum metrics do not work

prometheus_client/registry.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str,
3333
self._lock = Lock()
3434
self._target_info: Optional[Dict[str, str]] = {}
3535
self.set_target_info(target_info)
36+
self._collectors_with_no_names: List[Collector] = []
3637

3738
def register(self, collector: Collector) -> None:
3839
"""Add a collector to the registry."""
@@ -46,6 +47,8 @@ def register(self, collector: Collector) -> None:
4647
for name in names:
4748
self._names_to_collectors[name] = collector
4849
self._collector_to_names[collector] = names
50+
if not names:
51+
self._collectors_with_no_names.append(collector)
4952

5053
def unregister(self, collector: Collector) -> None:
5154
"""Remove a collector from the registry."""
@@ -148,7 +151,7 @@ def __init__(self, names: Iterable[str], registry: CollectorRegistry):
148151
self._registry = registry
149152

150153
def collect(self) -> Iterable[Metric]:
151-
collectors = set()
154+
collectors = set(self._registry._collectors_with_no_names)
152155
target_info_metric = None
153156
with self._registry._lock:
154157
if 'target_info' in self._name_set and self._registry._target_info:

tests/test_asgi.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,3 +223,32 @@ def test_qs_parsing(self):
223223
asyncio.new_event_loop().run_until_complete(
224224
self.communicator.wait()
225225
)
226+
227+
def test_qs_parsing_multi(self):
228+
"""Only metrics that match the 'name[]' query string param appear"""
229+
230+
app = make_asgi_app(self.registry)
231+
metrics = [
232+
("asdf", "first test metric", 1),
233+
("bsdf", "second test metric", 2),
234+
("csdf", "third test metric", 3)
235+
]
236+
237+
for m in metrics:
238+
self.increment_metrics(*m)
239+
240+
self.seed_app(app)
241+
self.scope['query_string'] = "&".join([f"name[]={m[0]}_total" for m in metrics[0:2]]).encode("utf-8")
242+
self.send_default_request()
243+
244+
outputs = self.get_all_output()
245+
response_body = outputs[1]
246+
output = response_body['body'].decode('utf8')
247+
248+
self.assert_metrics(output, *metrics[0])
249+
self.assert_metrics(output, *metrics[1])
250+
self.assert_not_metrics(output, *metrics[2])
251+
252+
asyncio.new_event_loop().run_until_complete(
253+
self.communicator.wait()
254+
)

tests/test_core.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,15 @@ def test_restricted_registry_does_not_call_extra(self):
10241024
self.assertEqual([m], list(registry.restricted_registry(['s_sum']).collect()))
10251025
mock_collector.collect.assert_not_called()
10261026

1027+
def test_restricted_registry_collects_no_names_collectors(self):
1028+
from unittest.mock import MagicMock
1029+
registry = CollectorRegistry()
1030+
mock_collector = MagicMock()
1031+
mock_collector.describe.return_value = []
1032+
registry.register(mock_collector)
1033+
self.assertEqual(list(registry.restricted_registry(['metric']).collect()), [])
1034+
mock_collector.collect.assert_called()
1035+
10271036
def test_restricted_registry_does_not_yield_while_locked(self):
10281037
registry = CollectorRegistry(target_info={'foo': 'bar'})
10291038
Summary('s', 'help', registry=registry).observe(7)

tests/test_multiprocess.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,35 @@ def add_label(key, value):
301301

302302
self.assertEqual(metrics['h'].samples, expected_histogram)
303303

304+
def test_restrict(self):
305+
pid = 0
306+
values.ValueClass = MultiProcessValue(lambda: pid)
307+
labels = {i: i for i in 'abcd'}
308+
309+
def add_label(key, value):
310+
l = labels.copy()
311+
l[key] = value
312+
return l
313+
314+
c = Counter('c', 'help', labelnames=labels.keys(), registry=None)
315+
g = Gauge('g', 'help', labelnames=labels.keys(), registry=None)
316+
317+
c.labels(**labels).inc(1)
318+
g.labels(**labels).set(1)
319+
320+
pid = 1
321+
322+
c.labels(**labels).inc(1)
323+
g.labels(**labels).set(1)
324+
325+
metrics = {m.name: m for m in self.registry.restricted_registry(['c_total']).collect()}
326+
327+
self.assertEqual(metrics.keys(), {'c'})
328+
329+
self.assertEqual(
330+
metrics['c'].samples, [Sample('c_total', labels, 2.0)]
331+
)
332+
304333
def test_collect_preserves_help(self):
305334
pid = 0
306335
values.ValueClass = MultiProcessValue(lambda: pid)

0 commit comments

Comments
 (0)