From c5bd1c3daf21002dfb04d34dbf1d3ca962108257 Mon Sep 17 00:00:00 2001 From: Tomer Keshet Date: Wed, 14 Jan 2026 13:21:15 +0200 Subject: [PATCH 1/6] add tcp keepalive and option to disable websocket keepalive --- src/robusta/core/model/env_vars.py | 9 +++++++ src/robusta/integrations/receiver.py | 35 +++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/robusta/core/model/env_vars.py b/src/robusta/core/model/env_vars.py index 8c45ff7a7..27da71633 100644 --- a/src/robusta/core/model/env_vars.py +++ b/src/robusta/core/model/env_vars.py @@ -82,6 +82,15 @@ def load_bool(env_var, default: bool): # Timeout for the ping response, before killing the connection. Must be smaller than the interval WEBSOCKET_PING_TIMEOUT = int(os.environ.get("WEBSOCKET_PING_TIMEOUT", 30)) +# TCP keepalive configuration (disabled by default) +WEBSOCKET_TCP_KEEPALIVE_ENABLED = os.environ.get("WEBSOCKET_TCP_KEEPALIVE_ENABLED", "false").lower() == "true" +# Time in seconds before sending the first keepalive probe (Linux: TCP_KEEPIDLE, macOS: TCP_KEEPALIVE) +WEBSOCKET_TCP_KEEPALIVE_IDLE = int(os.environ.get("WEBSOCKET_TCP_KEEPALIVE_IDLE", 2)) +# Interval in seconds between keepalive probes +WEBSOCKET_TCP_KEEPALIVE_INTERVAL = int(os.environ.get("WEBSOCKET_TCP_KEEPALIVE_INTERVAL", 2)) +# Number of failed probes before connection is considered dead +WEBSOCKET_TCP_KEEPALIVE_COUNT = int(os.environ.get("WEBSOCKET_TCP_KEEPALIVE_COUNT", 5)) + TRACE_INCOMING_REQUESTS = load_bool("TRACE_INCOMING_REQUESTS", False) TRACE_INCOMING_ALERTS = load_bool("TRACE_INCOMING_ALERTS", False) diff --git a/src/robusta/integrations/receiver.py b/src/robusta/integrations/receiver.py index 6f674b18f..6e6701a6a 100644 --- a/src/robusta/integrations/receiver.py +++ b/src/robusta/integrations/receiver.py @@ -4,6 +4,8 @@ import json import logging import os +import socket +import sys import time from concurrent.futures import ThreadPoolExecutor from contextlib import nullcontext @@ -24,6 +26,10 @@ SENTRY_ENABLED, WEBSOCKET_PING_INTERVAL, WEBSOCKET_PING_TIMEOUT, + WEBSOCKET_TCP_KEEPALIVE_COUNT, + WEBSOCKET_TCP_KEEPALIVE_ENABLED, + WEBSOCKET_TCP_KEEPALIVE_IDLE, + WEBSOCKET_TCP_KEEPALIVE_INTERVAL, WEBSOCKET_APP_KEEPALIVE_ENABLED, ) from robusta.core.playbooks.playbook_utils import to_safe_str from robusta.core.playbooks.playbooks_event_handler import PlaybooksEventHandler @@ -42,6 +48,22 @@ WEBSOCKET_THREADPOOL_SIZE = int(os.environ.get("WEBSOCKET_THREADPOOL_SIZE", 10)) +def _get_tcp_keepalive_options() -> tuple: + """Build TCP keepalive socket options tuple for run_forever(sockopt=...).""" + # TCP_KEEPIDLE is Linux-only; macOS uses TCP_KEEPALIVE (0x10) for the same purpose + if sys.platform == "darwin": + tcp_keepalive_idle = 0x10 + else: + tcp_keepalive_idle = socket.TCP_KEEPIDLE + + return ( + (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), + (socket.IPPROTO_TCP, tcp_keepalive_idle, WEBSOCKET_TCP_KEEPALIVE_IDLE), + (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, WEBSOCKET_TCP_KEEPALIVE_INTERVAL), + (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, WEBSOCKET_TCP_KEEPALIVE_COUNT), + ) + + class ValidationResponse(BaseModel): http_code: int = 200 error_code: Optional[int] = None @@ -114,11 +136,22 @@ def start_receiver(self): def run_forever(self): logging.info("starting relay receiver") + sockopt = None + if WEBSOCKET_TCP_KEEPALIVE_ENABLED: + sockopt = _get_tcp_keepalive_options() + logging.info( + f"TCP keepalive enabled: idle={WEBSOCKET_TCP_KEEPALIVE_IDLE}s, " + f"interval={WEBSOCKET_TCP_KEEPALIVE_INTERVAL}s, count={WEBSOCKET_TCP_KEEPALIVE_COUNT}" + ) while self.active: + # Handles WEBSOCKET_PING_INTERVAL == 0 + ping_timeout = WEBSOCKET_PING_TIMEOUT if WEBSOCKET_PING_INTERVAL else None + logging.info("relay websocket starting") self.ws.run_forever( ping_interval=WEBSOCKET_PING_INTERVAL, ping_payload="p", - ping_timeout=WEBSOCKET_PING_TIMEOUT, + ping_timeout=ping_timeout, + sockopt=sockopt, ) logging.info("relay websocket closed") time.sleep(INCOMING_WEBSOCKET_RECONNECT_DELAY_SEC) From a1cedb2bbe41392d1d383302129cbf82001ed140 Mon Sep 17 00:00:00 2001 From: Tomer Keshet Date: Wed, 14 Jan 2026 13:37:29 +0200 Subject: [PATCH 2/6] add debug logs for getting messages --- src/robusta/integrations/receiver.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/robusta/integrations/receiver.py b/src/robusta/integrations/receiver.py index 6e6701a6a..f09f20121 100644 --- a/src/robusta/integrations/receiver.py +++ b/src/robusta/integrations/receiver.py @@ -208,7 +208,11 @@ def __exec_external_request(self, action_request: ExternalActionRequest, validat if sync_response: http_code = 200 if response.get("success") else 500 + logging.debug( + f"Sending results for `{action_request.body.action_name}` {to_safe_str(action_request.body.action_params)} - {http_code}") self.ws.send(data=json.dumps(self.__sync_response(http_code, action_request.request_id, response))) + logging.debug( + f"After Sending results for `{action_request.body.action_name}` {to_safe_str(action_request.body.action_params)} - {http_code}") def __exec_external_stream_request(self, action_request: ExternalActionRequest, validate_timestamp: bool): logging.debug(f"Callback `{action_request.body.action_name}` {to_safe_str(action_request.body.action_params)}") @@ -225,7 +229,11 @@ def __exec_external_stream_request(self, action_request: ExternalActionRequest, action_request.body.action_params, lambda data: self.__stream_response(request_id=action_request.request_id, data=data)) res = "" if res.get("success") else f"event: error\ndata: {json.dumps(res)}\n\n" + + logging.debug(f"Stream Sending result `{action_request.body.action_name}` {to_safe_str(action_request.body.action_params)} - {res}") self.__close_stream_response(action_request.request_id, res) + logging.debug( + f"After Stream Sending result `{action_request.body.action_name}` {to_safe_str(action_request.body.action_params)} - {res}") def _process_action(self, action: ExternalActionRequest, validate_timestamp: bool) -> None: self._executor.submit(self._process_action_sync, action, validate_timestamp) @@ -284,12 +292,18 @@ def on_message(self, ws: websocket.WebSocketApp, message: str) -> None: return if isinstance(incoming_event, SlackActionsMessage): + logging.debug( + f"on_message got Slack callback: {len(incoming_event.actions)} action(s) from " + f"user={incoming_event.user.username if incoming_event.user else 'unknown'}" + ) # slack callbacks have a list of 'actions'. Within each action there a 'value' field, # which container the actual action details we need to run. # This wrapper format is part of the slack API, and cannot be changed by us. for slack_action_request in incoming_event.actions: self._process_action(slack_action_request.value, validate_timestamp=False) else: + logging.debug( + f"on_message got external action request: `{incoming_event.body.action_name}` {to_safe_str(incoming_event.body.action_params)}") self._process_action(incoming_event, validate_timestamp=True) @staticmethod From 0b81089608c65a8946fc25b8d1d4c86d5c571317 Mon Sep 17 00:00:00 2001 From: Tomer Keshet Date: Wed, 14 Jan 2026 14:21:16 +0200 Subject: [PATCH 3/6] fix broken value add log --- src/robusta/integrations/receiver.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/robusta/integrations/receiver.py b/src/robusta/integrations/receiver.py index f09f20121..13e12d3e2 100644 --- a/src/robusta/integrations/receiver.py +++ b/src/robusta/integrations/receiver.py @@ -29,7 +29,7 @@ WEBSOCKET_TCP_KEEPALIVE_COUNT, WEBSOCKET_TCP_KEEPALIVE_ENABLED, WEBSOCKET_TCP_KEEPALIVE_IDLE, - WEBSOCKET_TCP_KEEPALIVE_INTERVAL, WEBSOCKET_APP_KEEPALIVE_ENABLED, + WEBSOCKET_TCP_KEEPALIVE_INTERVAL, ) from robusta.core.playbooks.playbook_utils import to_safe_str from robusta.core.playbooks.playbooks_event_handler import PlaybooksEventHandler @@ -143,6 +143,11 @@ def run_forever(self): f"TCP keepalive enabled: idle={WEBSOCKET_TCP_KEEPALIVE_IDLE}s, " f"interval={WEBSOCKET_TCP_KEEPALIVE_INTERVAL}s, count={WEBSOCKET_TCP_KEEPALIVE_COUNT}" ) + if WEBSOCKET_PING_TIMEOUT: + logging.info( + f"Websocket keepalive enabled: timeout={WEBSOCKET_PING_TIMEOUT}s, " + f"interval={WEBSOCKET_PING_INTERVAL}s" + ) while self.active: # Handles WEBSOCKET_PING_INTERVAL == 0 ping_timeout = WEBSOCKET_PING_TIMEOUT if WEBSOCKET_PING_INTERVAL else None From e5fdee3c2f48d3ef1121a400d171771be78533dd Mon Sep 17 00:00:00 2001 From: Tomer Keshet Date: Wed, 14 Jan 2026 14:31:03 +0200 Subject: [PATCH 4/6] fix log messages --- src/robusta/integrations/receiver.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/robusta/integrations/receiver.py b/src/robusta/integrations/receiver.py index 13e12d3e2..5d001f3c4 100644 --- a/src/robusta/integrations/receiver.py +++ b/src/robusta/integrations/receiver.py @@ -143,10 +143,10 @@ def run_forever(self): f"TCP keepalive enabled: idle={WEBSOCKET_TCP_KEEPALIVE_IDLE}s, " f"interval={WEBSOCKET_TCP_KEEPALIVE_INTERVAL}s, count={WEBSOCKET_TCP_KEEPALIVE_COUNT}" ) - if WEBSOCKET_PING_TIMEOUT: + if WEBSOCKET_PING_INTERVAL: logging.info( - f"Websocket keepalive enabled: timeout={WEBSOCKET_PING_TIMEOUT}s, " - f"interval={WEBSOCKET_PING_INTERVAL}s" + f"Websocket keepalive enabled: interval={WEBSOCKET_PING_INTERVAL}s, " + f"timeout={WEBSOCKET_PING_TIMEOUT}s" ) while self.active: # Handles WEBSOCKET_PING_INTERVAL == 0 @@ -220,7 +220,7 @@ def __exec_external_request(self, action_request: ExternalActionRequest, validat f"After Sending results for `{action_request.body.action_name}` {to_safe_str(action_request.body.action_params)} - {http_code}") def __exec_external_stream_request(self, action_request: ExternalActionRequest, validate_timestamp: bool): - logging.debug(f"Callback `{action_request.body.action_name}` {to_safe_str(action_request.body.action_params)}") + logging.debug(f"Stream Callback `{action_request.body.action_name}` {to_safe_str(action_request.body.action_params)}") validation_response = self.__validate_request(action_request, validate_timestamp) if validation_response.http_code != 200: From cfbac23ab2a349f98356d4bf9aff67394a0c1cdd Mon Sep 17 00:00:00 2001 From: Tomer Keshet Date: Wed, 14 Jan 2026 16:24:06 +0200 Subject: [PATCH 5/6] fixed a CI problem with slack tests --- tests/test_slack.py | 4 ++-- tests/utils/slack_utils.py | 13 +++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/tests/test_slack.py b/tests/test_slack.py index 9c0818e73..521a5ce44 100644 --- a/tests/test_slack.py +++ b/tests/test_slack.py @@ -23,8 +23,8 @@ def test_send_to_slack(slack_channel: SlackChannel): finding = Finding(title=msg, aggregation_key=msg) finding.add_enrichment([MarkdownBlock("testing")]) slack_params = SlackSinkParams(name="test_slack", slack_channel=slack_channel.channel_name, api_key="") - slack_sender.send_finding_to_slack(finding, slack_params, False) - assert slack_channel.get_latest_message() == msg + ts = slack_sender.send_finding_to_slack(finding, slack_params, False) + assert slack_channel.get_message_by_ts(ts) == msg def test_long_slack_messages(slack_channel: SlackChannel): diff --git a/tests/utils/slack_utils.py b/tests/utils/slack_utils.py index 4890eeffb..a359c1ec5 100644 --- a/tests/utils/slack_utils.py +++ b/tests/utils/slack_utils.py @@ -18,10 +18,23 @@ def was_message_sent_recently(self, expected) -> bool: return False def get_latest_message(self): + # Note: Prefer get_message_by_ts() to avoid race conditions when tests share a channel results = self.client.conversations_history(channel=self.channel_id) messages = results["messages"] return messages[0]["text"] + def get_message_by_ts(self, ts: str) -> str: + """Get message by timestamp - avoids race conditions unlike get_latest_message().""" + results = self.client.conversations_history( + channel=self.channel_id, + latest=ts, + oldest=ts, + inclusive=True, + limit=1 + ) + messages = results["messages"] + return messages[0]["text"] if messages else None + def get_complete_latest_message(self): results = self.client.conversations_history(channel=self.channel_id) messages = results["messages"] From a268c9b3344c37c92e59dac608c24cc2ac7e8e74 Mon Sep 17 00:00:00 2001 From: Tomer Keshet Date: Wed, 14 Jan 2026 16:49:38 +0200 Subject: [PATCH 6/6] fix return value in test --- tests/utils/slack_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/utils/slack_utils.py b/tests/utils/slack_utils.py index a359c1ec5..040b89499 100644 --- a/tests/utils/slack_utils.py +++ b/tests/utils/slack_utils.py @@ -23,7 +23,7 @@ def get_latest_message(self): messages = results["messages"] return messages[0]["text"] - def get_message_by_ts(self, ts: str) -> str: + def get_message_by_ts(self, ts: str) -> str | None: """Get message by timestamp - avoids race conditions unlike get_latest_message().""" results = self.client.conversations_history( channel=self.channel_id,