From 0c4d3f524d4e807de2cc8ca265e84e7c757034f4 Mon Sep 17 00:00:00 2001 From: Summer Date: Tue, 24 Jun 2025 17:25:49 +0000 Subject: [PATCH 01/30] [06/24] Add gke_code_executor.py --- src/google/adk/code_executors/__init__.py | 11 + .../adk/code_executors/gke_code_executor.py | 223 ++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 src/google/adk/code_executors/gke_code_executor.py diff --git a/src/google/adk/code_executors/__init__.py b/src/google/adk/code_executors/__init__.py index 12b6d870ad..41d406f63d 100644 --- a/src/google/adk/code_executors/__init__.py +++ b/src/google/adk/code_executors/__init__.py @@ -28,6 +28,7 @@ 'UnsafeLocalCodeExecutor', 'VertexAiCodeExecutor', 'ContainerCodeExecutor', + 'GkeCodeExecutor', ] @@ -52,4 +53,14 @@ def __getattr__(name: str): 'ContainerCodeExecutor requires additional dependencies. ' 'Please install with: pip install "google-adk[extensions]"' ) from e + elif name == 'GkeCodeExecutor': + try: + from .gke_code_executor import GkeCodeExecutor + + return GkeCodeExecutor + except ImportError as e: + raise ImportError( + 'GkeCodeExecutor requires additional dependencies. ' + 'Please install with: pip install "google-adk[extensions]"' + ) from e raise AttributeError(f"module '{__name__}' has no attribute '{name}'") diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py new file mode 100644 index 0000000000..dba2dcb4df --- /dev/null +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -0,0 +1,223 @@ +import logging +import uuid +from typing import Any + +from google.adk.agents.invocation_context import InvocationContext +from google.adk.code_executors.base_code_executor import BaseCodeExecutor +from google.adk.code_executors.code_execution_utils import CodeExecutionInput, CodeExecutionResult + +from kubernetes import client, config +from kubernetes.client.rest import ApiException +from kubernetes.watch import Watch + +logger = logging.getLogger(__name__) + +class GkeCodeExecutor(BaseCodeExecutor): + """ + A secure, robust, and efficient code executor that runs Python code in a + sandboxed gVisor Pod on GKE. + + Features includes: + - Secure code execution via ConfigMaps and a strict security context. + - Kubernetes-native job and pod garbage collection via TTL. + - Efficient, event-driven waiting using the Kubernetes watch API. + - Explicit resource limits to prevent abuse. + """ + namespace: str = "default" + image: str = "python:3.11-slim" + timeout_seconds: int = 3000 + cpu_limit: str = "500m" + mem_limit: str = "512Mi" + use_gvisor_sandbox: bool = True + + _batch_v1: Any = None + _core_v1: Any = None + + def __init__(self, **data): + """ + Initializes the Pydantic model and the Kubernetes clients. + """ + super().__init__(**data) + + try: + config.load_incluster_config() + logger.info("Using in-cluster Kubernetes configuration.") + except config.ConfigException: + logger.info("In-cluster config not found. Falling back to local kubeconfig.") + config.load_kube_config() + + self._batch_v1 = client.BatchV1Api() + self._core_v1 = client.CoreV1Api() + + def execute_code( + self, + invocation_context: InvocationContext, + code_execution_input: CodeExecutionInput, + ) -> CodeExecutionResult: + """ + Orchestrates the secure execution of a code snippet on GKE. + """ + job_name = f"adk-exec-{uuid.uuid4().hex[:10]}" + configmap_name = f"code-src-{job_name}" + + try: + # 1. Create a ConfigMap to hold the code securely. + self._create_code_configmap(configmap_name, code_execution_input.code) + # 2. Create the Job manifest with all security features. + job_manifest = self._create_job_manifest(job_name, configmap_name) + # 3. Create and run the Job on the cluster. + self._batch_v1.create_namespaced_job( + body=job_manifest, namespace=self.namespace + ) + logger.info(f"Submitted Job '{job_name}' to namespace '{self.namespace}'.") + # 4. Efficiently watch for the Job's completion. + return self._watch_job_completion(job_name) + + except Exception as e: + logger.error( + f"An unexpected error occurred during execution of job '{job_name}': {e}", + exc_info=True, + ) + return CodeExecutionResult(stderr=f"Executor failed: {e}") + finally: + # 5. Always clean up the ConfigMap. The Job is cleaned up by Kubernetes. + self._cleanup_configmap(configmap_name) + + def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1Job: + """Creates the complete V1Job object with security best practices.""" + # Define the container that will run the code. + container = client.V1Container( + name="code-runner", + image=self.image, + command=["python3", "/app/code.py"], + volume_mounts=[ + client.V1VolumeMount(name="code-volume", mount_path="/app") + ], + # BEST PRACTICE: Enforce a strict security context. + security_context=client.V1SecurityContext( + run_as_non_root=True, + run_as_user=1001, + allow_privilege_escalation=False, + read_only_root_filesystem=True, + capabilities=client.V1Capabilities(drop=["ALL"]), + ), + # BEST PRACTICE: Set resource limits to prevent abuse. + resources=client.V1ResourceRequirements( + requests={"cpu": "100m", "memory": "128Mi"}, + limits={"cpu": self.cpu_limit, "memory": self.mem_limit}, + ), + ) + + # Pod Spec Customization for A/B Testing + pod_spec_args = { + "restart_policy": "Never", + "containers": [container], + "volumes": [ + client.V1Volume( + name="code-volume", + config_map=client.V1ConfigMapVolumeSource(name=configmap_name), + ) + ], + } + + if self.use_gvisor_sandbox: + pod_spec_args["runtime_class_name"] = "gvisor" + pod_spec_args["node_selector"] = { + "cloud.google.com/gke-nodepool": "gvisor-nodepool" + } + pod_spec_args["tolerations"] = [ + client.V1Toleration( + key="sandbox.gke.io/runtime", + operator="Equal", + value="gvisor", + effect="NoSchedule", + ) + ] + else: + pod_spec_args["node_selector"] = { + "cloud.google.com/gke-nodepool": "standard-nodepool" + } + + # Define the pod spec, mounting the code and targeting gVisor. + pod_spec = client.V1PodSpec(**pod_spec_args) + + # Define the Job specification. + job_spec = client.V1JobSpec( + template=client.V1PodTemplateSpec(spec=pod_spec), + backoff_limit=0, # Do not retry the Job on failure. + # BEST PRACTICE: Let the Kubernetes TTL controller handle cleanup. + # This is more robust than client-side cleanup. + ttl_seconds_after_finished=600, # Garbage collect after 10 minutes. + ) + + # Assemble and return the final Job object. + return client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=client.V1ObjectMeta(name=job_name), + spec=job_spec, + ) + + def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: + """Uses the watch API to efficiently wait for job completion.""" + watch = Watch() + try: + for event in watch.stream( + self._batch_v1.list_namespaced_job, + namespace=self.namespace, + field_selector=f"metadata.name={job_name}", + timeout_seconds=self.timeout_seconds, + ): + job = event["object"] + if job.status.succeeded: + watch.stop() + logger.info(f"Job '{job_name}' succeeded.") + logs = self._get_pod_logs(job_name) + return CodeExecutionResult(stdout=logs) + if job.status.failed: + watch.stop() + logger.error(f"Job '{job_name}' failed.") + logs = self._get_pod_logs(job_name) + return CodeExecutionResult(stderr=f"Job failed. Logs:\n{logs}") + + # If the loop finishes without returning, the watch timed out. + raise TimeoutError( + f"Job '{job_name}' did not complete within {self.timeout_seconds}s." + ) + finally: + watch.stop() + + def _get_pod_logs(self, job_name: str) -> str: + """Retrieves logs from the pod created by the specified job.""" + try: + pods = self._core_v1.list_namespaced_pod( + namespace=self.namespace, label_selector=f"job-name={job_name}", limit=1 + ) + if not pods.items: + return "Error: Could not find pod for job." + pod_name = pods.items[0].metadata.name + + return self._core_v1.read_namespaced_pod_log( + name=pod_name, namespace=self.namespace + ) + except ApiException as e: + logger.error(f"Could not retrieve logs for job '{job_name}': {e}") + return f"Error retrieving logs: {e.reason}" + + def _create_code_configmap(self, name: str, code: str) -> None: + """Creates a ConfigMap to hold the Python code.""" + body = client.V1ConfigMap( + metadata=client.V1ObjectMeta(name=name), data={"code.py": code} + ) + self._core_v1.create_namespaced_config_map( + namespace=self.namespace, body=body + ) + + def _cleanup_configmap(self, name: str) -> None: + """Deletes a ConfigMap.""" + try: + self._core_v1.delete_namespaced_config_map(name=name, namespace=self.namespace) + logger.info(f"Cleaned up ConfigMap '{name}'.") + except ApiException as e: + if e.status != 404: + logger.warning(f"Could not delete ConfigMap '{name}': {e.reason}") From f28f4b72518929b701d4e9474da05007770b086d Mon Sep 17 00:00:00 2001 From: Summer Date: Tue, 24 Jun 2025 19:20:42 +0000 Subject: [PATCH 02/30] [06/24] Add gke_code_executor.py --- .../adk/code_executors/gke_code_executor.py | 119 +++++++++--------- 1 file changed, 58 insertions(+), 61 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index dba2dcb4df..f0abeb982d 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -1,6 +1,6 @@ import logging import uuid -from typing import Any +from typing import Optional from google.adk.agents.invocation_context import InvocationContext from google.adk.code_executors.base_code_executor import BaseCodeExecutor @@ -13,37 +13,42 @@ logger = logging.getLogger(__name__) class GkeCodeExecutor(BaseCodeExecutor): - """ - A secure, robust, and efficient code executor that runs Python code in a - sandboxed gVisor Pod on GKE. + """Executes Python code in a secure gVisor-sandboxed Pod on GKE. + + This executor securely runs code by dynamically creating a Kubernetes Job for + each execution request. The user's code is mounted via a ConfigMap, and the + Pod is hardened with a strict security context and resource limits. - Features includes: - - Secure code execution via ConfigMaps and a strict security context. - - Kubernetes-native job and pod garbage collection via TTL. + Key Features: + - Sandboxed execution using the gVisor runtime. + - Ephemeral, per-execution environments using Kubernetes Jobs. + - Secure-by-default Pod configuration (non-root, no privileges). + - Automatic garbage collection of completed Jobs and Pods via TTL. - Efficient, event-driven waiting using the Kubernetes watch API. - - Explicit resource limits to prevent abuse. """ namespace: str = "default" image: str = "python:3.11-slim" - timeout_seconds: int = 3000 + timeout_seconds: int = 300 cpu_limit: str = "500m" mem_limit: str = "512Mi" - use_gvisor_sandbox: bool = True - _batch_v1: Any = None - _core_v1: Any = None + _batch_v1: client.BatchV1Api + _core_v1: client.CoreV1Api def __init__(self, **data): - """ - Initializes the Pydantic model and the Kubernetes clients. + """Initializes the executor and the Kubernetes API clients. + + This constructor supports overriding default class attributes (like + 'namespace', 'image', etc.) by passing them as keyword arguments. It + automatically configures the Kubernetes client to work either within a + cluster (in-cluster config) or locally using a kubeconfig file. """ super().__init__(**data) - try: config.load_incluster_config() logger.info("Using in-cluster Kubernetes configuration.") except config.ConfigException: - logger.info("In-cluster config not found. Falling back to local kubeconfig.") + logger.info("In-cluster config not found. Falling back to kubeconfig.") config.load_kube_config() self._batch_v1 = client.BatchV1Api() @@ -54,33 +59,40 @@ def execute_code( invocation_context: InvocationContext, code_execution_input: CodeExecutionInput, ) -> CodeExecutionResult: - """ - Orchestrates the secure execution of a code snippet on GKE. - """ + """Orchestrates the secure execution of a code snippet on GKE.""" job_name = f"adk-exec-{uuid.uuid4().hex[:10]}" configmap_name = f"code-src-{job_name}" try: - # 1. Create a ConfigMap to hold the code securely. self._create_code_configmap(configmap_name, code_execution_input.code) - # 2. Create the Job manifest with all security features. job_manifest = self._create_job_manifest(job_name, configmap_name) - # 3. Create and run the Job on the cluster. + self._batch_v1.create_namespaced_job( body=job_manifest, namespace=self.namespace ) logger.info(f"Submitted Job '{job_name}' to namespace '{self.namespace}'.") - # 4. Efficiently watch for the Job's completion. - return self._watch_job_completion(job_name) + return self._watch_for_job_completion(job_name) + except ApiException as e: + logger.error( + "A Kubernetes API error occurred during job" + f" '{job_name}': {e.reason}", + exc_info=True, + ) + return CodeExecutionResult(stderr=f"Kubernetes API error: {e.reason}") + except TimeoutError as e: + logger.error(e, exc_info=True) + logs = self._get_pod_logs(job_name) + stderr = f"Executor timed out: {e}\n\nPod Logs:\n{logs}" + return CodeExecutionResult(stderr=stderr) except Exception as e: logger.error( - f"An unexpected error occurred during execution of job '{job_name}': {e}", + f"An unexpected error occurred during job '{job_name}': {e}", exc_info=True, ) - return CodeExecutionResult(stderr=f"Executor failed: {e}") + return CodeExecutionResult(stderr=f"An unexpected executor error occurred: {e}") finally: - # 5. Always clean up the ConfigMap. The Job is cleaned up by Kubernetes. + # The Job is cleaned up by the TTL controller, and we ensure the ConfigMap is always deleted. self._cleanup_configmap(configmap_name) def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1Job: @@ -93,7 +105,7 @@ def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1J volume_mounts=[ client.V1VolumeMount(name="code-volume", mount_path="/app") ], - # BEST PRACTICE: Enforce a strict security context. + # Enforce a strict security context. security_context=client.V1SecurityContext( run_as_non_root=True, run_as_user=1001, @@ -101,52 +113,38 @@ def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1J read_only_root_filesystem=True, capabilities=client.V1Capabilities(drop=["ALL"]), ), - # BEST PRACTICE: Set resource limits to prevent abuse. + # Set resource limits to prevent abuse. resources=client.V1ResourceRequirements( requests={"cpu": "100m", "memory": "128Mi"}, limits={"cpu": self.cpu_limit, "memory": self.mem_limit}, ), ) - # Pod Spec Customization for A/B Testing - pod_spec_args = { - "restart_policy": "Never", - "containers": [container], - "volumes": [ + # Use tolerations to request a gVisor node. + pod_spec = client.V1PodSpec( + restart_policy="Never", + containers=[container], + volumes=[ client.V1Volume( name="code-volume", config_map=client.V1ConfigMapVolumeSource(name=configmap_name), ) ], - } - - if self.use_gvisor_sandbox: - pod_spec_args["runtime_class_name"] = "gvisor" - pod_spec_args["node_selector"] = { - "cloud.google.com/gke-nodepool": "gvisor-nodepool" - } - pod_spec_args["tolerations"] = [ + runtime_class_name="gvisor", # Request the gVisor runtime. + tolerations=[ client.V1Toleration( key="sandbox.gke.io/runtime", operator="Equal", value="gvisor", effect="NoSchedule", ) - ] - else: - pod_spec_args["node_selector"] = { - "cloud.google.com/gke-nodepool": "standard-nodepool" - } - - # Define the pod spec, mounting the code and targeting gVisor. - pod_spec = client.V1PodSpec(**pod_spec_args) + ], + ) - # Define the Job specification. job_spec = client.V1JobSpec( template=client.V1PodTemplateSpec(spec=pod_spec), backoff_limit=0, # Do not retry the Job on failure. - # BEST PRACTICE: Let the Kubernetes TTL controller handle cleanup. - # This is more robust than client-side cleanup. + # Kubernetes TTL controller will handle Job/Pod cleanup. ttl_seconds_after_finished=600, # Garbage collect after 10 minutes. ) @@ -162,10 +160,11 @@ def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: """Uses the watch API to efficiently wait for job completion.""" watch = Watch() try: + field_selector = f"metadata.name={job_name}" for event in watch.stream( self._batch_v1.list_namespaced_job, namespace=self.namespace, - field_selector=f"metadata.name={job_name}", + field_selector=field_selector, timeout_seconds=self.timeout_seconds, ): job = event["object"] @@ -179,13 +178,13 @@ def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: logger.error(f"Job '{job_name}' failed.") logs = self._get_pod_logs(job_name) return CodeExecutionResult(stderr=f"Job failed. Logs:\n{logs}") + finally: + watch.stop() # If the loop finishes without returning, the watch timed out. raise TimeoutError( f"Job '{job_name}' did not complete within {self.timeout_seconds}s." ) - finally: - watch.stop() def _get_pod_logs(self, job_name: str) -> str: """Retrieves logs from the pod created by the specified job.""" @@ -194,14 +193,14 @@ def _get_pod_logs(self, job_name: str) -> str: namespace=self.namespace, label_selector=f"job-name={job_name}", limit=1 ) if not pods.items: + logger.warning(f"Could not find Pod for Job '{job_name}' to retrieve logs.") return "Error: Could not find pod for job." pod_name = pods.items[0].metadata.name - return self._core_v1.read_namespaced_pod_log( name=pod_name, namespace=self.namespace ) except ApiException as e: - logger.error(f"Could not retrieve logs for job '{job_name}': {e}") + logger.error(f"API error retrieving logs for job '{job_name}': {e.reason}") return f"Error retrieving logs: {e.reason}" def _create_code_configmap(self, name: str, code: str) -> None: @@ -209,9 +208,7 @@ def _create_code_configmap(self, name: str, code: str) -> None: body = client.V1ConfigMap( metadata=client.V1ObjectMeta(name=name), data={"code.py": code} ) - self._core_v1.create_namespaced_config_map( - namespace=self.namespace, body=body - ) + self._core_v1.create_namespaced_config_map(namespace=self.namespace, body=body) def _cleanup_configmap(self, name: str) -> None: """Deletes a ConfigMap.""" From 760fb7f39f6fe6af4c1e6d48d578ca2449a8c45b Mon Sep 17 00:00:00 2001 From: Summer Date: Tue, 24 Jun 2025 19:22:04 +0000 Subject: [PATCH 03/30] [06/24] Add gke_code_executor.py --- src/google/adk/code_executors/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/google/adk/code_executors/__init__.py b/src/google/adk/code_executors/__init__.py index 41d406f63d..7923738a4d 100644 --- a/src/google/adk/code_executors/__init__.py +++ b/src/google/adk/code_executors/__init__.py @@ -56,7 +56,6 @@ def __getattr__(name: str): elif name == 'GkeCodeExecutor': try: from .gke_code_executor import GkeCodeExecutor - return GkeCodeExecutor except ImportError as e: raise ImportError( From 6b0e77218afb702c1c230c7e00021517b2e593fd Mon Sep 17 00:00:00 2001 From: Summer Date: Tue, 24 Jun 2025 19:47:34 +0000 Subject: [PATCH 04/30] [06/24] Add gke_code_executor.py --- src/google/adk/code_executors/gke_code_executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index f0abeb982d..ecd847aa77 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -1,6 +1,5 @@ import logging import uuid -from typing import Optional from google.adk.agents.invocation_context import InvocationContext from google.adk.code_executors.base_code_executor import BaseCodeExecutor From 33446f81b391cf5b35910d387bf46a177c5a9170 Mon Sep 17 00:00:00 2001 From: Summer Date: Thu, 10 Jul 2025 20:30:34 +0000 Subject: [PATCH 05/30] [07/10] Add deployemnt_rbac.yaml into samples folder --- .../gke_agent_sandbox/deployment_rbac.yaml | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 contributing/samples/gke_agent_sandbox/deployment_rbac.yaml diff --git a/contributing/samples/gke_agent_sandbox/deployment_rbac.yaml b/contributing/samples/gke_agent_sandbox/deployment_rbac.yaml new file mode 100644 index 0000000000..db54c08bec --- /dev/null +++ b/contributing/samples/gke_agent_sandbox/deployment_rbac.yaml @@ -0,0 +1,50 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: agent-sandbox +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: adk-agent-sa + namespace: agent-sandbox +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: adk-agent-role + namespace: agent-sandbox +rules: +- apiGroups: ["batch"] + resources: ["jobs"] + # create: Needed for _batch_v1.create_namespaced_job(). + # watch: Needed for watch.stream(self._batch_v1.list_namespaced_job, ...) to efficiently wait for completion + # list/get: Required for the watch to initialize and to get job details. + verbs: ["create", "get", "watch", "list", "delete"] +- apiGroups: [""] + resources: ["configmaps"] + # create: Needed mount the agent's code into the Job's Pod. + # delete: Needed for cleanup in the finally block + verbs: ["create", "get", "list", "delete"] +- apiGroups: [""] + resources: ["pods"] + # list: Needed to find the correct Pod _core_v1.list_namespaced_pod(label_selector=...) + verbs: ["get", "list", "delete"] +- apiGroups: [""] + # get: Needed for _core_v1.read_namespaced_pod_log() to get the code execution results and logs. + resources: ["pods/log"] + verbs: ["get", "list"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: adk-agent-binding + namespace: agent-sandbox +subjects: +- kind: ServiceAccount + name: adk-agent-sa + namespace: agent-sandbox +roleRef: + kind: Role + name: adk-agent-role + apiGroup: rbac.authorization.k8s.io From d09c319a01c78e9a00f2948511df342119b4a0b5 Mon Sep 17 00:00:00 2001 From: Summer Date: Thu, 10 Jul 2025 20:33:52 +0000 Subject: [PATCH 06/30] [07/10] Add deployemnt_rbac.yaml into samples folder --- src/google/adk/code_executors/gke_code_executor.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index ecd847aa77..a9e74201bf 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -24,6 +24,13 @@ class GkeCodeExecutor(BaseCodeExecutor): - Secure-by-default Pod configuration (non-root, no privileges). - Automatic garbage collection of completed Jobs and Pods via TTL. - Efficient, event-driven waiting using the Kubernetes watch API. + + RBAC Permissions: + This executor interacts with the Kubernetes API and requires a ServiceAccount + with specific RBAC permissions to function. The agent's pod needs permissions + to create/watch Jobs, create/delete ConfigMaps, and list Pods to read logs. + For a complete, working example of the required Role and RoleBinding, see the + file at: contributing/samples/gke_agent_sandbox/deployment_rbac.yaml """ namespace: str = "default" image: str = "python:3.11-slim" From 1f06fe1b06a4b6ffa3a5b3b83094267a456714d6 Mon Sep 17 00:00:00 2001 From: Summer Date: Thu, 10 Jul 2025 20:34:23 +0000 Subject: [PATCH 07/30] [07/10] Add deployemnt_rbac.yaml into samples folder --- contributing/samples/gke_agent_sandbox/deployment_rbac.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contributing/samples/gke_agent_sandbox/deployment_rbac.yaml b/contributing/samples/gke_agent_sandbox/deployment_rbac.yaml index db54c08bec..16572276d1 100644 --- a/contributing/samples/gke_agent_sandbox/deployment_rbac.yaml +++ b/contributing/samples/gke_agent_sandbox/deployment_rbac.yaml @@ -18,7 +18,7 @@ rules: - apiGroups: ["batch"] resources: ["jobs"] # create: Needed for _batch_v1.create_namespaced_job(). - # watch: Needed for watch.stream(self._batch_v1.list_namespaced_job, ...) to efficiently wait for completion + # watch: Needed for watch.stream(self._batch_v1.list_namespaced_job, ...) to wait for completion # list/get: Required for the watch to initialize and to get job details. verbs: ["create", "get", "watch", "list", "delete"] - apiGroups: [""] From 254bdcbd85b97659c9eb96280a61287541c7c8b3 Mon Sep 17 00:00:00 2001 From: Summer Date: Thu, 10 Jul 2025 20:42:50 +0000 Subject: [PATCH 08/30] [07/10] Make cpu/memory requests configurable --- src/google/adk/code_executors/gke_code_executor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index a9e74201bf..546bb80239 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -35,6 +35,8 @@ class GkeCodeExecutor(BaseCodeExecutor): namespace: str = "default" image: str = "python:3.11-slim" timeout_seconds: int = 300 + cpu_request: str = "200m" + mem_request: str = "256Mi" cpu_limit: str = "500m" mem_limit: str = "512Mi" @@ -121,7 +123,7 @@ def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1J ), # Set resource limits to prevent abuse. resources=client.V1ResourceRequirements( - requests={"cpu": "100m", "memory": "128Mi"}, + requests={"cpu": self.cpu_request, "memory": self.mem_request}, limits={"cpu": self.cpu_limit, "memory": self.mem_limit}, ), ) From 285b53aaf3500366248d4e7085a8b3d68ffadf6b Mon Sep 17 00:00:00 2001 From: Summer Date: Fri, 11 Jul 2025 03:48:17 +0000 Subject: [PATCH 09/30] [07/10] Add annotation --- src/google/adk/code_executors/gke_code_executor.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index 546bb80239..1e14e3bffc 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -73,7 +73,7 @@ def execute_code( try: self._create_code_configmap(configmap_name, code_execution_input.code) - job_manifest = self._create_job_manifest(job_name, configmap_name) + job_manifest = self._create_job_manifest(job_name, configmap_name, invocation_context) self._batch_v1.create_namespaced_job( body=job_manifest, namespace=self.namespace @@ -103,7 +103,7 @@ def execute_code( # The Job is cleaned up by the TTL controller, and we ensure the ConfigMap is always deleted. self._cleanup_configmap(configmap_name) - def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1Job: + def _create_job_manifest(self, job_name: str, configmap_name: str, invocation_context: InvocationContext) -> client.V1Job: """Creates the complete V1Job object with security best practices.""" # Define the container that will run the code. container = client.V1Container( @@ -157,10 +157,13 @@ def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1J ) # Assemble and return the final Job object. + annotations = { + "adk.agent.google.com/invocation-id": invocation_context.invocation_id + } return client.V1Job( api_version="batch/v1", kind="Job", - metadata=client.V1ObjectMeta(name=job_name), + metadata=client.V1ObjectMeta(name=job_name, annotations=annotations), spec=job_spec, ) From 6a93e12b3e6df22ddca33ec9bcab73981afd601e Mon Sep 17 00:00:00 2001 From: syangx39 Date: Fri, 11 Jul 2025 04:40:19 +0000 Subject: [PATCH 10/30] [07/10] Raise error in _get_pod_logs() --- .../adk/code_executors/gke_code_executor.py | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index 1e14e3bffc..7674e5c753 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -56,7 +56,7 @@ def __init__(self, **data): config.load_incluster_config() logger.info("Using in-cluster Kubernetes configuration.") except config.ConfigException: - logger.info("In-cluster config not found. Falling back to kubeconfig.") + logger.info("In-cluster config not found. Falling back to local kubeconfig.") config.load_kube_config() self._batch_v1 = client.BatchV1Api() @@ -79,7 +79,7 @@ def execute_code( body=job_manifest, namespace=self.namespace ) logger.info(f"Submitted Job '{job_name}' to namespace '{self.namespace}'.") - return self._watch_for_job_completion(job_name) + return self._watch_job_completion(job_name) except ApiException as e: logger.error( @@ -171,11 +171,10 @@ def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: """Uses the watch API to efficiently wait for job completion.""" watch = Watch() try: - field_selector = f"metadata.name={job_name}" for event in watch.stream( self._batch_v1.list_namespaced_job, namespace=self.namespace, - field_selector=field_selector, + field_selector=f"metadata.name={job_name}", timeout_seconds=self.timeout_seconds, ): job = event["object"] @@ -189,30 +188,33 @@ def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: logger.error(f"Job '{job_name}' failed.") logs = self._get_pod_logs(job_name) return CodeExecutionResult(stderr=f"Job failed. Logs:\n{logs}") - finally: - watch.stop() # If the loop finishes without returning, the watch timed out. raise TimeoutError( f"Job '{job_name}' did not complete within {self.timeout_seconds}s." ) + finally: + watch.stop() def _get_pod_logs(self, job_name: str) -> str: - """Retrieves logs from the pod created by the specified job.""" + """Retrieves logs from the pod created by the specified job. + + Raises: + RuntimeError: If the pod cannot be found or logs cannot be fetched. + """ try: pods = self._core_v1.list_namespaced_pod( namespace=self.namespace, label_selector=f"job-name={job_name}", limit=1 ) if not pods.items: - logger.warning(f"Could not find Pod for Job '{job_name}' to retrieve logs.") - return "Error: Could not find pod for job." + raise RuntimeError(f"Could not find Pod for Job '{job_name}' to retrieve logs.") + pod_name = pods.items[0].metadata.name return self._core_v1.read_namespaced_pod_log( name=pod_name, namespace=self.namespace ) except ApiException as e: - logger.error(f"API error retrieving logs for job '{job_name}': {e.reason}") - return f"Error retrieving logs: {e.reason}" + raise RuntimeError(f"API error retrieving logs for job '{job_name}': {e.reason}") from e def _create_code_configmap(self, name: str, code: str) -> None: """Creates a ConfigMap to hold the Python code.""" From 2b16d7b04f482ff5ffabfa5cd22ae2fc5632d008 Mon Sep 17 00:00:00 2001 From: syangx39 Date: Fri, 11 Jul 2025 05:31:55 +0000 Subject: [PATCH 11/30] [07/10] Add owner_reference --- .../adk/code_executors/gke_code_executor.py | 38 +++++++++++++------ 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index 7674e5c753..0fe110e03f 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -74,10 +74,11 @@ def execute_code( try: self._create_code_configmap(configmap_name, code_execution_input.code) job_manifest = self._create_job_manifest(job_name, configmap_name, invocation_context) - - self._batch_v1.create_namespaced_job( + created_job = self._batch_v1.create_namespaced_job( body=job_manifest, namespace=self.namespace ) + self._add_owner_reference(created_job, configmap_name) + logger.info(f"Submitted Job '{job_name}' to namespace '{self.namespace}'.") return self._watch_job_completion(job_name) @@ -99,9 +100,6 @@ def execute_code( exc_info=True, ) return CodeExecutionResult(stderr=f"An unexpected executor error occurred: {e}") - finally: - # The Job is cleaned up by the TTL controller, and we ensure the ConfigMap is always deleted. - self._cleanup_configmap(configmap_name) def _create_job_manifest(self, job_name: str, configmap_name: str, invocation_context: InvocationContext) -> client.V1Job: """Creates the complete V1Job object with security best practices.""" @@ -223,11 +221,29 @@ def _create_code_configmap(self, name: str, code: str) -> None: ) self._core_v1.create_namespaced_config_map(namespace=self.namespace, body=body) - def _cleanup_configmap(self, name: str) -> None: - """Deletes a ConfigMap.""" + def _add_owner_reference(self, owner_job: client.V1Job, configmap_name: str) -> None: + """Patches the ConfigMap to be owned by the Job for auto-cleanup.""" + owner_reference = client.V1OwnerReference( + api_version=owner_job.api_version, + kind=owner_job.kind, + name=owner_job.metadata.name, + uid=owner_job.metadata.uid, + controller=True, + ) + patch_body = { + "metadata": {"ownerReferences": [owner_reference.to_dict()]} + } + try: - self._core_v1.delete_namespaced_config_map(name=name, namespace=self.namespace) - logger.info(f"Cleaned up ConfigMap '{name}'.") + self._core_v1.patch_namespaced_config_map( + name=configmap_name, + namespace=self.namespace, + body=patch_body, + ) + logger.info(f"Set Job '{owner_job.metadata.name}' as owner of ConfigMap '{configmap_name}'.") except ApiException as e: - if e.status != 404: - logger.warning(f"Could not delete ConfigMap '{name}': {e.reason}") + logger.warning( + f"Failed to set ownerReference on ConfigMap '{configmap_name}'. " + f"Manual cleanup is required. Reason: {e.reason}" + ) + From 9982d39213a202163730e3804c0c668a43c33fa4 Mon Sep 17 00:00:00 2001 From: syangx39 Date: Fri, 11 Jul 2025 05:55:24 +0000 Subject: [PATCH 12/30] [07/10] Add owner_reference --- src/google/adk/code_executors/gke_code_executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index 0fe110e03f..769eb5fa48 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -246,4 +246,3 @@ def _add_owner_reference(self, owner_job: client.V1Job, configmap_name: str) -> f"Failed to set ownerReference on ConfigMap '{configmap_name}'. " f"Manual cleanup is required. Reason: {e.reason}" ) - From 6685f63603d8058a9ee0cc25d119ba4f6d1ca4af Mon Sep 17 00:00:00 2001 From: syangx39 Date: Fri, 25 Jul 2025 08:21:04 +0000 Subject: [PATCH 13/30] [07/25] add the sample --- .../code_execution/gke_sandbox.agent.py | 49 ++++ .../adk/code_executors/gke_code_executor.py | 5 + .../code_executors/test_gke_code_executor.py | 209 ++++++++++++++++++ 3 files changed, 263 insertions(+) create mode 100644 contributing/samples/code_execution/gke_sandbox.agent.py create mode 100644 tests/unittests/code_executors/test_gke_code_executor.py diff --git a/contributing/samples/code_execution/gke_sandbox.agent.py b/contributing/samples/code_execution/gke_sandbox.agent.py new file mode 100644 index 0000000000..5ee64513bf --- /dev/null +++ b/contributing/samples/code_execution/gke_sandbox.agent.py @@ -0,0 +1,49 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A Python coding agent using the GkeCodeExecutor for secure execution.""" + +from google.adk.agents import LlmAgent +from google.adk.code_executors import GkeCodeExecutor + + +def gke_agent_system_instruction(): + """Returns: The system instruction for the GKE-based coding agent.""" + return """You are a helpful and capable AI agent that can write and execute Python code to answer questions and perform tasks. + +When a user asks a question, follow these steps: +1. Analyze the request. +2. Write a complete, self-contained Python script to accomplish the task. +3. Your code will be executed in a secure, sandboxed environment. +4. Return the full and complete output from the code execution, including any text, results, or error messages.""" + + +gke_executor = GkeCodeExecutor( + # This must match the namespace in your deployment_rbac.yaml where the + # agent's ServiceAccount and Role have permissions. + namespace="agent-sandbox", + # Setting an explicit timeout prevents a stuck job from running forever. + timeout_seconds=600, +) + +root_agent = LlmAgent( + name="gke_coding_agent", + model="gemini-2.0-flash", + description=( + "A general-purpose agent that executes Python code in a secure GKE" + " Sandbox." + ), + instruction=gke_agent_system_instruction(), + code_executor=gke_executor, +) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index 769eb5fa48..4901d2140b 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -37,6 +37,7 @@ class GkeCodeExecutor(BaseCodeExecutor): timeout_seconds: int = 300 cpu_request: str = "200m" mem_request: str = "256Mi" + # The maximum CPU the container can use, in "millicores". 1000m is 1 full CPU core. cpu_limit: str = "500m" mem_limit: str = "512Mi" @@ -72,6 +73,10 @@ def execute_code( configmap_name = f"code-src-{job_name}" try: + # The execution process: + # 1. Create a ConfigMap to mount LLM-generated code into the Pod. + # 2. Create a Job that runs the code from the ConfigMap. + # 3. Set the Job as the ConfigMap's owner for automatic cleanup. self._create_code_configmap(configmap_name, code_execution_input.code) job_manifest = self._create_job_manifest(job_name, configmap_name, invocation_context) created_job = self._batch_v1.create_namespaced_job( diff --git a/tests/unittests/code_executors/test_gke_code_executor.py b/tests/unittests/code_executors/test_gke_code_executor.py new file mode 100644 index 0000000000..549ec1398d --- /dev/null +++ b/tests/unittests/code_executors/test_gke_code_executor.py @@ -0,0 +1,209 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import MagicMock, patch + +from google.adk.agents.invocation_context import InvocationContext +from google.adk.code_executors.code_execution_utils import CodeExecutionInput +from google.adk.code_executors.gke_code_executor import GkeCodeExecutor +from kubernetes.client.rest import ApiException +from kubernetes import client, config +import pytest + + +@pytest.fixture +def mock_invocation_context() -> InvocationContext: + """Fixture for a mock InvocationContext.""" + mock = MagicMock(spec=InvocationContext) + mock.invocation_id = "test-invocation-123" + return mock + + +@pytest.fixture(autouse=True) +def mock_k8s_config(): + """Fixture for auto-mocking Kubernetes config loading.""" + with patch("google.adk.code_executors.gke_code_executor.config") as mock_config: + # Simulate fallback from in-cluster to kubeconfig + mock_config.ConfigException = config.ConfigException + mock_config.load_incluster_config.side_effect = config.ConfigException + yield mock_config + + +@pytest.fixture +def mock_k8s_clients(): + """Fixture for mock Kubernetes API clients.""" + with patch( + "google.adk.code_executors.gke_code_executor.client" + ) as mock_client_class: + mock_batch_v1 = MagicMock(spec=client.BatchV1Api) + mock_core_v1 = MagicMock(spec=client.CoreV1Api) + mock_client_class.BatchV1Api.return_value = mock_batch_v1 + mock_client_class.CoreV1Api.return_value = mock_core_v1 + yield { + "batch_v1": mock_batch_v1, + "core_v1": mock_core_v1, + } + + +class TestGkeCodeExecutor: + """Unit tests for the GkeCodeExecutor.""" + + def test_init_defaults(self): + """Tests that the executor initializes with correct default values.""" + executor = GkeCodeExecutor() + assert executor.namespace == "default" + assert executor.image == "python:3.11-slim" + assert executor.timeout_seconds == 300 + assert executor.cpu_request == "200m" + assert executor.mem_limit == "512Mi" + + def test_init_with_overrides(self): + """Tests that class attributes can be overridden at instantiation.""" + executor = GkeCodeExecutor( + namespace="test-ns", + image="custom-python:latest", + timeout_seconds=60, + cpu_limit="1000m", + ) + assert executor.namespace == "test-ns" + assert executor.image == "custom-python:latest" + assert executor.timeout_seconds == 60 + assert executor.cpu_limit == "1000m" + + @patch("google.adk.code_executors.gke_code_executor.Watch") + def test_execute_code_success( + self, + mock_watch, + mock_k8s_clients, + mock_invocation_context, + ): + """Tests the happy path for successful code execution.""" + # Setup Mocks + mock_job = MagicMock() + mock_job.status.succeeded = True + mock_job.status.failed = None + mock_watch.return_value.stream.return_value = [{"object": mock_job}] + + mock_pod_list = MagicMock() + mock_pod_list.items = [MagicMock()] + mock_pod_list.items[0].metadata.name = "test-pod-name" + mock_k8s_clients["core_v1"].list_namespaced_pod.return_value = mock_pod_list + mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = "hello world" + + # Execute + executor = GkeCodeExecutor() + code_input = CodeExecutionInput(code='print("hello world")') + result = executor.execute_code(mock_invocation_context, code_input) + + # Assert + assert result.stdout == "hello world" + assert result.stderr == "" + mock_k8s_clients["core_v1"].create_namespaced_config_map.assert_called_once() + mock_k8s_clients["batch_v1"].create_namespaced_job.assert_called_once() + mock_k8s_clients["core_v1"].patch_namespaced_config_map.assert_called_once() + mock_k8s_clients["core_v1"].read_namespaced_pod_log.assert_called_once() + + @patch("google.adk.code_executors.gke_code_executor.Watch") + def test_execute_code_job_failed( + self, + mock_watch, + mock_k8s_clients, + mock_invocation_context, + ): + """Tests the path where the Kubernetes Job fails.""" + mock_job = MagicMock() + mock_job.status.succeeded = None + mock_job.status.failed = True + mock_watch.return_value.stream.return_value = [{"object": mock_job}] + mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( + "Traceback...\nValueError: failure" + ) + + executor = GkeCodeExecutor() + result = executor.execute_code(mock_invocation_context, CodeExecutionInput(code="fail")) + + assert result.stdout == "" + assert "Job failed. Logs:" in result.stderr + assert "ValueError: failure" in result.stderr + + def test_execute_code_api_exception( + self, mock_k8s_clients, mock_invocation_context + ): + """Tests handling of an ApiException from the K8s client.""" + mock_k8s_clients["core_v1"].create_namespaced_config_map.side_effect = ( + ApiException(reason="Test API Error") + ) + executor = GkeCodeExecutor() + result = executor.execute_code(mock_invocation_context, CodeExecutionInput(code="...")) + + assert result.stdout == "" + assert "Kubernetes API error: Test API Error" in result.stderr + + @patch("google.adk.code_executors.gke_code_executor.Watch") + def test_execute_code_timeout( + self, + mock_watch, + mock_k8s_clients, + mock_invocation_context, + ): + """Tests the case where the job watch times out.""" + mock_watch.return_value.stream.return_value = [] # Empty stream simulates timeout + mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( + "Still running..." + ) + + executor = GkeCodeExecutor(timeout_seconds=1) + result = executor.execute_code(mock_invocation_context, CodeExecutionInput(code="...")) + + assert result.stdout == "" + assert "Executor timed out" in result.stderr + assert "did not complete within 1s" in result.stderr + assert "Pod Logs:\nStill running..." in result.stderr + + def test_create_job_manifest_structure(self, mock_invocation_context): + """Tests the correctness of the generated Job manifest.""" + executor = GkeCodeExecutor(namespace="test-ns", image="test-img:v1") + job = executor._create_job_manifest("test-job", "test-cm", mock_invocation_context) + + # Check top-level properties + assert isinstance(job, client.V1Job) + assert job.api_version == "batch/v1" + assert job.kind == "Job" + assert job.metadata.name == "test-job" + assert job.spec.backoff_limit == 0 + assert job.spec.ttl_seconds_after_finished == 600 + + # Check pod template properties + pod_spec = job.spec.template.spec + assert pod_spec.restart_policy == "Never" + assert pod_spec.runtime_class_name == "gvisor" + assert len(pod_spec.tolerations) == 1 + assert pod_spec.tolerations[0].value == "gvisor" + assert len(pod_spec.volumes) == 1 + assert pod_spec.volumes[0].name == "code-volume" + assert pod_spec.volumes[0].config_map.name == "test-cm" + + # Check container properties + container = pod_spec.containers[0] + assert container.name == "code-runner" + assert container.image == "test-img:v1" + assert container.command == ["python3", "/app/code.py"] + + # Check security context + sec_context = container.security_context + assert sec_context.run_as_non_root is True + assert sec_context.run_as_user == 1001 + assert sec_context.allow_privilege_escalation is False + assert sec_context.read_only_root_filesystem is True + assert sec_context.capabilities.drop == ["ALL"] \ No newline at end of file From efd7a76cb1289467ceada6de04bf549317266956 Mon Sep 17 00:00:00 2001 From: syangx39 Date: Fri, 25 Jul 2025 08:50:21 +0000 Subject: [PATCH 14/30] [07/25] modify pyproject.toml to add gke-specific dependency --- pyproject.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 269e1bc22a..c8cb923f8d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,6 +145,10 @@ extensions = [ "toolbox-core>=0.1.0", # For tools.toolbox_toolset.ToolboxToolset ] +# For GKE-specific features, primarily the GkeCodeExecutor. +gke = [ + "kubernetes>=29.0.0", +] [tool.pyink] # Format py files following Google style-guide From 6914958de75c84d5eac197a8f5b78235cb43976f Mon Sep 17 00:00:00 2001 From: syangx39 Date: Fri, 15 Aug 2025 20:35:01 +0000 Subject: [PATCH 15/30] [08/15] rename cpu_request --- src/google/adk/code_executors/gke_code_executor.py | 6 +++--- tests/unittests/code_executors/test_gke_code_executor.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index 4901d2140b..2c94f3a3f7 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -35,8 +35,8 @@ class GkeCodeExecutor(BaseCodeExecutor): namespace: str = "default" image: str = "python:3.11-slim" timeout_seconds: int = 300 - cpu_request: str = "200m" - mem_request: str = "256Mi" + cpu_requested: str = "200m" + mem_requested: str = "256Mi" # The maximum CPU the container can use, in "millicores". 1000m is 1 full CPU core. cpu_limit: str = "500m" mem_limit: str = "512Mi" @@ -126,7 +126,7 @@ def _create_job_manifest(self, job_name: str, configmap_name: str, invocation_co ), # Set resource limits to prevent abuse. resources=client.V1ResourceRequirements( - requests={"cpu": self.cpu_request, "memory": self.mem_request}, + requests={"cpu": self.cpu_requested, "memory": self.mem_requested}, limits={"cpu": self.cpu_limit, "memory": self.mem_limit}, ), ) diff --git a/tests/unittests/code_executors/test_gke_code_executor.py b/tests/unittests/code_executors/test_gke_code_executor.py index 549ec1398d..6be0ea4f19 100644 --- a/tests/unittests/code_executors/test_gke_code_executor.py +++ b/tests/unittests/code_executors/test_gke_code_executor.py @@ -65,7 +65,7 @@ def test_init_defaults(self): assert executor.namespace == "default" assert executor.image == "python:3.11-slim" assert executor.timeout_seconds == 300 - assert executor.cpu_request == "200m" + assert executor.cpu_requested == "200m" assert executor.mem_limit == "512Mi" def test_init_with_overrides(self): From 5b7595c90011e3bddb62c2a7a786c7d30b68446b Mon Sep 17 00:00:00 2001 From: syangx39 Date: Fri, 15 Aug 2025 21:40:50 +0000 Subject: [PATCH 16/30] [08/15] rename cpu_request --- .../adk/code_executors/gke_code_executor.py | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index 2c94f3a3f7..8deffdfb16 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -26,11 +26,23 @@ class GkeCodeExecutor(BaseCodeExecutor): - Efficient, event-driven waiting using the Kubernetes watch API. RBAC Permissions: - This executor interacts with the Kubernetes API and requires a ServiceAccount - with specific RBAC permissions to function. The agent's pod needs permissions - to create/watch Jobs, create/delete ConfigMaps, and list Pods to read logs. - For a complete, working example of the required Role and RoleBinding, see the - file at: contributing/samples/gke_agent_sandbox/deployment_rbac.yaml + This executor requires a ServiceAccount with specific RBAC permissions. The + Role granted to the ServiceAccount must include rules to manage Jobs, + ConfigMaps, and Pod logs. Below is a minimal set of required permissions: + + rules: + # For creating/deleting code ConfigMaps and patching ownerReferences + - apiGroups: [""] # Core API Group + resources: ["configmaps"] + verbs: ["create", "delete", "get", "patch"] + # For watching Job completion status + - apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["get", "list", "watch", "create", "delete"] + # For retrieving logs from the completed Job's Pod + - apiGroups: [""] # Core API Group + resources: ["pods", "pods/log"] + verbs: ["get", "list"] """ namespace: str = "default" image: str = "python:3.11-slim" From 1a6eca2dbd2d7b8b7ee4fdd63692027c49294e56 Mon Sep 17 00:00:00 2001 From: syangx39 Date: Fri, 15 Aug 2025 21:53:04 +0000 Subject: [PATCH 17/30] [08/15] rename cpu_request --- tests/unittests/code_executors/test_gke_code_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittests/code_executors/test_gke_code_executor.py b/tests/unittests/code_executors/test_gke_code_executor.py index 6be0ea4f19..9738a1b950 100644 --- a/tests/unittests/code_executors/test_gke_code_executor.py +++ b/tests/unittests/code_executors/test_gke_code_executor.py @@ -206,4 +206,4 @@ def test_create_job_manifest_structure(self, mock_invocation_context): assert sec_context.run_as_user == 1001 assert sec_context.allow_privilege_escalation is False assert sec_context.read_only_root_filesystem is True - assert sec_context.capabilities.drop == ["ALL"] \ No newline at end of file + assert sec_context.capabilities.drop == ["ALL"] From 6010a3bf611f94ed9d202c85b89bf496e8eeca95 Mon Sep 17 00:00:00 2001 From: syangx39 Date: Mon, 25 Aug 2025 18:56:25 +0000 Subject: [PATCH 18/30] [08/25] Modify ADK --- src/google/adk/code_executors/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/google/adk/code_executors/__init__.py b/src/google/adk/code_executors/__init__.py index 7923738a4d..41d406f63d 100644 --- a/src/google/adk/code_executors/__init__.py +++ b/src/google/adk/code_executors/__init__.py @@ -56,6 +56,7 @@ def __getattr__(name: str): elif name == 'GkeCodeExecutor': try: from .gke_code_executor import GkeCodeExecutor + return GkeCodeExecutor except ImportError as e: raise ImportError( From 4dc4753a0294e516f623f0a1a3d562048dd3fea6 Mon Sep 17 00:00:00 2001 From: syangx39 Date: Mon, 25 Aug 2025 18:57:51 +0000 Subject: [PATCH 19/30] [08/25] Modify ADK --- src/google/adk/code_executors/__init__.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/google/adk/code_executors/__init__.py b/src/google/adk/code_executors/__init__.py index 41d406f63d..32f8a4e6de 100644 --- a/src/google/adk/code_executors/__init__.py +++ b/src/google/adk/code_executors/__init__.py @@ -53,14 +53,4 @@ def __getattr__(name: str): 'ContainerCodeExecutor requires additional dependencies. ' 'Please install with: pip install "google-adk[extensions]"' ) from e - elif name == 'GkeCodeExecutor': - try: - from .gke_code_executor import GkeCodeExecutor - - return GkeCodeExecutor - except ImportError as e: - raise ImportError( - 'GkeCodeExecutor requires additional dependencies. ' - 'Please install with: pip install "google-adk[extensions]"' - ) from e raise AttributeError(f"module '{__name__}' has no attribute '{name}'") From 9ec5e8b21f2c3eb8e78e7df3d142237b76f11b17 Mon Sep 17 00:00:00 2001 From: syangx39 Date: Mon, 25 Aug 2025 19:03:15 +0000 Subject: [PATCH 20/30] [08/25] Modify ADK --- .../code_execution/{gke_sandbox.agent.py => gke_sandbox_agent.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename contributing/samples/code_execution/{gke_sandbox.agent.py => gke_sandbox_agent.py} (100%) diff --git a/contributing/samples/code_execution/gke_sandbox.agent.py b/contributing/samples/code_execution/gke_sandbox_agent.py similarity index 100% rename from contributing/samples/code_execution/gke_sandbox.agent.py rename to contributing/samples/code_execution/gke_sandbox_agent.py From 3c962ff40280a684f72686cb2e5f582b0c3409f7 Mon Sep 17 00:00:00 2001 From: syangx39 Date: Mon, 25 Aug 2025 19:35:46 +0000 Subject: [PATCH 21/30] [08/25] Modify ADK --- pyproject.toml | 6 +----- src/google/adk/code_executors/__init__.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c8cb923f8d..cfeb211804 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -139,17 +139,13 @@ extensions = [ "docker>=7.0.0", # For ContainerCodeExecutor "langgraph>=0.2.60", # For LangGraphAgent "litellm>=1.75.5", # For LiteLlm class. Currently has OpenAI limitations. TODO: once LiteLlm fix it + "kubernetes>=29.0.0", # For GkeCodeExecutor "llama-index-readers-file>=0.4.0", # For retrieval using LlamaIndex. "llama-index-embeddings-google-genai>=0.3.0",# For files retrieval using LlamaIndex. "lxml>=5.3.0", # For load_web_page tool. "toolbox-core>=0.1.0", # For tools.toolbox_toolset.ToolboxToolset ] -# For GKE-specific features, primarily the GkeCodeExecutor. -gke = [ - "kubernetes>=29.0.0", -] - [tool.pyink] # Format py files following Google style-guide line-length = 80 diff --git a/src/google/adk/code_executors/__init__.py b/src/google/adk/code_executors/__init__.py index 32f8a4e6de..41d406f63d 100644 --- a/src/google/adk/code_executors/__init__.py +++ b/src/google/adk/code_executors/__init__.py @@ -53,4 +53,14 @@ def __getattr__(name: str): 'ContainerCodeExecutor requires additional dependencies. ' 'Please install with: pip install "google-adk[extensions]"' ) from e + elif name == 'GkeCodeExecutor': + try: + from .gke_code_executor import GkeCodeExecutor + + return GkeCodeExecutor + except ImportError as e: + raise ImportError( + 'GkeCodeExecutor requires additional dependencies. ' + 'Please install with: pip install "google-adk[extensions]"' + ) from e raise AttributeError(f"module '{__name__}' has no attribute '{name}'") From ae138009287c30517d99fbc9c3809d58af2d678e Mon Sep 17 00:00:00 2001 From: syangx39 Date: Mon, 25 Aug 2025 19:36:38 +0000 Subject: [PATCH 22/30] [08/25] Modify ADK --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index cfeb211804..a999a3e2ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -146,6 +146,7 @@ extensions = [ "toolbox-core>=0.1.0", # For tools.toolbox_toolset.ToolboxToolset ] + [tool.pyink] # Format py files following Google style-guide line-length = 80 From d3c1dd03c5bc1ba03cb1e80b97b050ea06419446 Mon Sep 17 00:00:00 2001 From: syangx39 Date: Mon, 25 Aug 2025 20:03:02 +0000 Subject: [PATCH 23/30] [08/25] Modify ADK --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index a999a3e2ef..5706e3a8aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,6 +110,7 @@ test = [ "langchain-community>=0.3.17", "langgraph>=0.2.60, <= 0.4.10", # For LangGraphAgent "litellm>=1.75.5, <2.0.0", # For LiteLLM tests + "kubernetes>=29.0.0", # For GkeCodeExecutor "llama-index-readers-file>=0.4.0", # For retrieval tests "openai>=1.100.2", # For LiteLLM "pytest-asyncio>=0.25.0", From 67ebf383471950af47355fd6fdba3d1ed7170b2c Mon Sep 17 00:00:00 2001 From: Summer Date: Thu, 28 Aug 2025 16:59:53 -0700 Subject: [PATCH 24/30] [08/28] fix kubeconfig --- .../adk/code_executors/gke_code_executor.py | 48 ++++++++++++++----- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index 8deffdfb16..b323b75eac 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -53,24 +53,50 @@ class GkeCodeExecutor(BaseCodeExecutor): cpu_limit: str = "500m" mem_limit: str = "512Mi" + kubeconfig_path: str | None = None + kubeconfig_context: str | None = None + _batch_v1: client.BatchV1Api _core_v1: client.CoreV1Api - def __init__(self, **data): + def __init__( + self, + kubeconfig_path: str | None = None, + kubeconfig_context: str | None = None, + **data + ): """Initializes the executor and the Kubernetes API clients. - This constructor supports overriding default class attributes (like - 'namespace', 'image', etc.) by passing them as keyword arguments. It - automatically configures the Kubernetes client to work either within a - cluster (in-cluster config) or locally using a kubeconfig file. + This constructor supports multiple authentication methods: + 1. Explicitly via a kubeconfig file path and context. + 2. Automatically via in-cluster service account (when running in GKE). + 3. Automatically via the default local kubeconfig file (~/.kube/config). """ super().__init__(**data) - try: - config.load_incluster_config() - logger.info("Using in-cluster Kubernetes configuration.") - except config.ConfigException: - logger.info("In-cluster config not found. Falling back to local kubeconfig.") - config.load_kube_config() + self.kubeconfig_path = kubeconfig_path + self.kubeconfig_context = kubeconfig_context + + if self.kubeconfig_path: + try: + logger.info(f"Using explicit kubeconfig from '{self.kubeconfig_path}'.") + config.load_kube_config( + config_file=self.kubeconfig_path, + context=self.kubeconfig_context + ) + except config.ConfigException as e: + logger.error(f"Failed to load explicit kubeconfig from {self.kubeconfig_path}", exc_info=True) + raise RuntimeError("Failed to configure Kubernetes client from provided path.") from e + else: + try: + config.load_incluster_config() + logger.info("Using in-cluster Kubernetes configuration.") + except config.ConfigException: + try: + logger.info("In-cluster config not found. Falling back to default local kubeconfig.") + config.load_kube_config() + except config.ConfigException as e: + logger.error("Could not configure Kubernetes client automatically.", exc_info=True) + raise RuntimeError("Failed to find any valid Kubernetes configuration.") from e self._batch_v1 = client.BatchV1Api() self._core_v1 = client.CoreV1Api() From 5a7002a3198af4eb7ddb13115003ea233162c132 Mon Sep 17 00:00:00 2001 From: GWeale Date: Fri, 5 Sep 2025 17:20:50 -0700 Subject: [PATCH 25/30] fix(gke): add future annotations for py3.9 compatibility #non-breaking --- .../code_execution/gke_sandbox_agent.py | 4 +- .../adk/code_executors/gke_code_executor.py | 571 ++++++++++-------- .../code_executors/test_gke_code_executor.py | 366 +++++------ 3 files changed, 499 insertions(+), 442 deletions(-) diff --git a/contributing/samples/code_execution/gke_sandbox_agent.py b/contributing/samples/code_execution/gke_sandbox_agent.py index 5ee64513bf..4baaf52152 100644 --- a/contributing/samples/code_execution/gke_sandbox_agent.py +++ b/contributing/samples/code_execution/gke_sandbox_agent.py @@ -19,8 +19,8 @@ def gke_agent_system_instruction(): - """Returns: The system instruction for the GKE-based coding agent.""" - return """You are a helpful and capable AI agent that can write and execute Python code to answer questions and perform tasks. + """Returns: The system instruction for the GKE-based coding agent.""" + return """You are a helpful and capable AI agent that can write and execute Python code to answer questions and perform tasks. When a user asks a question, follow these steps: 1. Analyze the request. diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index b323b75eac..81b4bdceaf 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -1,291 +1,330 @@ +from __future__ import annotations + import logging import uuid from google.adk.agents.invocation_context import InvocationContext from google.adk.code_executors.base_code_executor import BaseCodeExecutor -from google.adk.code_executors.code_execution_utils import CodeExecutionInput, CodeExecutionResult - -from kubernetes import client, config +from google.adk.code_executors.code_execution_utils import CodeExecutionInput +from google.adk.code_executors.code_execution_utils import CodeExecutionResult +from kubernetes import client +from kubernetes import config from kubernetes.client.rest import ApiException from kubernetes.watch import Watch -logger = logging.getLogger(__name__) +logger = logging.getLogger("google_adk." + __name__) + class GkeCodeExecutor(BaseCodeExecutor): - """Executes Python code in a secure gVisor-sandboxed Pod on GKE. - - This executor securely runs code by dynamically creating a Kubernetes Job for - each execution request. The user's code is mounted via a ConfigMap, and the - Pod is hardened with a strict security context and resource limits. - - Key Features: - - Sandboxed execution using the gVisor runtime. - - Ephemeral, per-execution environments using Kubernetes Jobs. - - Secure-by-default Pod configuration (non-root, no privileges). - - Automatic garbage collection of completed Jobs and Pods via TTL. - - Efficient, event-driven waiting using the Kubernetes watch API. - - RBAC Permissions: - This executor requires a ServiceAccount with specific RBAC permissions. The - Role granted to the ServiceAccount must include rules to manage Jobs, - ConfigMaps, and Pod logs. Below is a minimal set of required permissions: - - rules: - # For creating/deleting code ConfigMaps and patching ownerReferences - - apiGroups: [""] # Core API Group - resources: ["configmaps"] - verbs: ["create", "delete", "get", "patch"] - # For watching Job completion status - - apiGroups: ["batch"] - resources: ["jobs"] - verbs: ["get", "list", "watch", "create", "delete"] - # For retrieving logs from the completed Job's Pod - - apiGroups: [""] # Core API Group - resources: ["pods", "pods/log"] - verbs: ["get", "list"] + """Executes Python code in a secure gVisor-sandboxed Pod on GKE. + + This executor securely runs code by dynamically creating a Kubernetes Job for + each execution request. The user's code is mounted via a ConfigMap, and the + Pod is hardened with a strict security context and resource limits. + + Key Features: + - Sandboxed execution using the gVisor runtime. + - Ephemeral, per-execution environments using Kubernetes Jobs. + - Secure-by-default Pod configuration (non-root, no privileges). + - Automatic garbage collection of completed Jobs and Pods via TTL. + - Efficient, event-driven waiting using the Kubernetes watch API. + + RBAC Permissions: + This executor requires a ServiceAccount with specific RBAC permissions. The + Role granted to the ServiceAccount must include rules to manage Jobs, + ConfigMaps, and Pod logs. Below is a minimal set of required permissions: + + rules: + # For creating/deleting code ConfigMaps and patching ownerReferences + - apiGroups: [""] # Core API Group + resources: ["configmaps"] + verbs: ["create", "delete", "get", "patch"] + # For watching Job completion status + - apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["get", "list", "watch", "create", "delete"] + # For retrieving logs from the completed Job's Pod + - apiGroups: [""] # Core API Group + resources: ["pods", "pods/log"] + verbs: ["get", "list"] + """ + + namespace: str = "default" + image: str = "python:3.11-slim" + timeout_seconds: int = 300 + cpu_requested: str = "200m" + mem_requested: str = "256Mi" + # The maximum CPU the container can use, in "millicores". 1000m is 1 full CPU core. + cpu_limit: str = "500m" + mem_limit: str = "512Mi" + + kubeconfig_path: str | None = None + kubeconfig_context: str | None = None + + _batch_v1: client.BatchV1Api + _core_v1: client.CoreV1Api + + def __init__( + self, + kubeconfig_path: str | None = None, + kubeconfig_context: str | None = None, + **data, + ): + """Initializes the executor and the Kubernetes API clients. + + This constructor supports multiple authentication methods: + 1. Explicitly via a kubeconfig file path and context. + 2. Automatically via in-cluster service account (when running in GKE). + 3. Automatically via the default local kubeconfig file (~/.kube/config). """ - namespace: str = "default" - image: str = "python:3.11-slim" - timeout_seconds: int = 300 - cpu_requested: str = "200m" - mem_requested: str = "256Mi" - # The maximum CPU the container can use, in "millicores". 1000m is 1 full CPU core. - cpu_limit: str = "500m" - mem_limit: str = "512Mi" - - kubeconfig_path: str | None = None - kubeconfig_context: str | None = None - - _batch_v1: client.BatchV1Api - _core_v1: client.CoreV1Api - - def __init__( - self, - kubeconfig_path: str | None = None, - kubeconfig_context: str | None = None, - **data - ): - """Initializes the executor and the Kubernetes API clients. - - This constructor supports multiple authentication methods: - 1. Explicitly via a kubeconfig file path and context. - 2. Automatically via in-cluster service account (when running in GKE). - 3. Automatically via the default local kubeconfig file (~/.kube/config). - """ - super().__init__(**data) - self.kubeconfig_path = kubeconfig_path - self.kubeconfig_context = kubeconfig_context - - if self.kubeconfig_path: - try: - logger.info(f"Using explicit kubeconfig from '{self.kubeconfig_path}'.") - config.load_kube_config( - config_file=self.kubeconfig_path, - context=self.kubeconfig_context - ) - except config.ConfigException as e: - logger.error(f"Failed to load explicit kubeconfig from {self.kubeconfig_path}", exc_info=True) - raise RuntimeError("Failed to configure Kubernetes client from provided path.") from e - else: - try: - config.load_incluster_config() - logger.info("Using in-cluster Kubernetes configuration.") - except config.ConfigException: - try: - logger.info("In-cluster config not found. Falling back to default local kubeconfig.") - config.load_kube_config() - except config.ConfigException as e: - logger.error("Could not configure Kubernetes client automatically.", exc_info=True) - raise RuntimeError("Failed to find any valid Kubernetes configuration.") from e - - self._batch_v1 = client.BatchV1Api() - self._core_v1 = client.CoreV1Api() - - def execute_code( - self, - invocation_context: InvocationContext, - code_execution_input: CodeExecutionInput, - ) -> CodeExecutionResult: - """Orchestrates the secure execution of a code snippet on GKE.""" - job_name = f"adk-exec-{uuid.uuid4().hex[:10]}" - configmap_name = f"code-src-{job_name}" + super().__init__(**data) + self.kubeconfig_path = kubeconfig_path + self.kubeconfig_context = kubeconfig_context + if self.kubeconfig_path: + try: + logger.info(f"Using explicit kubeconfig from '{self.kubeconfig_path}'.") + config.load_kube_config( + config_file=self.kubeconfig_path, context=self.kubeconfig_context + ) + except config.ConfigException as e: + logger.error( + f"Failed to load explicit kubeconfig from {self.kubeconfig_path}", + exc_info=True, + ) + raise RuntimeError( + "Failed to configure Kubernetes client from provided path." + ) from e + else: + try: + config.load_incluster_config() + logger.info("Using in-cluster Kubernetes configuration.") + except config.ConfigException: try: - # The execution process: - # 1. Create a ConfigMap to mount LLM-generated code into the Pod. - # 2. Create a Job that runs the code from the ConfigMap. - # 3. Set the Job as the ConfigMap's owner for automatic cleanup. - self._create_code_configmap(configmap_name, code_execution_input.code) - job_manifest = self._create_job_manifest(job_name, configmap_name, invocation_context) - created_job = self._batch_v1.create_namespaced_job( - body=job_manifest, namespace=self.namespace - ) - self._add_owner_reference(created_job, configmap_name) + logger.info( + "In-cluster config not found. Falling back to default local" + " kubeconfig." + ) + config.load_kube_config() + except config.ConfigException as e: + logger.error( + "Could not configure Kubernetes client automatically.", + exc_info=True, + ) + raise RuntimeError( + "Failed to find any valid Kubernetes configuration." + ) from e + + self._batch_v1 = client.BatchV1Api() + self._core_v1 = client.CoreV1Api() + + def execute_code( + self, + invocation_context: InvocationContext, + code_execution_input: CodeExecutionInput, + ) -> CodeExecutionResult: + """Orchestrates the secure execution of a code snippet on GKE.""" + job_name = f"adk-exec-{uuid.uuid4().hex[:10]}" + configmap_name = f"code-src-{job_name}" + + try: + # The execution process: + # 1. Create a ConfigMap to mount LLM-generated code into the Pod. + # 2. Create a Job that runs the code from the ConfigMap. + # 3. Set the Job as the ConfigMap's owner for automatic cleanup. + self._create_code_configmap(configmap_name, code_execution_input.code) + job_manifest = self._create_job_manifest( + job_name, configmap_name, invocation_context + ) + created_job = self._batch_v1.create_namespaced_job( + body=job_manifest, namespace=self.namespace + ) + self._add_owner_reference(created_job, configmap_name) + + logger.info( + f"Submitted Job '{job_name}' to namespace '{self.namespace}'." + ) + return self._watch_job_completion(job_name) - logger.info(f"Submitted Job '{job_name}' to namespace '{self.namespace}'.") - return self._watch_job_completion(job_name) + except ApiException as e: + logger.error( + "A Kubernetes API error occurred during job" + f" '{job_name}': {e.reason}", + exc_info=True, + ) + return CodeExecutionResult(stderr=f"Kubernetes API error: {e.reason}") + except TimeoutError as e: + logger.error(e, exc_info=True) + logs = self._get_pod_logs(job_name) + stderr = f"Executor timed out: {e}\n\nPod Logs:\n{logs}" + return CodeExecutionResult(stderr=stderr) + except Exception as e: + logger.error( + f"An unexpected error occurred during job '{job_name}': {e}", + exc_info=True, + ) + return CodeExecutionResult( + stderr=f"An unexpected executor error occurred: {e}" + ) - except ApiException as e: - logger.error( - "A Kubernetes API error occurred during job" - f" '{job_name}': {e.reason}", - exc_info=True, + def _create_job_manifest( + self, + job_name: str, + configmap_name: str, + invocation_context: InvocationContext, + ) -> client.V1Job: + """Creates the complete V1Job object with security best practices.""" + # Define the container that will run the code. + container = client.V1Container( + name="code-runner", + image=self.image, + command=["python3", "/app/code.py"], + volume_mounts=[ + client.V1VolumeMount(name="code-volume", mount_path="/app") + ], + # Enforce a strict security context. + security_context=client.V1SecurityContext( + run_as_non_root=True, + run_as_user=1001, + allow_privilege_escalation=False, + read_only_root_filesystem=True, + capabilities=client.V1Capabilities(drop=["ALL"]), + ), + # Set resource limits to prevent abuse. + resources=client.V1ResourceRequirements( + requests={"cpu": self.cpu_requested, "memory": self.mem_requested}, + limits={"cpu": self.cpu_limit, "memory": self.mem_limit}, + ), + ) + + # Use tolerations to request a gVisor node. + pod_spec = client.V1PodSpec( + restart_policy="Never", + containers=[container], + volumes=[ + client.V1Volume( + name="code-volume", + config_map=client.V1ConfigMapVolumeSource(name=configmap_name), ) - return CodeExecutionResult(stderr=f"Kubernetes API error: {e.reason}") - except TimeoutError as e: - logger.error(e, exc_info=True) - logs = self._get_pod_logs(job_name) - stderr = f"Executor timed out: {e}\n\nPod Logs:\n{logs}" - return CodeExecutionResult(stderr=stderr) - except Exception as e: - logger.error( - f"An unexpected error occurred during job '{job_name}': {e}", - exc_info=True, + ], + runtime_class_name="gvisor", # Request the gVisor runtime. + tolerations=[ + client.V1Toleration( + key="sandbox.gke.io/runtime", + operator="Equal", + value="gvisor", + effect="NoSchedule", ) - return CodeExecutionResult(stderr=f"An unexpected executor error occurred: {e}") - - def _create_job_manifest(self, job_name: str, configmap_name: str, invocation_context: InvocationContext) -> client.V1Job: - """Creates the complete V1Job object with security best practices.""" - # Define the container that will run the code. - container = client.V1Container( - name="code-runner", - image=self.image, - command=["python3", "/app/code.py"], - volume_mounts=[ - client.V1VolumeMount(name="code-volume", mount_path="/app") - ], - # Enforce a strict security context. - security_context=client.V1SecurityContext( - run_as_non_root=True, - run_as_user=1001, - allow_privilege_escalation=False, - read_only_root_filesystem=True, - capabilities=client.V1Capabilities(drop=["ALL"]), - ), - # Set resource limits to prevent abuse. - resources=client.V1ResourceRequirements( - requests={"cpu": self.cpu_requested, "memory": self.mem_requested}, - limits={"cpu": self.cpu_limit, "memory": self.mem_limit}, - ), - ) - - # Use tolerations to request a gVisor node. - pod_spec = client.V1PodSpec( - restart_policy="Never", - containers=[container], - volumes=[ - client.V1Volume( - name="code-volume", - config_map=client.V1ConfigMapVolumeSource(name=configmap_name), - ) - ], - runtime_class_name="gvisor", # Request the gVisor runtime. - tolerations=[ - client.V1Toleration( - key="sandbox.gke.io/runtime", - operator="Equal", - value="gvisor", - effect="NoSchedule", - ) - ], - ) + ], + ) - job_spec = client.V1JobSpec( - template=client.V1PodTemplateSpec(spec=pod_spec), - backoff_limit=0, # Do not retry the Job on failure. - # Kubernetes TTL controller will handle Job/Pod cleanup. - ttl_seconds_after_finished=600, # Garbage collect after 10 minutes. - ) - - # Assemble and return the final Job object. - annotations = { - "adk.agent.google.com/invocation-id": invocation_context.invocation_id - } - return client.V1Job( - api_version="batch/v1", - kind="Job", - metadata=client.V1ObjectMeta(name=job_name, annotations=annotations), - spec=job_spec, - ) + job_spec = client.V1JobSpec( + template=client.V1PodTemplateSpec(spec=pod_spec), + backoff_limit=0, # Do not retry the Job on failure. + # Kubernetes TTL controller will handle Job/Pod cleanup. + ttl_seconds_after_finished=600, # Garbage collect after 10 minutes. + ) - def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: - """Uses the watch API to efficiently wait for job completion.""" - watch = Watch() - try: - for event in watch.stream( - self._batch_v1.list_namespaced_job, - namespace=self.namespace, - field_selector=f"metadata.name={job_name}", - timeout_seconds=self.timeout_seconds, - ): - job = event["object"] - if job.status.succeeded: - watch.stop() - logger.info(f"Job '{job_name}' succeeded.") - logs = self._get_pod_logs(job_name) - return CodeExecutionResult(stdout=logs) - if job.status.failed: - watch.stop() - logger.error(f"Job '{job_name}' failed.") - logs = self._get_pod_logs(job_name) - return CodeExecutionResult(stderr=f"Job failed. Logs:\n{logs}") - - # If the loop finishes without returning, the watch timed out. - raise TimeoutError( - f"Job '{job_name}' did not complete within {self.timeout_seconds}s." - ) - finally: - watch.stop() + # Assemble and return the final Job object. + annotations = { + "adk.agent.google.com/invocation-id": invocation_context.invocation_id + } + return client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=client.V1ObjectMeta(name=job_name, annotations=annotations), + spec=job_spec, + ) - def _get_pod_logs(self, job_name: str) -> str: - """Retrieves logs from the pod created by the specified job. + def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: + """Uses the watch API to efficiently wait for job completion.""" + watch = Watch() + try: + for event in watch.stream( + self._batch_v1.list_namespaced_job, + namespace=self.namespace, + field_selector=f"metadata.name={job_name}", + timeout_seconds=self.timeout_seconds, + ): + job = event["object"] + if job.status.succeeded: + watch.stop() + logger.info(f"Job '{job_name}' succeeded.") + logs = self._get_pod_logs(job_name) + return CodeExecutionResult(stdout=logs) + if job.status.failed: + watch.stop() + logger.error(f"Job '{job_name}' failed.") + logs = self._get_pod_logs(job_name) + return CodeExecutionResult(stderr=f"Job failed. Logs:\n{logs}") - Raises: - RuntimeError: If the pod cannot be found or logs cannot be fetched. - """ - try: - pods = self._core_v1.list_namespaced_pod( - namespace=self.namespace, label_selector=f"job-name={job_name}", limit=1 - ) - if not pods.items: - raise RuntimeError(f"Could not find Pod for Job '{job_name}' to retrieve logs.") + # If the loop finishes without returning, the watch timed out. + raise TimeoutError( + f"Job '{job_name}' did not complete within {self.timeout_seconds}s." + ) + finally: + watch.stop() - pod_name = pods.items[0].metadata.name - return self._core_v1.read_namespaced_pod_log( - name=pod_name, namespace=self.namespace - ) - except ApiException as e: - raise RuntimeError(f"API error retrieving logs for job '{job_name}': {e.reason}") from e + def _get_pod_logs(self, job_name: str) -> str: + """Retrieves logs from the pod created by the specified job. - def _create_code_configmap(self, name: str, code: str) -> None: - """Creates a ConfigMap to hold the Python code.""" - body = client.V1ConfigMap( - metadata=client.V1ObjectMeta(name=name), data={"code.py": code} - ) - self._core_v1.create_namespaced_config_map(namespace=self.namespace, body=body) - - def _add_owner_reference(self, owner_job: client.V1Job, configmap_name: str) -> None: - """Patches the ConfigMap to be owned by the Job for auto-cleanup.""" - owner_reference = client.V1OwnerReference( - api_version=owner_job.api_version, - kind=owner_job.kind, - name=owner_job.metadata.name, - uid=owner_job.metadata.uid, - controller=True, + Raises: + RuntimeError: If the pod cannot be found or logs cannot be fetched. + """ + try: + pods = self._core_v1.list_namespaced_pod( + namespace=self.namespace, + label_selector=f"job-name={job_name}", + limit=1, + ) + if not pods.items: + raise RuntimeError( + f"Could not find Pod for Job '{job_name}' to retrieve logs." ) - patch_body = { - "metadata": {"ownerReferences": [owner_reference.to_dict()]} - } - try: - self._core_v1.patch_namespaced_config_map( - name=configmap_name, - namespace=self.namespace, - body=patch_body, - ) - logger.info(f"Set Job '{owner_job.metadata.name}' as owner of ConfigMap '{configmap_name}'.") - except ApiException as e: - logger.warning( - f"Failed to set ownerReference on ConfigMap '{configmap_name}'. " - f"Manual cleanup is required. Reason: {e.reason}" - ) + pod_name = pods.items[0].metadata.name + return self._core_v1.read_namespaced_pod_log( + name=pod_name, namespace=self.namespace + ) + except ApiException as e: + raise RuntimeError( + f"API error retrieving logs for job '{job_name}': {e.reason}" + ) from e + + def _create_code_configmap(self, name: str, code: str) -> None: + """Creates a ConfigMap to hold the Python code.""" + body = client.V1ConfigMap( + metadata=client.V1ObjectMeta(name=name), data={"code.py": code} + ) + self._core_v1.create_namespaced_config_map( + namespace=self.namespace, body=body + ) + + def _add_owner_reference( + self, owner_job: client.V1Job, configmap_name: str + ) -> None: + """Patches the ConfigMap to be owned by the Job for auto-cleanup.""" + owner_reference = client.V1OwnerReference( + api_version=owner_job.api_version, + kind=owner_job.kind, + name=owner_job.metadata.name, + uid=owner_job.metadata.uid, + controller=True, + ) + patch_body = {"metadata": {"ownerReferences": [owner_reference.to_dict()]}} + + try: + self._core_v1.patch_namespaced_config_map( + name=configmap_name, + namespace=self.namespace, + body=patch_body, + ) + logger.info( + f"Set Job '{owner_job.metadata.name}' as owner of ConfigMap" + f" '{configmap_name}'." + ) + except ApiException as e: + logger.warning( + f"Failed to set ownerReference on ConfigMap '{configmap_name}'. " + f"Manual cleanup is required. Reason: {e.reason}" + ) diff --git a/tests/unittests/code_executors/test_gke_code_executor.py b/tests/unittests/code_executors/test_gke_code_executor.py index 9738a1b950..5ef99792f3 100644 --- a/tests/unittests/code_executors/test_gke_code_executor.py +++ b/tests/unittests/code_executors/test_gke_code_executor.py @@ -12,198 +12,216 @@ # See the License for the specific language governing permissions and # limitations under the License. -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock +from unittest.mock import patch from google.adk.agents.invocation_context import InvocationContext from google.adk.code_executors.code_execution_utils import CodeExecutionInput from google.adk.code_executors.gke_code_executor import GkeCodeExecutor +from kubernetes import client +from kubernetes import config from kubernetes.client.rest import ApiException -from kubernetes import client, config import pytest @pytest.fixture def mock_invocation_context() -> InvocationContext: - """Fixture for a mock InvocationContext.""" - mock = MagicMock(spec=InvocationContext) - mock.invocation_id = "test-invocation-123" - return mock + """Fixture for a mock InvocationContext.""" + mock = MagicMock(spec=InvocationContext) + mock.invocation_id = "test-invocation-123" + return mock @pytest.fixture(autouse=True) def mock_k8s_config(): - """Fixture for auto-mocking Kubernetes config loading.""" - with patch("google.adk.code_executors.gke_code_executor.config") as mock_config: - # Simulate fallback from in-cluster to kubeconfig - mock_config.ConfigException = config.ConfigException - mock_config.load_incluster_config.side_effect = config.ConfigException - yield mock_config + """Fixture for auto-mocking Kubernetes config loading.""" + with patch( + "google.adk.code_executors.gke_code_executor.config" + ) as mock_config: + # Simulate fallback from in-cluster to kubeconfig + mock_config.ConfigException = config.ConfigException + mock_config.load_incluster_config.side_effect = config.ConfigException + yield mock_config @pytest.fixture def mock_k8s_clients(): - """Fixture for mock Kubernetes API clients.""" - with patch( - "google.adk.code_executors.gke_code_executor.client" - ) as mock_client_class: - mock_batch_v1 = MagicMock(spec=client.BatchV1Api) - mock_core_v1 = MagicMock(spec=client.CoreV1Api) - mock_client_class.BatchV1Api.return_value = mock_batch_v1 - mock_client_class.CoreV1Api.return_value = mock_core_v1 - yield { - "batch_v1": mock_batch_v1, - "core_v1": mock_core_v1, - } + """Fixture for mock Kubernetes API clients.""" + with patch( + "google.adk.code_executors.gke_code_executor.client" + ) as mock_client_class: + mock_batch_v1 = MagicMock(spec=client.BatchV1Api) + mock_core_v1 = MagicMock(spec=client.CoreV1Api) + mock_client_class.BatchV1Api.return_value = mock_batch_v1 + mock_client_class.CoreV1Api.return_value = mock_core_v1 + yield { + "batch_v1": mock_batch_v1, + "core_v1": mock_core_v1, + } class TestGkeCodeExecutor: - """Unit tests for the GkeCodeExecutor.""" - - def test_init_defaults(self): - """Tests that the executor initializes with correct default values.""" - executor = GkeCodeExecutor() - assert executor.namespace == "default" - assert executor.image == "python:3.11-slim" - assert executor.timeout_seconds == 300 - assert executor.cpu_requested == "200m" - assert executor.mem_limit == "512Mi" - - def test_init_with_overrides(self): - """Tests that class attributes can be overridden at instantiation.""" - executor = GkeCodeExecutor( - namespace="test-ns", - image="custom-python:latest", - timeout_seconds=60, - cpu_limit="1000m", - ) - assert executor.namespace == "test-ns" - assert executor.image == "custom-python:latest" - assert executor.timeout_seconds == 60 - assert executor.cpu_limit == "1000m" - - @patch("google.adk.code_executors.gke_code_executor.Watch") - def test_execute_code_success( - self, - mock_watch, - mock_k8s_clients, - mock_invocation_context, - ): - """Tests the happy path for successful code execution.""" - # Setup Mocks - mock_job = MagicMock() - mock_job.status.succeeded = True - mock_job.status.failed = None - mock_watch.return_value.stream.return_value = [{"object": mock_job}] - - mock_pod_list = MagicMock() - mock_pod_list.items = [MagicMock()] - mock_pod_list.items[0].metadata.name = "test-pod-name" - mock_k8s_clients["core_v1"].list_namespaced_pod.return_value = mock_pod_list - mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = "hello world" - - # Execute - executor = GkeCodeExecutor() - code_input = CodeExecutionInput(code='print("hello world")') - result = executor.execute_code(mock_invocation_context, code_input) - - # Assert - assert result.stdout == "hello world" - assert result.stderr == "" - mock_k8s_clients["core_v1"].create_namespaced_config_map.assert_called_once() - mock_k8s_clients["batch_v1"].create_namespaced_job.assert_called_once() - mock_k8s_clients["core_v1"].patch_namespaced_config_map.assert_called_once() - mock_k8s_clients["core_v1"].read_namespaced_pod_log.assert_called_once() - - @patch("google.adk.code_executors.gke_code_executor.Watch") - def test_execute_code_job_failed( - self, - mock_watch, - mock_k8s_clients, - mock_invocation_context, - ): - """Tests the path where the Kubernetes Job fails.""" - mock_job = MagicMock() - mock_job.status.succeeded = None - mock_job.status.failed = True - mock_watch.return_value.stream.return_value = [{"object": mock_job}] - mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( - "Traceback...\nValueError: failure" - ) - - executor = GkeCodeExecutor() - result = executor.execute_code(mock_invocation_context, CodeExecutionInput(code="fail")) - - assert result.stdout == "" - assert "Job failed. Logs:" in result.stderr - assert "ValueError: failure" in result.stderr - - def test_execute_code_api_exception( - self, mock_k8s_clients, mock_invocation_context - ): - """Tests handling of an ApiException from the K8s client.""" - mock_k8s_clients["core_v1"].create_namespaced_config_map.side_effect = ( - ApiException(reason="Test API Error") - ) - executor = GkeCodeExecutor() - result = executor.execute_code(mock_invocation_context, CodeExecutionInput(code="...")) - - assert result.stdout == "" - assert "Kubernetes API error: Test API Error" in result.stderr - - @patch("google.adk.code_executors.gke_code_executor.Watch") - def test_execute_code_timeout( - self, - mock_watch, - mock_k8s_clients, - mock_invocation_context, - ): - """Tests the case where the job watch times out.""" - mock_watch.return_value.stream.return_value = [] # Empty stream simulates timeout - mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( - "Still running..." - ) - - executor = GkeCodeExecutor(timeout_seconds=1) - result = executor.execute_code(mock_invocation_context, CodeExecutionInput(code="...")) - - assert result.stdout == "" - assert "Executor timed out" in result.stderr - assert "did not complete within 1s" in result.stderr - assert "Pod Logs:\nStill running..." in result.stderr - - def test_create_job_manifest_structure(self, mock_invocation_context): - """Tests the correctness of the generated Job manifest.""" - executor = GkeCodeExecutor(namespace="test-ns", image="test-img:v1") - job = executor._create_job_manifest("test-job", "test-cm", mock_invocation_context) - - # Check top-level properties - assert isinstance(job, client.V1Job) - assert job.api_version == "batch/v1" - assert job.kind == "Job" - assert job.metadata.name == "test-job" - assert job.spec.backoff_limit == 0 - assert job.spec.ttl_seconds_after_finished == 600 - - # Check pod template properties - pod_spec = job.spec.template.spec - assert pod_spec.restart_policy == "Never" - assert pod_spec.runtime_class_name == "gvisor" - assert len(pod_spec.tolerations) == 1 - assert pod_spec.tolerations[0].value == "gvisor" - assert len(pod_spec.volumes) == 1 - assert pod_spec.volumes[0].name == "code-volume" - assert pod_spec.volumes[0].config_map.name == "test-cm" - - # Check container properties - container = pod_spec.containers[0] - assert container.name == "code-runner" - assert container.image == "test-img:v1" - assert container.command == ["python3", "/app/code.py"] - - # Check security context - sec_context = container.security_context - assert sec_context.run_as_non_root is True - assert sec_context.run_as_user == 1001 - assert sec_context.allow_privilege_escalation is False - assert sec_context.read_only_root_filesystem is True - assert sec_context.capabilities.drop == ["ALL"] + """Unit tests for the GkeCodeExecutor.""" + + def test_init_defaults(self): + """Tests that the executor initializes with correct default values.""" + executor = GkeCodeExecutor() + assert executor.namespace == "default" + assert executor.image == "python:3.11-slim" + assert executor.timeout_seconds == 300 + assert executor.cpu_requested == "200m" + assert executor.mem_limit == "512Mi" + + def test_init_with_overrides(self): + """Tests that class attributes can be overridden at instantiation.""" + executor = GkeCodeExecutor( + namespace="test-ns", + image="custom-python:latest", + timeout_seconds=60, + cpu_limit="1000m", + ) + assert executor.namespace == "test-ns" + assert executor.image == "custom-python:latest" + assert executor.timeout_seconds == 60 + assert executor.cpu_limit == "1000m" + + @patch("google.adk.code_executors.gke_code_executor.Watch") + def test_execute_code_success( + self, + mock_watch, + mock_k8s_clients, + mock_invocation_context, + ): + """Tests the happy path for successful code execution.""" + # Setup Mocks + mock_job = MagicMock() + mock_job.status.succeeded = True + mock_job.status.failed = None + mock_watch.return_value.stream.return_value = [{"object": mock_job}] + + mock_pod_list = MagicMock() + mock_pod_list.items = [MagicMock()] + mock_pod_list.items[0].metadata.name = "test-pod-name" + mock_k8s_clients["core_v1"].list_namespaced_pod.return_value = mock_pod_list + mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( + "hello world" + ) + + # Execute + executor = GkeCodeExecutor() + code_input = CodeExecutionInput(code='print("hello world")') + result = executor.execute_code(mock_invocation_context, code_input) + + # Assert + assert result.stdout == "hello world" + assert result.stderr == "" + mock_k8s_clients[ + "core_v1" + ].create_namespaced_config_map.assert_called_once() + mock_k8s_clients["batch_v1"].create_namespaced_job.assert_called_once() + mock_k8s_clients["core_v1"].patch_namespaced_config_map.assert_called_once() + mock_k8s_clients["core_v1"].read_namespaced_pod_log.assert_called_once() + + @patch("google.adk.code_executors.gke_code_executor.Watch") + def test_execute_code_job_failed( + self, + mock_watch, + mock_k8s_clients, + mock_invocation_context, + ): + """Tests the path where the Kubernetes Job fails.""" + mock_job = MagicMock() + mock_job.status.succeeded = None + mock_job.status.failed = True + mock_watch.return_value.stream.return_value = [{"object": mock_job}] + mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( + "Traceback...\nValueError: failure" + ) + + executor = GkeCodeExecutor() + result = executor.execute_code( + mock_invocation_context, CodeExecutionInput(code="fail") + ) + + assert result.stdout == "" + assert "Job failed. Logs:" in result.stderr + assert "ValueError: failure" in result.stderr + + def test_execute_code_api_exception( + self, mock_k8s_clients, mock_invocation_context + ): + """Tests handling of an ApiException from the K8s client.""" + mock_k8s_clients["core_v1"].create_namespaced_config_map.side_effect = ( + ApiException(reason="Test API Error") + ) + executor = GkeCodeExecutor() + result = executor.execute_code( + mock_invocation_context, CodeExecutionInput(code="...") + ) + + assert result.stdout == "" + assert "Kubernetes API error: Test API Error" in result.stderr + + @patch("google.adk.code_executors.gke_code_executor.Watch") + def test_execute_code_timeout( + self, + mock_watch, + mock_k8s_clients, + mock_invocation_context, + ): + """Tests the case where the job watch times out.""" + mock_watch.return_value.stream.return_value = ( + [] + ) # Empty stream simulates timeout + mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( + "Still running..." + ) + + executor = GkeCodeExecutor(timeout_seconds=1) + result = executor.execute_code( + mock_invocation_context, CodeExecutionInput(code="...") + ) + + assert result.stdout == "" + assert "Executor timed out" in result.stderr + assert "did not complete within 1s" in result.stderr + assert "Pod Logs:\nStill running..." in result.stderr + + def test_create_job_manifest_structure(self, mock_invocation_context): + """Tests the correctness of the generated Job manifest.""" + executor = GkeCodeExecutor(namespace="test-ns", image="test-img:v1") + job = executor._create_job_manifest( + "test-job", "test-cm", mock_invocation_context + ) + + # Check top-level properties + assert isinstance(job, client.V1Job) + assert job.api_version == "batch/v1" + assert job.kind == "Job" + assert job.metadata.name == "test-job" + assert job.spec.backoff_limit == 0 + assert job.spec.ttl_seconds_after_finished == 600 + + # Check pod template properties + pod_spec = job.spec.template.spec + assert pod_spec.restart_policy == "Never" + assert pod_spec.runtime_class_name == "gvisor" + assert len(pod_spec.tolerations) == 1 + assert pod_spec.tolerations[0].value == "gvisor" + assert len(pod_spec.volumes) == 1 + assert pod_spec.volumes[0].name == "code-volume" + assert pod_spec.volumes[0].config_map.name == "test-cm" + + # Check container properties + container = pod_spec.containers[0] + assert container.name == "code-runner" + assert container.image == "test-img:v1" + assert container.command == ["python3", "/app/code.py"] + + # Check security context + sec_context = container.security_context + assert sec_context.run_as_non_root is True + assert sec_context.run_as_user == 1001 + assert sec_context.allow_privilege_escalation is False + assert sec_context.read_only_root_filesystem is True + assert sec_context.capabilities.drop == ["ALL"] From 7b40d2235f573e688118cfce162a54d0b751eda1 Mon Sep 17 00:00:00 2001 From: GWeale Date: Fri, 5 Sep 2025 17:13:02 -0700 Subject: [PATCH 26/30] fix(gke_code_executor): add annotations to the fil --- src/google/adk/code_executors/gke_code_executor.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index 81b4bdceaf..4d4bc902c7 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -1,3 +1,17 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from __future__ import annotations import logging From 2049d68707ab710630df0d3f7a2a2ba56a76015e Mon Sep 17 00:00:00 2001 From: GWeale Date: Fri, 5 Sep 2025 17:44:26 -0700 Subject: [PATCH 27/30] style(gke): use relative imports; no cli imports #non-breaking --- src/google/adk/code_executors/gke_code_executor.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index 4d4bc902c7..e6a7587933 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -17,15 +17,16 @@ import logging import uuid -from google.adk.agents.invocation_context import InvocationContext -from google.adk.code_executors.base_code_executor import BaseCodeExecutor -from google.adk.code_executors.code_execution_utils import CodeExecutionInput -from google.adk.code_executors.code_execution_utils import CodeExecutionResult from kubernetes import client from kubernetes import config from kubernetes.client.rest import ApiException from kubernetes.watch import Watch +from ..agents.invocation_context import InvocationContext +from .base_code_executor import BaseCodeExecutor +from .code_execution_utils import CodeExecutionInput +from .code_execution_utils import CodeExecutionResult + logger = logging.getLogger("google_adk." + __name__) From adb2ad47ed5e46206509fa352287943a66123d33 Mon Sep 17 00:00:00 2001 From: GWeale Date: Fri, 5 Sep 2025 17:47:48 -0700 Subject: [PATCH 28/30] style(gke): make imports relative; avoid cli import pattern #non-breaking --- .../adk/code_executors/gke_code_executor.py | 70 ++++++++++--------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index e6a7587933..b15c936852 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -17,9 +17,7 @@ import logging import uuid -from kubernetes import client -from kubernetes import config -from kubernetes.client.rest import ApiException +import kubernetes as k8s from kubernetes.watch import Watch from ..agents.invocation_context import InvocationContext @@ -76,8 +74,8 @@ class GkeCodeExecutor(BaseCodeExecutor): kubeconfig_path: str | None = None kubeconfig_context: str | None = None - _batch_v1: client.BatchV1Api - _core_v1: client.CoreV1Api + _batch_v1: k8s.client.BatchV1Api + _core_v1: k8s.client.CoreV1Api def __init__( self, @@ -99,10 +97,10 @@ def __init__( if self.kubeconfig_path: try: logger.info(f"Using explicit kubeconfig from '{self.kubeconfig_path}'.") - config.load_kube_config( + k8s.config.load_kube_config( config_file=self.kubeconfig_path, context=self.kubeconfig_context ) - except config.ConfigException as e: + except k8s.config.ConfigException as e: logger.error( f"Failed to load explicit kubeconfig from {self.kubeconfig_path}", exc_info=True, @@ -112,16 +110,16 @@ def __init__( ) from e else: try: - config.load_incluster_config() + k8s.config.load_incluster_config() logger.info("Using in-cluster Kubernetes configuration.") - except config.ConfigException: + except k8s.config.ConfigException: try: logger.info( "In-cluster config not found. Falling back to default local" " kubeconfig." ) - config.load_kube_config() - except config.ConfigException as e: + k8s.config.load_kube_config() + except k8s.config.ConfigException as e: logger.error( "Could not configure Kubernetes client automatically.", exc_info=True, @@ -130,8 +128,8 @@ def __init__( "Failed to find any valid Kubernetes configuration." ) from e - self._batch_v1 = client.BatchV1Api() - self._core_v1 = client.CoreV1Api() + self._batch_v1 = k8s.client.BatchV1Api() + self._core_v1 = k8s.client.CoreV1Api() def execute_code( self, @@ -161,7 +159,7 @@ def execute_code( ) return self._watch_job_completion(job_name) - except ApiException as e: + except k8s.client.exceptions.ApiException as e: logger.error( "A Kubernetes API error occurred during job" f" '{job_name}': {e.reason}", @@ -187,44 +185,46 @@ def _create_job_manifest( job_name: str, configmap_name: str, invocation_context: InvocationContext, - ) -> client.V1Job: + ) -> k8s.client.V1Job: """Creates the complete V1Job object with security best practices.""" # Define the container that will run the code. - container = client.V1Container( + container = k8s.client.V1Container( name="code-runner", image=self.image, command=["python3", "/app/code.py"], volume_mounts=[ - client.V1VolumeMount(name="code-volume", mount_path="/app") + k8s.client.V1VolumeMount(name="code-volume", mount_path="/app") ], # Enforce a strict security context. - security_context=client.V1SecurityContext( + security_context=k8s.client.V1SecurityContext( run_as_non_root=True, run_as_user=1001, allow_privilege_escalation=False, read_only_root_filesystem=True, - capabilities=client.V1Capabilities(drop=["ALL"]), + capabilities=k8s.client.V1Capabilities(drop=["ALL"]), ), # Set resource limits to prevent abuse. - resources=client.V1ResourceRequirements( + resources=k8s.client.V1ResourceRequirements( requests={"cpu": self.cpu_requested, "memory": self.mem_requested}, limits={"cpu": self.cpu_limit, "memory": self.mem_limit}, ), ) # Use tolerations to request a gVisor node. - pod_spec = client.V1PodSpec( + pod_spec = k8s.client.V1PodSpec( restart_policy="Never", containers=[container], volumes=[ - client.V1Volume( + k8s.client.V1Volume( name="code-volume", - config_map=client.V1ConfigMapVolumeSource(name=configmap_name), + config_map=k8s.client.V1ConfigMapVolumeSource( + name=configmap_name + ), ) ], runtime_class_name="gvisor", # Request the gVisor runtime. tolerations=[ - client.V1Toleration( + k8s.client.V1Toleration( key="sandbox.gke.io/runtime", operator="Equal", value="gvisor", @@ -233,8 +233,8 @@ def _create_job_manifest( ], ) - job_spec = client.V1JobSpec( - template=client.V1PodTemplateSpec(spec=pod_spec), + job_spec = k8s.client.V1JobSpec( + template=k8s.client.V1PodTemplateSpec(spec=pod_spec), backoff_limit=0, # Do not retry the Job on failure. # Kubernetes TTL controller will handle Job/Pod cleanup. ttl_seconds_after_finished=600, # Garbage collect after 10 minutes. @@ -244,10 +244,12 @@ def _create_job_manifest( annotations = { "adk.agent.google.com/invocation-id": invocation_context.invocation_id } - return client.V1Job( + return k8s.client.V1Job( api_version="batch/v1", kind="Job", - metadata=client.V1ObjectMeta(name=job_name, annotations=annotations), + metadata=k8s.client.V1ObjectMeta( + name=job_name, annotations=annotations + ), spec=job_spec, ) @@ -301,25 +303,25 @@ def _get_pod_logs(self, job_name: str) -> str: return self._core_v1.read_namespaced_pod_log( name=pod_name, namespace=self.namespace ) - except ApiException as e: + except k8s.client.exceptions.ApiException as e: raise RuntimeError( f"API error retrieving logs for job '{job_name}': {e.reason}" ) from e def _create_code_configmap(self, name: str, code: str) -> None: """Creates a ConfigMap to hold the Python code.""" - body = client.V1ConfigMap( - metadata=client.V1ObjectMeta(name=name), data={"code.py": code} + body = k8s.client.V1ConfigMap( + metadata=k8s.client.V1ObjectMeta(name=name), data={"code.py": code} ) self._core_v1.create_namespaced_config_map( namespace=self.namespace, body=body ) def _add_owner_reference( - self, owner_job: client.V1Job, configmap_name: str + self, owner_job: k8s.client.V1Job, configmap_name: str ) -> None: """Patches the ConfigMap to be owned by the Job for auto-cleanup.""" - owner_reference = client.V1OwnerReference( + owner_reference = k8s.client.V1OwnerReference( api_version=owner_job.api_version, kind=owner_job.kind, name=owner_job.metadata.name, @@ -338,7 +340,7 @@ def _add_owner_reference( f"Set Job '{owner_job.metadata.name}' as owner of ConfigMap" f" '{configmap_name}'." ) - except ApiException as e: + except k8s.client.exceptions.ApiException as e: logger.warning( f"Failed to set ownerReference on ConfigMap '{configmap_name}'. " f"Manual cleanup is required. Reason: {e.reason}" From 75814e941bacba1a5e74cb3c3079b8ab2c09efc1 Mon Sep 17 00:00:00 2001 From: GWeale Date: Fri, 5 Sep 2025 17:54:46 -0700 Subject: [PATCH 29/30] test(gke): expose client/config for monkeypatch; fix imports #non-breaking --- .../adk/code_executors/gke_code_executor.py | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index b15c936852..c9ba4e9ce5 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -25,6 +25,11 @@ from .code_execution_utils import CodeExecutionInput from .code_execution_utils import CodeExecutionResult +# Expose these for tests to monkeypatch. +client = k8s.client +config = k8s.config +ApiException = k8s.client.exceptions.ApiException + logger = logging.getLogger("google_adk." + __name__) @@ -97,10 +102,10 @@ def __init__( if self.kubeconfig_path: try: logger.info(f"Using explicit kubeconfig from '{self.kubeconfig_path}'.") - k8s.config.load_kube_config( + config.load_kube_config( config_file=self.kubeconfig_path, context=self.kubeconfig_context ) - except k8s.config.ConfigException as e: + except config.ConfigException as e: logger.error( f"Failed to load explicit kubeconfig from {self.kubeconfig_path}", exc_info=True, @@ -110,16 +115,16 @@ def __init__( ) from e else: try: - k8s.config.load_incluster_config() + config.load_incluster_config() logger.info("Using in-cluster Kubernetes configuration.") - except k8s.config.ConfigException: + except config.ConfigException: try: logger.info( "In-cluster config not found. Falling back to default local" " kubeconfig." ) - k8s.config.load_kube_config() - except k8s.config.ConfigException as e: + config.load_kube_config() + except config.ConfigException as e: logger.error( "Could not configure Kubernetes client automatically.", exc_info=True, @@ -128,8 +133,8 @@ def __init__( "Failed to find any valid Kubernetes configuration." ) from e - self._batch_v1 = k8s.client.BatchV1Api() - self._core_v1 = k8s.client.CoreV1Api() + self._batch_v1 = client.BatchV1Api() + self._core_v1 = client.CoreV1Api() def execute_code( self, @@ -159,7 +164,7 @@ def execute_code( ) return self._watch_job_completion(job_name) - except k8s.client.exceptions.ApiException as e: + except ApiException as e: logger.error( "A Kubernetes API error occurred during job" f" '{job_name}': {e.reason}", @@ -303,7 +308,7 @@ def _get_pod_logs(self, job_name: str) -> str: return self._core_v1.read_namespaced_pod_log( name=pod_name, namespace=self.namespace ) - except k8s.client.exceptions.ApiException as e: + except ApiException as e: raise RuntimeError( f"API error retrieving logs for job '{job_name}': {e.reason}" ) from e @@ -340,7 +345,7 @@ def _add_owner_reference( f"Set Job '{owner_job.metadata.name}' as owner of ConfigMap" f" '{configmap_name}'." ) - except k8s.client.exceptions.ApiException as e: + except ApiException as e: logger.warning( f"Failed to set ownerReference on ConfigMap '{configmap_name}'. " f"Manual cleanup is required. Reason: {e.reason}" From cd6d518b437a98f1158fb1b963b52af40dbcfa34 Mon Sep 17 00:00:00 2001 From: GWeale Date: Fri, 5 Sep 2025 18:51:13 -0700 Subject: [PATCH 30/30] chore(pyproject): keep-sorted ordering for kubernetes entries #non-breaking --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5706e3a8aa..c01bba8519 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,10 +113,10 @@ test = [ "kubernetes>=29.0.0", # For GkeCodeExecutor "llama-index-readers-file>=0.4.0", # For retrieval tests "openai>=1.100.2", # For LiteLLM + "pytest>=8.3.4", "pytest-asyncio>=0.25.0", "pytest-mock>=3.14.0", "pytest-xdist>=3.6.1", - "pytest>=8.3.4", "python-multipart>=0.0.9", "rouge-score>=0.1.2", "tabulate>=0.9.0",