Skip to content

Commit 2dae384

Browse files
fix(preprod): use separate producer for preprod EAP (#105197)
Realized I accidentally piggy-backed off Replay's Kafka producer, so this creates our own named producer. I also renamed `produce_preprod_size_metric_to_eap` to make it more obvious it goes through Kafka and therefore not a synchronous write. --------- Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
1 parent 4f8f84f commit 2dae384

File tree

5 files changed

+39
-23
lines changed

5 files changed

+39
-23
lines changed

src/sentry/preprod/eap/write.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,28 @@
44
from typing import Any
55

66
from arroyo import Topic as ArroyoTopic
7-
from arroyo.backends.kafka import KafkaPayload
7+
from arroyo.backends.kafka import KafkaPayload, KafkaProducer
88
from google.protobuf.timestamp_pb2 import Timestamp
9+
from sentry_kafka_schemas.codecs import Codec
910
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
11+
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem
1012
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem as EAPTraceItem
1113

12-
from sentry.conf.types.kafka_definition import Topic
14+
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
1315
from sentry.preprod.eap.constants import PREPROD_NAMESPACE
1416
from sentry.preprod.models import PreprodArtifactSizeMetrics
15-
from sentry.replays.lib.kafka import EAP_ITEMS_CODEC, eap_producer
1617
from sentry.search.eap.rpc_utils import anyvalue
18+
from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer
1719
from sentry.utils.kafka_config import get_topic_definition
1820

1921

20-
def write_preprod_size_metric_to_eap(
22+
def produce_preprod_size_metric_to_eap(
2123
size_metric: PreprodArtifactSizeMetrics,
2224
organization_id: int,
2325
project_id: int,
2426
) -> None:
2527
"""
26-
Write a PreprodArtifactSizeMetrics to EAP as a TRACE_ITEM_TYPE_PREPROD trace item.
28+
Write a PreprodArtifactSizeMetrics to EAP topic as a TRACE_ITEM_TYPE_PREPROD trace item.
2729
2830
NOTE: EAP is append-only, so this function should only be called after the size metric has been successfully committed
2931
since we cannot update fields later on. EAP does support ReplacingMergeTree deduplication,
@@ -110,4 +112,18 @@ def write_preprod_size_metric_to_eap(
110112

111113
topic = get_topic_definition(Topic.SNUBA_ITEMS)["real_topic_name"]
112114
payload = KafkaPayload(None, EAP_ITEMS_CODEC.encode(trace_item), [])
113-
eap_producer.produce(ArroyoTopic(topic), payload)
115+
_eap_producer.produce(ArroyoTopic(topic), payload)
116+
117+
118+
EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS)
119+
120+
121+
def _get_eap_items_producer() -> KafkaProducer:
122+
"""Get a Kafka producer for EAP TraceItems."""
123+
return get_arroyo_producer(
124+
name="sentry.preprod.lib.kafka.eap_items",
125+
topic=Topic.SNUBA_ITEMS,
126+
)
127+
128+
129+
_eap_producer = SingletonProducer(_get_eap_items_producer)

src/sentry/preprod/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from sentry.models.commitcomparison import CommitComparison
1616
from sentry.models.organization import Organization
1717
from sentry.models.project import Project
18-
from sentry.preprod.eap.write import write_preprod_size_metric_to_eap
18+
from sentry.preprod.eap.write import produce_preprod_size_metric_to_eap
1919
from sentry.preprod.models import (
2020
PreprodArtifact,
2121
PreprodArtifactSizeComparison,
@@ -477,7 +477,7 @@ def _assemble_preprod_artifact_size_analysis(
477477
organization = preprod_artifact.project.organization
478478
if features.has("organizations:preprod-size-metrics-eap-write", organization):
479479
for size_metric in size_metrics_updated:
480-
write_preprod_size_metric_to_eap(
480+
produce_preprod_size_metric_to_eap(
481481
size_metric=size_metric,
482482
organization_id=org_id,
483483
project_id=project.id,

tests/sentry/preprod/eap/test_write.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
88
from sentry.models.commitcomparison import CommitComparison
9-
from sentry.preprod.eap.write import write_preprod_size_metric_to_eap
9+
from sentry.preprod.eap.write import produce_preprod_size_metric_to_eap
1010
from sentry.preprod.models import (
1111
PreprodArtifact,
1212
PreprodArtifactSizeMetrics,
@@ -16,7 +16,7 @@
1616

1717

1818
class WritePreprodSizeMetricToEAPTest(TestCase):
19-
@patch("sentry.preprod.eap.write.eap_producer.produce")
19+
@patch("sentry.preprod.eap.write._eap_producer.produce")
2020
def test_write_preprod_size_metric_encodes_all_fields_correctly(self, mock_produce):
2121
commit_comparison = CommitComparison.objects.create(
2222
organization_id=self.organization.id,
@@ -61,7 +61,7 @@ def test_write_preprod_size_metric_encodes_all_fields_correctly(self, mock_produ
6161
analysis_file_id=123,
6262
)
6363

64-
write_preprod_size_metric_to_eap(
64+
produce_preprod_size_metric_to_eap(
6565
size_metric=size_metric,
6666
organization_id=self.organization.id,
6767
project_id=self.project.id,
@@ -117,7 +117,7 @@ def test_write_preprod_size_metric_encodes_all_fields_correctly(self, mock_produ
117117
assert attrs["git_base_ref"].string_value == "main"
118118
assert attrs["git_pr_number"].int_value == 42
119119

120-
@patch("sentry.preprod.eap.write.eap_producer.produce")
120+
@patch("sentry.preprod.eap.write._eap_producer.produce")
121121
def test_write_preprod_size_metric_handles_optional_fields(self, mock_produce):
122122
artifact = self.create_preprod_artifact(
123123
project=self.project,
@@ -130,7 +130,7 @@ def test_write_preprod_size_metric_handles_optional_fields(self, mock_produce):
130130
state=PreprodArtifactSizeMetrics.SizeAnalysisState.COMPLETED,
131131
)
132132

133-
write_preprod_size_metric_to_eap(
133+
produce_preprod_size_metric_to_eap(
134134
size_metric=size_metric,
135135
organization_id=self.organization.id,
136136
project_id=self.project.id,

tests/sentry/preprod/test_tasks.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -755,15 +755,15 @@ def test_assemble_preprod_artifact_size_analysis_removes_stale_metrics(self) ->
755755
def test_assemble_preprod_artifact_size_analysis_writes_to_eap_when_flag_enabled(self) -> None:
756756
"""Test that size metrics are written to EAP when feature flag is enabled"""
757757
with self.feature("organizations:preprod-size-metrics-eap-write"):
758-
with patch("sentry.preprod.tasks.write_preprod_size_metric_to_eap") as mock_eap_write:
758+
with patch("sentry.preprod.tasks.produce_preprod_size_metric_to_eap") as mock_eap_write:
759759
status, details = self._run_task_and_verify_status(
760760
b'{"analysis_duration": 1.5, "download_size": 1000, "install_size": 2000, "treemap": null, "analysis_version": null, "app_components": [{"component_type": 0, "name": "Main App", "app_id": "com.example.app", "path": "/", "download_size": 1000, "install_size": 2000}]}'
761761
)
762762

763763
assert status == ChunkFileState.OK
764764
assert details is None
765765

766-
# Verify write_preprod_size_metric_to_eap was called exactly once
766+
# Verify produce_preprod_size_metric_to_eap was called exactly once
767767
# We have other integration tests that verify the EAP write itself works
768768
assert mock_eap_write.call_count == 1
769769

@@ -776,15 +776,15 @@ def test_assemble_preprod_artifact_size_analysis_writes_to_eap_when_flag_enabled
776776

777777
def test_assemble_preprod_artifact_size_analysis_skips_eap_when_flag_disabled(self) -> None:
778778
"""Test that size metrics are NOT written to EAP when feature flag is disabled"""
779-
with patch("sentry.preprod.tasks.write_preprod_size_metric_to_eap") as mock_eap_write:
779+
with patch("sentry.preprod.tasks.produce_preprod_size_metric_to_eap") as mock_eap_write:
780780
status, details = self._run_task_and_verify_status(
781781
b'{"analysis_duration": 1.5, "download_size": 1000, "install_size": 2000, "treemap": null, "analysis_version": null, "app_components": [{"component_type": 0, "name": "Main App", "app_id": "com.example.app", "path": "/", "download_size": 1000, "install_size": 2000}]}'
782782
)
783783

784784
assert status == ChunkFileState.OK
785785
assert details is None
786786

787-
# Verify write_preprod_size_metric_to_eap was NOT called
787+
# Verify produce_preprod_size_metric_to_eap was NOT called
788788
mock_eap_write.assert_not_called()
789789

790790
def test_assemble_preprod_artifact_size_analysis_eap_write_failure_does_not_fail_task(
@@ -793,7 +793,7 @@ def test_assemble_preprod_artifact_size_analysis_eap_write_failure_does_not_fail
793793
"""Test that EAP write failures don't cause the main task to fail"""
794794
with self.feature("organizations:preprod-size-metrics-eap-write"):
795795
with patch(
796-
"sentry.preprod.tasks.write_preprod_size_metric_to_eap",
796+
"sentry.preprod.tasks.produce_preprod_size_metric_to_eap",
797797
side_effect=Exception("EAP write failed"),
798798
):
799799
status, details = self._run_task_and_verify_status(
@@ -814,7 +814,7 @@ def test_assemble_preprod_artifact_size_analysis_eap_write_failure_does_not_fail
814814
def test_assemble_preprod_artifact_size_analysis_writes_multiple_metrics_to_eap(self) -> None:
815815
"""Test that all size metrics (main + components) are written to EAP when flag is enabled"""
816816
with self.feature("organizations:preprod-size-metrics-eap-write"):
817-
with patch("sentry.preprod.tasks.write_preprod_size_metric_to_eap") as mock_eap_write:
817+
with patch("sentry.preprod.tasks.produce_preprod_size_metric_to_eap") as mock_eap_write:
818818
status, details = self._run_task_and_verify_status(
819819
b'{"analysis_duration": 2.5, "download_size": 5000, "install_size": 10000, "treemap": null, "analysis_version": "1.0", "app_components": [{"component_type": 0, "name": "Main App", "app_id": "com.example.app", "path": "/", "download_size": 3000, "install_size": 6000}, {"component_type": 1, "name": "Watch App", "app_id": "com.example.app.watchkitapp", "path": "/Watch", "download_size": 2000, "install_size": 4000}]}'
820820
)

tests/snuba/preprod/eap/test_preprod_eap_integration.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from sentry.models.commitcomparison import CommitComparison
99
from sentry.preprod.eap.read import query_preprod_size_metrics
10-
from sentry.preprod.eap.write import write_preprod_size_metric_to_eap
10+
from sentry.preprod.eap.write import produce_preprod_size_metric_to_eap
1111
from sentry.preprod.models import (
1212
PreprodArtifact,
1313
PreprodArtifactSizeMetrics,
@@ -58,7 +58,7 @@ def test_write_and_read_size_metric_round_trip(self):
5858
analysis_file_id=123,
5959
)
6060

61-
write_preprod_size_metric_to_eap(
61+
produce_preprod_size_metric_to_eap(
6262
size_metric=size_metric,
6363
organization_id=self.organization.id,
6464
project_id=self.project.id,
@@ -153,13 +153,13 @@ def test_write_multiple_size_metrics_same_artifact(self):
153153
max_install_size=1000,
154154
)
155155

156-
write_preprod_size_metric_to_eap(
156+
produce_preprod_size_metric_to_eap(
157157
size_metric=size_metric_main,
158158
organization_id=self.organization.id,
159159
project_id=self.project.id,
160160
)
161161

162-
write_preprod_size_metric_to_eap(
162+
produce_preprod_size_metric_to_eap(
163163
size_metric=size_metric_watch,
164164
organization_id=self.organization.id,
165165
project_id=self.project.id,

0 commit comments

Comments
 (0)