From fac3c98316915693d5ff7c6adccd157d52c0e68e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Bartosi=C5=84ski?= Date: Wed, 16 Jul 2025 10:14:53 +0200 Subject: [PATCH] Add a health check API endpoint --- inbox/interruptible_threading.py | 16 +++++- inbox/mailsync/frontend.py | 90 ++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/inbox/interruptible_threading.py b/inbox/interruptible_threading.py index e59b3bd78..e29e3c73e 100644 --- a/inbox/interruptible_threading.py +++ b/inbox/interruptible_threading.py @@ -84,7 +84,8 @@ def __init__( ) self.__exception: Exception | None = None - self._timeout_deadline: "float | None" = None + self._timeout_deadline: float | None = None + self.last_ping_time: float | None = None super().__init__() @@ -158,6 +159,11 @@ def _check_interrupted(self) -> None: ): raise InterruptibleThreadTimeout() + self._ping() + + def _ping(self) -> None: + self.last_ping_time = time.monotonic() + P = ParamSpec("P") T = TypeVar("T") @@ -241,6 +247,14 @@ def check_interrupted(current_thread: InterruptibleThread, /) -> None: return current_thread._check_interrupted() +@_interruptible(lambda: None) +def ping(current_thread: InterruptibleThread, /) -> None: + """ + Bump the last ping timestamp for the current thread. + """ + return current_thread._ping() + + class InterruptibleThreadTimeout(BaseException): """ Exception raised when the the timeout set by `timeout` context manager diff --git a/inbox/mailsync/frontend.py b/inbox/mailsync/frontend.py index d7308e6d4..733f6e5f2 100644 --- a/inbox/mailsync/frontend.py +++ b/inbox/mailsync/frontend.py @@ -1,10 +1,17 @@ +import random import threading +import time +import structlog from flask import Flask, jsonify, request +from flask.typing import ResponseReturnValue from pympler import muppy, summary # type: ignore[import-untyped] from werkzeug.serving import WSGIRequestHandler, run_simple from inbox.instrumentation import ProfileCollector +from inbox.interruptible_threading import InterruptibleThread + +log = structlog.get_logger() class ProfilingHTTPFrontend: @@ -18,6 +25,11 @@ class ProfilingHTTPFrontend: def __init__(self, port, profile) -> None: # type: ignore[no-untyped-def] self.port = port self.profiler = ProfileCollector() if profile else None + # Start reporting as unhealthy after 240-360 minutes to allow + # this process to be restarted after this time. + self.report_unhealthy_at = time.monotonic() + random.randint( + 240 * 60, 360 * 60 + ) def _create_app(self): # type: ignore[no-untyped-def] app = Flask(__name__) @@ -50,6 +62,84 @@ def profile(): # type: ignore[no-untyped-def] def load() -> str: return "Load tracing disabled\n" + @app.route("/health") + def health() -> ResponseReturnValue: + now = time.monotonic() + threads = [ + thread + for thread in threading.enumerate() + if isinstance(thread, InterruptibleThread) + ] + threads_count = len(threads) + threads_delayed_5m_count = sum( + 1 + for thread in threads + if not thread.last_ping_time + or now - thread.last_ping_time > 5 * 60 + ) + threads_delayed_20m_count = sum( + 1 + for thread in threads + if not thread.last_ping_time + or now - thread.last_ping_time > 20 * 60 + ) + threads_delayed_60m_count = sum( + 1 + for thread in threads + if not thread.last_ping_time + or now - thread.last_ping_time > 60 * 60 + ) + + longevity_deadline_reached = now >= self.report_unhealthy_at + service_stuck = ( + # Treat as stuck if there are threads running, and: + threads_count + and ( + # Any of them are delayed by 60m+ + threads_delayed_60m_count + or ( + # Or there are at least 50 threads, and 10%+ are + # delayed by 20m+ + threads_count >= 50 + and threads_delayed_20m_count / threads_count >= 0.1 + ) + or ( + # Or there are at least 10 threads, and 40%+ are + # delayed by 5m+ + threads_count >= 10 + and threads_delayed_5m_count / threads_count >= 0.4 + ) + ) + ) + + is_healthy = not longevity_deadline_reached and not service_stuck + stats = { + "threads_delayed_5m_count": threads_delayed_5m_count, + "threads_delayed_20m_count": threads_delayed_20m_count, + "threads_delayed_60m_count": threads_delayed_60m_count, + "max_delay": max( + # XXX: Temporary. Remove me if everything's working fine in prod. + ( + ( + now - thread.last_ping_time + if thread.last_ping_time is not None + else -1 + ), + thread.__class__.__name__, + ) + for thread in threads + ), + "threads_count": threads_count, + "longevity_deadline_reached": longevity_deadline_reached, + "is_healthy": is_healthy, + } + + if service_stuck: + log.error("The service is stuck", stats=stats) + + response_status = 200 if is_healthy else 503 + return jsonify(stats), response_status + @app.route("/mem") def mem(): # type: ignore[no-untyped-def] objs = muppy.get_objects()