From 2c49eedf4c54126ae761c46165480c9f57c259c1 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 10 Nov 2025 08:18:42 -0800 Subject: [PATCH 1/3] Update SDK to use tandem pool --- .../model/fn_execution/v1/beam_fn_api.proto | 8 ++ .../examples/ratelimit/__init__.py | 0 .../ratelimit/beam_streaming_example.py | 102 ++++++++++++++++++ .../examples/ratelimit/buf.gen.yaml | 6 ++ .../apache_beam/examples/ratelimit/buf.lock | 38 +++++++ .../apache_beam/examples/ratelimit/buf.yaml | 3 + .../apache_beam/runners/worker/sdk_worker.py | 47 +++++++- 7 files changed, 203 insertions(+), 1 deletion(-) create mode 100644 sdks/python/apache_beam/examples/ratelimit/__init__.py create mode 100644 sdks/python/apache_beam/examples/ratelimit/beam_streaming_example.py create mode 100644 sdks/python/apache_beam/examples/ratelimit/buf.gen.yaml create mode 100644 sdks/python/apache_beam/examples/ratelimit/buf.lock create mode 100644 sdks/python/apache_beam/examples/ratelimit/buf.yaml diff --git a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto index 9b32048b4995..7704d4bdd3a5 100644 --- a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto +++ b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto @@ -157,6 +157,7 @@ message InstructionRequest { MonitoringInfosMetadataRequest monitoring_infos = 1005; HarnessMonitoringInfosRequest harness_monitoring_infos = 1006; SampleDataRequest sample_data = 1007; + AiWorkerPoolMetadata ai_worker_pool_metadata = 1008; // DEPRECATED RegisterRequest register = 1000; @@ -529,6 +530,13 @@ message MonitoringInfosMetadataResponse { map monitoring_info = 1; } +message AiWorkerPoolMetadata { + // The external IP address of the AI worker pool. + string external_ip = 1; + // The external port of the AI worker pool. + int32 external_port = 2; +} + // Represents a request to the SDK to split a currently active bundle. message ProcessBundleSplitRequest { // (Required) A reference to an active process bundle request with the given diff --git a/sdks/python/apache_beam/examples/ratelimit/__init__.py b/sdks/python/apache_beam/examples/ratelimit/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example.py b/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example.py new file mode 100644 index 000000000000..cebe4eaa6a79 --- /dev/null +++ b/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example.py @@ -0,0 +1,102 @@ +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions + +import grpc +import logging +import os +import sys + + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'generated_proto'))) + +from envoy.service.ratelimit.v3 import rls_pb2 +from envoy.service.ratelimit.v3 import rls_pb2_grpc +from envoy.extensions.common.ratelimit.v3 import ratelimit_pb2 + +# Set up logging +logging.basicConfig(level=logging.INFO) +_LOGGER = logging.getLogger(__name__) + +# Default Envoy proxy address +ENVOY_PROXY_ADDRESS = 'localhost:8081' + +class GRPCRateLimitClient(beam.DoFn): + """ + A DoFn that makes gRPC calls to an Envoy Rate Limit Service. + """ + def __init__(self, envoy_address): + self._envoy_address = envoy_address + self._channel = None + self._stub = None + + def setup(self): + """ + Initializes the gRPC channel and stub. + """ + _LOGGER.info(f"Setting up gRPC client for Envoy at {self._envoy_address}") + self._channel = grpc.insecure_channel(self._envoy_address) + self._stub = rls_pb2_grpc.RateLimitServiceStub(self._channel) + + def process(self, element): + client_id = element.get('client_id', 'unknown_client') + request_id = element.get('request_id', 'unknown_request') + + _LOGGER.info(f"Processing element: client_id={client_id}, request_id={request_id}") + + # Create a RateLimitDescriptor + descriptor = ratelimit_pb2.RateLimitDescriptor() + descriptor.entries.add(key="client_id", value=client_id) + descriptor.entries.add(key="request_id", value=request_id) + # Add more descriptors as needed for your rate limiting policy + + # Create a RateLimitRequest + request = rls_pb2.RateLimitRequest( + domain="my_service", # This should match your Envoy rate limit configuration + descriptors=[descriptor], + hits_addend=1 + ) + + try: + response = self._stub.ShouldRateLimit(request) + _LOGGER.info(f"RateLimitResponse for client_id={client_id}, request_id={request_id}: {response.overall_code}") + yield { + 'client_id': client_id, + 'request_id': request_id, + 'rate_limit_status': rls_pb2.RateLimitResponse.Code.Name(response.overall_code), + 'response_details': str(response) + } + except grpc.RpcError as e: + _LOGGER.error(f"gRPC call failed for client_id={client_id}, request_id={request_id}: {e.details()}") + yield { + 'client_id': client_id, + 'request_id': request_id, + 'rate_limit_status': 'ERROR', + 'error_details': e.details() + } + + def teardown(self): + if self._channel: + _LOGGER.info("Tearing down gRPC client.") + self._channel.close() + +def run(): + options = PipelineOptions() + options.view_as(StandardOptions).runner = 'DirectRunner' # Use DirectRunner for local testing + + with beam.Pipeline(options=options) as p: + # Sample input data + requests = p | 'CreateRequests' >> beam.Create([ + {'client_id': 'user_1', 'request_id': 'req_a'}, + {'client_id': 'user_2', 'request_id': 'req_b'}, + {'client_id': 'user_1', 'request_id': 'req_c'}, + {'client_id': 'user_3', 'request_id': 'req_d'}, + ]) + + # Apply the gRPC client DoFn + rate_limit_results = requests | 'CheckRateLimit' >> beam.ParDo(GRPCRateLimitClient(ENVOY_PROXY_ADDRESS)) + + # Log the results + rate_limit_results | 'LogResults' >> beam.Map(lambda x: _LOGGER.info(f"Result: {x}")) + +if __name__ == '__main__': + run() diff --git a/sdks/python/apache_beam/examples/ratelimit/buf.gen.yaml b/sdks/python/apache_beam/examples/ratelimit/buf.gen.yaml new file mode 100644 index 000000000000..9cd71733e671 --- /dev/null +++ b/sdks/python/apache_beam/examples/ratelimit/buf.gen.yaml @@ -0,0 +1,6 @@ +version: v1 +plugins: + - plugin: buf.build/protocolbuffers/python + out: generated_proto + - plugin: buf.build/grpc/python + out: generated_proto diff --git a/sdks/python/apache_beam/examples/ratelimit/buf.lock b/sdks/python/apache_beam/examples/ratelimit/buf.lock new file mode 100644 index 000000000000..fff2c8e7996f --- /dev/null +++ b/sdks/python/apache_beam/examples/ratelimit/buf.lock @@ -0,0 +1,38 @@ +# Generated by buf. DO NOT EDIT. +version: v1 +deps: + - remote: buf.build + owner: cncf + repository: xds + commit: af01e9416f124dc0a8e4a225392b4664 + digest: shake256:2b63fbc19ea95060f2ae22e16e4b2b467356a7a0c296923fe39d991b8128010b5d00780c1417bf61dfb1c6a0dd6c269ba1767b322fc3400d279b59f37d598cb2 + - remote: buf.build + owner: envoyproxy + repository: envoy + commit: 86528c2a78124314bed0b23bea1f7006 + digest: shake256:b05607c903c3c8d90d65de6ff83ab5565d5614952fcd6395a20c7e21644d12a58661f0a11b4f86a1a782ac7b81b6337caa5d7c15d930d7be63fe727fe05fc920 + - remote: buf.build + owner: envoyproxy + repository: protoc-gen-validate + commit: daf171c6cdb54629b5f51e345a79e4dd + digest: shake256:4ae167d7eed10da5f83a3f5df8c670d249170f11b1f2fd19afda06be2cff4d47dcc95e9e4a15151ecc8ce2d3d3614caf9a04d3ad82fb768a3870dedfa9455f36 + - remote: buf.build + owner: google + repository: cel-spec + commit: dffb8c8cf7814e96a7c06b6a5ac72fdc + digest: shake256:532025e30281fc0c3a26d6446cd4ebc98e82322800dda3c12ab5a6c0b24285c468ccd824e72fbd7f64510593882c8db1bfea7deb80603f2c9c1a38cd40847252 + - remote: buf.build + owner: googleapis + repository: googleapis + commit: 72c8614f3bd0466ea67931ef2c43d608 + digest: shake256:b3ac4d383db09f92ab0ca85d12bff8c49eddf7031bd3a854c260b6ac4ed6a2bb85b52b3393c316d28f8038bf3b8e70cb3d16470e8cc4423007678fb6d89d36d4 + - remote: buf.build + owner: opentelemetry + repository: opentelemetry + commit: 8f7f39d683754864af50f17437d43bdb + digest: shake256:876c5eafb14069d6ee37de358b4913cc3b2f3bcd5948e4a7b7694d4e95b53a30816c6a488e9757e15e7f11bf13090195746f13015bd935224df711215dbf0152 + - remote: buf.build + owner: prometheus + repository: client-model + commit: e171c0b235c546d5a9a597c2961bd357 + digest: shake256:7db3f73ac0f1dce71e70f304f318e9741e857fd78b7b42f0df7a3da353fbb2f387899da7b0a77ac9ee9565194510e39a913cdb9a8ab3c2ff4b8713428c795213 diff --git a/sdks/python/apache_beam/examples/ratelimit/buf.yaml b/sdks/python/apache_beam/examples/ratelimit/buf.yaml new file mode 100644 index 000000000000..113c50eb4d16 --- /dev/null +++ b/sdks/python/apache_beam/examples/ratelimit/buf.yaml @@ -0,0 +1,3 @@ +version: v1 +deps: + - buf.build/envoyproxy/envoy diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 6060ff8d54a8..63c38089d11f 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -69,7 +69,7 @@ from apache_beam.utils import thread_pool_executor from apache_beam.utils.sentinel import Sentinel from apache_beam.version import __version__ as beam_version - +import dataclasses if TYPE_CHECKING: from apache_beam.portability.api import endpoints_pb2 from apache_beam.utils.profiler import Profile @@ -104,6 +104,44 @@ }] }) +@dataclasses.dataclass +class AiWorkerPoolMetadata: + """Runtime metadata about AI worker pool resources, such as external IP and + port. + + Attributes: + external_ip (str): The external IP address of the AI worker pool. + external_port (int): The external port of the AI worker pool. + """ + external_ip: Optional[str] = None + external_port: Optional[int] = None + + @classmethod + def from_proto(cls, proto): + # type: (beam_fn_api_pb2.AiWorkerPoolMetadata) -> AiWorkerPoolMetadata + """Creates an instance from an AiWorkerPoolMetadata proto.""" + return cls( + external_ip=proto.external_ip if proto.external_ip else None, + external_port=proto.external_port if proto.external_port else None) + + +class _AiMetadataHolder: + """Singleton holder for AiWorkerPoolMetadata.""" + _metadata: Optional[AiWorkerPoolMetadata] = None + _lock = threading.Lock() + + @classmethod + def set_metadata(cls, proto): + # type: (beam_fn_api_pb2.AiWorkerPoolMetadata) -> None + with cls._lock: + cls._metadata = AiWorkerPoolMetadata.from_proto(proto) + + @classmethod + def get_metadata(cls) -> Optional[AiWorkerPoolMetadata]: + return cls._metadata + +def get_ai_worker_pool_metadata() -> Optional[AiWorkerPoolMetadata]: + return _AiMetadataHolder.get_metadata() class ShortIdCache(object): """ Cache for MonitoringInfo "short ids" @@ -393,6 +431,13 @@ def task(): _LOGGER.debug( "Currently using %s threads." % len(self._worker_thread_pool._workers)) + def _request_ai_worker_pool_metadata(self, request): + # type: (beam_fn_api_pb2.InstructionRequest) -> None + _AiMetadataHolder.set_metadata(request.ai_worker_pool_metadata) + _LOGGER.info("received metadata for AI worker pool: %s", request.ai_worker_pool_metadata) + self._responses.put( + beam_fn_api_pb2.InstructionResponse(instruction_id=request.instruction_id)) + def _request_sample_data(self, request): # type: (beam_fn_api_pb2.InstructionRequest) -> None From 5ee979662dcc1922121f56ee412c8d2b58836e37 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Tue, 11 Nov 2025 09:37:04 -0800 Subject: [PATCH 2/3] Add example --- .../ratelimit/beam_streaming_example.py | 15 ++++---- .../ratelimit/beam_streaming_example2.py | 31 +++++++++++++++ .../examples/ratelimit/buf.gen.yaml | 6 --- .../apache_beam/examples/ratelimit/buf.lock | 38 ------------------- 4 files changed, 38 insertions(+), 52 deletions(-) create mode 100644 sdks/python/apache_beam/examples/ratelimit/beam_streaming_example2.py delete mode 100644 sdks/python/apache_beam/examples/ratelimit/buf.gen.yaml delete mode 100644 sdks/python/apache_beam/examples/ratelimit/buf.lock diff --git a/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example.py b/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example.py index cebe4eaa6a79..ce44d83e75e0 100644 --- a/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example.py +++ b/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example.py @@ -1,5 +1,6 @@ import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions +from apache_beam.runners.worker.sdk_worker import get_ai_worker_pool_metadata import grpc import logging @@ -17,15 +18,12 @@ logging.basicConfig(level=logging.INFO) _LOGGER = logging.getLogger(__name__) -# Default Envoy proxy address -ENVOY_PROXY_ADDRESS = 'localhost:8081' - class GRPCRateLimitClient(beam.DoFn): """ A DoFn that makes gRPC calls to an Envoy Rate Limit Service. """ - def __init__(self, envoy_address): - self._envoy_address = envoy_address + def __init__(self): + self._envoy_address = None self._channel = None self._stub = None @@ -33,6 +31,8 @@ def setup(self): """ Initializes the gRPC channel and stub. """ + ai_worker_pool_metadata = get_ai_worker_pool_metadata() + self._envoy_address = f"{ai_worker_pool_metadata.external_ip}:{ai_worker_pool_metadata.external_port}" _LOGGER.info(f"Setting up gRPC client for Envoy at {self._envoy_address}") self._channel = grpc.insecure_channel(self._envoy_address) self._stub = rls_pb2_grpc.RateLimitServiceStub(self._channel) @@ -47,11 +47,10 @@ def process(self, element): descriptor = ratelimit_pb2.RateLimitDescriptor() descriptor.entries.add(key="client_id", value=client_id) descriptor.entries.add(key="request_id", value=request_id) - # Add more descriptors as needed for your rate limiting policy # Create a RateLimitRequest request = rls_pb2.RateLimitRequest( - domain="my_service", # This should match your Envoy rate limit configuration + domain="my_service", descriptors=[descriptor], hits_addend=1 ) @@ -93,7 +92,7 @@ def run(): ]) # Apply the gRPC client DoFn - rate_limit_results = requests | 'CheckRateLimit' >> beam.ParDo(GRPCRateLimitClient(ENVOY_PROXY_ADDRESS)) + rate_limit_results = requests | 'CheckRateLimit' >> beam.ParDo(GRPCRateLimitClient()) # Log the results rate_limit_results | 'LogResults' >> beam.Map(lambda x: _LOGGER.info(f"Result: {x}")) diff --git a/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example2.py b/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example2.py new file mode 100644 index 000000000000..c561e43bea11 --- /dev/null +++ b/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example2.py @@ -0,0 +1,31 @@ +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +import logging +from apache_beam.runners.worker.sdk_worker import get_ai_worker_pool_metadata + +logging.basicConfig(level=logging.INFO) + +class PrintFn(beam.DoFn): + def process(self, element): + logging.info(f"Processing element: {element} and worker metadata {get_ai_worker_pool_metadata()}") + yield element + +pipeline_options = PipelineOptions() +pipeline = beam.Pipeline(options=pipeline_options) + +# Create a PCollection from a list of elements for this batch job. +data = pipeline | 'Create' >> beam.Create([ + 'Hello', + 'World', + 'This', + 'is', + 'a', + 'batch', + 'example', +]) + +# Apply the custom DoFn with resource hints. +data | 'PrintWithDoFn' >> beam.ParDo(PrintFn()) + +result = pipeline.run() +result.wait_until_finish() diff --git a/sdks/python/apache_beam/examples/ratelimit/buf.gen.yaml b/sdks/python/apache_beam/examples/ratelimit/buf.gen.yaml deleted file mode 100644 index 9cd71733e671..000000000000 --- a/sdks/python/apache_beam/examples/ratelimit/buf.gen.yaml +++ /dev/null @@ -1,6 +0,0 @@ -version: v1 -plugins: - - plugin: buf.build/protocolbuffers/python - out: generated_proto - - plugin: buf.build/grpc/python - out: generated_proto diff --git a/sdks/python/apache_beam/examples/ratelimit/buf.lock b/sdks/python/apache_beam/examples/ratelimit/buf.lock deleted file mode 100644 index fff2c8e7996f..000000000000 --- a/sdks/python/apache_beam/examples/ratelimit/buf.lock +++ /dev/null @@ -1,38 +0,0 @@ -# Generated by buf. DO NOT EDIT. -version: v1 -deps: - - remote: buf.build - owner: cncf - repository: xds - commit: af01e9416f124dc0a8e4a225392b4664 - digest: shake256:2b63fbc19ea95060f2ae22e16e4b2b467356a7a0c296923fe39d991b8128010b5d00780c1417bf61dfb1c6a0dd6c269ba1767b322fc3400d279b59f37d598cb2 - - remote: buf.build - owner: envoyproxy - repository: envoy - commit: 86528c2a78124314bed0b23bea1f7006 - digest: shake256:b05607c903c3c8d90d65de6ff83ab5565d5614952fcd6395a20c7e21644d12a58661f0a11b4f86a1a782ac7b81b6337caa5d7c15d930d7be63fe727fe05fc920 - - remote: buf.build - owner: envoyproxy - repository: protoc-gen-validate - commit: daf171c6cdb54629b5f51e345a79e4dd - digest: shake256:4ae167d7eed10da5f83a3f5df8c670d249170f11b1f2fd19afda06be2cff4d47dcc95e9e4a15151ecc8ce2d3d3614caf9a04d3ad82fb768a3870dedfa9455f36 - - remote: buf.build - owner: google - repository: cel-spec - commit: dffb8c8cf7814e96a7c06b6a5ac72fdc - digest: shake256:532025e30281fc0c3a26d6446cd4ebc98e82322800dda3c12ab5a6c0b24285c468ccd824e72fbd7f64510593882c8db1bfea7deb80603f2c9c1a38cd40847252 - - remote: buf.build - owner: googleapis - repository: googleapis - commit: 72c8614f3bd0466ea67931ef2c43d608 - digest: shake256:b3ac4d383db09f92ab0ca85d12bff8c49eddf7031bd3a854c260b6ac4ed6a2bb85b52b3393c316d28f8038bf3b8e70cb3d16470e8cc4423007678fb6d89d36d4 - - remote: buf.build - owner: opentelemetry - repository: opentelemetry - commit: 8f7f39d683754864af50f17437d43bdb - digest: shake256:876c5eafb14069d6ee37de358b4913cc3b2f3bcd5948e4a7b7694d4e95b53a30816c6a488e9757e15e7f11bf13090195746f13015bd935224df711215dbf0152 - - remote: buf.build - owner: prometheus - repository: client-model - commit: e171c0b235c546d5a9a597c2961bd357 - digest: shake256:7db3f73ac0f1dce71e70f304f318e9741e857fd78b7b42f0df7a3da353fbb2f387899da7b0a77ac9ee9565194510e39a913cdb9a8ab3c2ff4b8713428c795213 From 4de6d37c8f93ab7b4cedf6d44875636abeddd481 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Tue, 11 Nov 2025 09:39:37 -0800 Subject: [PATCH 3/3] Rename files --- .../ratelimit/{beam_streaming_example.py => beam_example.py} | 0 .../ratelimit/{beam_streaming_example2.py => beam_example2.py} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename sdks/python/apache_beam/examples/ratelimit/{beam_streaming_example.py => beam_example.py} (100%) rename sdks/python/apache_beam/examples/ratelimit/{beam_streaming_example2.py => beam_example2.py} (100%) diff --git a/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example.py b/sdks/python/apache_beam/examples/ratelimit/beam_example.py similarity index 100% rename from sdks/python/apache_beam/examples/ratelimit/beam_streaming_example.py rename to sdks/python/apache_beam/examples/ratelimit/beam_example.py diff --git a/sdks/python/apache_beam/examples/ratelimit/beam_streaming_example2.py b/sdks/python/apache_beam/examples/ratelimit/beam_example2.py similarity index 100% rename from sdks/python/apache_beam/examples/ratelimit/beam_streaming_example2.py rename to sdks/python/apache_beam/examples/ratelimit/beam_example2.py