diff --git a/contributing/samples/code_execution/gke_sandbox_agent.py b/contributing/samples/code_execution/gke_sandbox_agent.py new file mode 100644 index 0000000000..4baaf52152 --- /dev/null +++ b/contributing/samples/code_execution/gke_sandbox_agent.py @@ -0,0 +1,49 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A Python coding agent using the GkeCodeExecutor for secure execution.""" + +from google.adk.agents import LlmAgent +from google.adk.code_executors import GkeCodeExecutor + + +def gke_agent_system_instruction(): + """Returns: The system instruction for the GKE-based coding agent.""" + return """You are a helpful and capable AI agent that can write and execute Python code to answer questions and perform tasks. + +When a user asks a question, follow these steps: +1. Analyze the request. +2. Write a complete, self-contained Python script to accomplish the task. +3. Your code will be executed in a secure, sandboxed environment. +4. Return the full and complete output from the code execution, including any text, results, or error messages.""" + + +gke_executor = GkeCodeExecutor( + # This must match the namespace in your deployment_rbac.yaml where the + # agent's ServiceAccount and Role have permissions. + namespace="agent-sandbox", + # Setting an explicit timeout prevents a stuck job from running forever. + timeout_seconds=600, +) + +root_agent = LlmAgent( + name="gke_coding_agent", + model="gemini-2.0-flash", + description=( + "A general-purpose agent that executes Python code in a secure GKE" + " Sandbox." + ), + instruction=gke_agent_system_instruction(), + code_executor=gke_executor, +) diff --git a/contributing/samples/gke_agent_sandbox/deployment_rbac.yaml b/contributing/samples/gke_agent_sandbox/deployment_rbac.yaml new file mode 100644 index 0000000000..16572276d1 --- /dev/null +++ b/contributing/samples/gke_agent_sandbox/deployment_rbac.yaml @@ -0,0 +1,50 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: agent-sandbox +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: adk-agent-sa + namespace: agent-sandbox +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: adk-agent-role + namespace: agent-sandbox +rules: +- apiGroups: ["batch"] + resources: ["jobs"] + # create: Needed for _batch_v1.create_namespaced_job(). + # watch: Needed for watch.stream(self._batch_v1.list_namespaced_job, ...) to wait for completion + # list/get: Required for the watch to initialize and to get job details. + verbs: ["create", "get", "watch", "list", "delete"] +- apiGroups: [""] + resources: ["configmaps"] + # create: Needed mount the agent's code into the Job's Pod. + # delete: Needed for cleanup in the finally block + verbs: ["create", "get", "list", "delete"] +- apiGroups: [""] + resources: ["pods"] + # list: Needed to find the correct Pod _core_v1.list_namespaced_pod(label_selector=...) + verbs: ["get", "list", "delete"] +- apiGroups: [""] + # get: Needed for _core_v1.read_namespaced_pod_log() to get the code execution results and logs. + resources: ["pods/log"] + verbs: ["get", "list"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: adk-agent-binding + namespace: agent-sandbox +subjects: +- kind: ServiceAccount + name: adk-agent-sa + namespace: agent-sandbox +roleRef: + kind: Role + name: adk-agent-role + apiGroup: rbac.authorization.k8s.io diff --git a/pyproject.toml b/pyproject.toml index 269e1bc22a..c01bba8519 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,12 +110,13 @@ test = [ "langchain-community>=0.3.17", "langgraph>=0.2.60, <= 0.4.10", # For LangGraphAgent "litellm>=1.75.5, <2.0.0", # For LiteLLM tests + "kubernetes>=29.0.0", # For GkeCodeExecutor "llama-index-readers-file>=0.4.0", # For retrieval tests "openai>=1.100.2", # For LiteLLM + "pytest>=8.3.4", "pytest-asyncio>=0.25.0", "pytest-mock>=3.14.0", "pytest-xdist>=3.6.1", - "pytest>=8.3.4", "python-multipart>=0.0.9", "rouge-score>=0.1.2", "tabulate>=0.9.0", @@ -139,6 +140,7 @@ extensions = [ "docker>=7.0.0", # For ContainerCodeExecutor "langgraph>=0.2.60", # For LangGraphAgent "litellm>=1.75.5", # For LiteLlm class. Currently has OpenAI limitations. TODO: once LiteLlm fix it + "kubernetes>=29.0.0", # For GkeCodeExecutor "llama-index-readers-file>=0.4.0", # For retrieval using LlamaIndex. "llama-index-embeddings-google-genai>=0.3.0",# For files retrieval using LlamaIndex. "lxml>=5.3.0", # For load_web_page tool. diff --git a/src/google/adk/code_executors/__init__.py b/src/google/adk/code_executors/__init__.py index 12b6d870ad..41d406f63d 100644 --- a/src/google/adk/code_executors/__init__.py +++ b/src/google/adk/code_executors/__init__.py @@ -28,6 +28,7 @@ 'UnsafeLocalCodeExecutor', 'VertexAiCodeExecutor', 'ContainerCodeExecutor', + 'GkeCodeExecutor', ] @@ -52,4 +53,14 @@ def __getattr__(name: str): 'ContainerCodeExecutor requires additional dependencies. ' 'Please install with: pip install "google-adk[extensions]"' ) from e + elif name == 'GkeCodeExecutor': + try: + from .gke_code_executor import GkeCodeExecutor + + return GkeCodeExecutor + except ImportError as e: + raise ImportError( + 'GkeCodeExecutor requires additional dependencies. ' + 'Please install with: pip install "google-adk[extensions]"' + ) from e raise AttributeError(f"module '{__name__}' has no attribute '{name}'") diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py new file mode 100644 index 0000000000..c9ba4e9ce5 --- /dev/null +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -0,0 +1,352 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging +import uuid + +import kubernetes as k8s +from kubernetes.watch import Watch + +from ..agents.invocation_context import InvocationContext +from .base_code_executor import BaseCodeExecutor +from .code_execution_utils import CodeExecutionInput +from .code_execution_utils import CodeExecutionResult + +# Expose these for tests to monkeypatch. +client = k8s.client +config = k8s.config +ApiException = k8s.client.exceptions.ApiException + +logger = logging.getLogger("google_adk." + __name__) + + +class GkeCodeExecutor(BaseCodeExecutor): + """Executes Python code in a secure gVisor-sandboxed Pod on GKE. + + This executor securely runs code by dynamically creating a Kubernetes Job for + each execution request. The user's code is mounted via a ConfigMap, and the + Pod is hardened with a strict security context and resource limits. + + Key Features: + - Sandboxed execution using the gVisor runtime. + - Ephemeral, per-execution environments using Kubernetes Jobs. + - Secure-by-default Pod configuration (non-root, no privileges). + - Automatic garbage collection of completed Jobs and Pods via TTL. + - Efficient, event-driven waiting using the Kubernetes watch API. + + RBAC Permissions: + This executor requires a ServiceAccount with specific RBAC permissions. The + Role granted to the ServiceAccount must include rules to manage Jobs, + ConfigMaps, and Pod logs. Below is a minimal set of required permissions: + + rules: + # For creating/deleting code ConfigMaps and patching ownerReferences + - apiGroups: [""] # Core API Group + resources: ["configmaps"] + verbs: ["create", "delete", "get", "patch"] + # For watching Job completion status + - apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["get", "list", "watch", "create", "delete"] + # For retrieving logs from the completed Job's Pod + - apiGroups: [""] # Core API Group + resources: ["pods", "pods/log"] + verbs: ["get", "list"] + """ + + namespace: str = "default" + image: str = "python:3.11-slim" + timeout_seconds: int = 300 + cpu_requested: str = "200m" + mem_requested: str = "256Mi" + # The maximum CPU the container can use, in "millicores". 1000m is 1 full CPU core. + cpu_limit: str = "500m" + mem_limit: str = "512Mi" + + kubeconfig_path: str | None = None + kubeconfig_context: str | None = None + + _batch_v1: k8s.client.BatchV1Api + _core_v1: k8s.client.CoreV1Api + + def __init__( + self, + kubeconfig_path: str | None = None, + kubeconfig_context: str | None = None, + **data, + ): + """Initializes the executor and the Kubernetes API clients. + + This constructor supports multiple authentication methods: + 1. Explicitly via a kubeconfig file path and context. + 2. Automatically via in-cluster service account (when running in GKE). + 3. Automatically via the default local kubeconfig file (~/.kube/config). + """ + super().__init__(**data) + self.kubeconfig_path = kubeconfig_path + self.kubeconfig_context = kubeconfig_context + + if self.kubeconfig_path: + try: + logger.info(f"Using explicit kubeconfig from '{self.kubeconfig_path}'.") + config.load_kube_config( + config_file=self.kubeconfig_path, context=self.kubeconfig_context + ) + except config.ConfigException as e: + logger.error( + f"Failed to load explicit kubeconfig from {self.kubeconfig_path}", + exc_info=True, + ) + raise RuntimeError( + "Failed to configure Kubernetes client from provided path." + ) from e + else: + try: + config.load_incluster_config() + logger.info("Using in-cluster Kubernetes configuration.") + except config.ConfigException: + try: + logger.info( + "In-cluster config not found. Falling back to default local" + " kubeconfig." + ) + config.load_kube_config() + except config.ConfigException as e: + logger.error( + "Could not configure Kubernetes client automatically.", + exc_info=True, + ) + raise RuntimeError( + "Failed to find any valid Kubernetes configuration." + ) from e + + self._batch_v1 = client.BatchV1Api() + self._core_v1 = client.CoreV1Api() + + def execute_code( + self, + invocation_context: InvocationContext, + code_execution_input: CodeExecutionInput, + ) -> CodeExecutionResult: + """Orchestrates the secure execution of a code snippet on GKE.""" + job_name = f"adk-exec-{uuid.uuid4().hex[:10]}" + configmap_name = f"code-src-{job_name}" + + try: + # The execution process: + # 1. Create a ConfigMap to mount LLM-generated code into the Pod. + # 2. Create a Job that runs the code from the ConfigMap. + # 3. Set the Job as the ConfigMap's owner for automatic cleanup. + self._create_code_configmap(configmap_name, code_execution_input.code) + job_manifest = self._create_job_manifest( + job_name, configmap_name, invocation_context + ) + created_job = self._batch_v1.create_namespaced_job( + body=job_manifest, namespace=self.namespace + ) + self._add_owner_reference(created_job, configmap_name) + + logger.info( + f"Submitted Job '{job_name}' to namespace '{self.namespace}'." + ) + return self._watch_job_completion(job_name) + + except ApiException as e: + logger.error( + "A Kubernetes API error occurred during job" + f" '{job_name}': {e.reason}", + exc_info=True, + ) + return CodeExecutionResult(stderr=f"Kubernetes API error: {e.reason}") + except TimeoutError as e: + logger.error(e, exc_info=True) + logs = self._get_pod_logs(job_name) + stderr = f"Executor timed out: {e}\n\nPod Logs:\n{logs}" + return CodeExecutionResult(stderr=stderr) + except Exception as e: + logger.error( + f"An unexpected error occurred during job '{job_name}': {e}", + exc_info=True, + ) + return CodeExecutionResult( + stderr=f"An unexpected executor error occurred: {e}" + ) + + def _create_job_manifest( + self, + job_name: str, + configmap_name: str, + invocation_context: InvocationContext, + ) -> k8s.client.V1Job: + """Creates the complete V1Job object with security best practices.""" + # Define the container that will run the code. + container = k8s.client.V1Container( + name="code-runner", + image=self.image, + command=["python3", "/app/code.py"], + volume_mounts=[ + k8s.client.V1VolumeMount(name="code-volume", mount_path="/app") + ], + # Enforce a strict security context. + security_context=k8s.client.V1SecurityContext( + run_as_non_root=True, + run_as_user=1001, + allow_privilege_escalation=False, + read_only_root_filesystem=True, + capabilities=k8s.client.V1Capabilities(drop=["ALL"]), + ), + # Set resource limits to prevent abuse. + resources=k8s.client.V1ResourceRequirements( + requests={"cpu": self.cpu_requested, "memory": self.mem_requested}, + limits={"cpu": self.cpu_limit, "memory": self.mem_limit}, + ), + ) + + # Use tolerations to request a gVisor node. + pod_spec = k8s.client.V1PodSpec( + restart_policy="Never", + containers=[container], + volumes=[ + k8s.client.V1Volume( + name="code-volume", + config_map=k8s.client.V1ConfigMapVolumeSource( + name=configmap_name + ), + ) + ], + runtime_class_name="gvisor", # Request the gVisor runtime. + tolerations=[ + k8s.client.V1Toleration( + key="sandbox.gke.io/runtime", + operator="Equal", + value="gvisor", + effect="NoSchedule", + ) + ], + ) + + job_spec = k8s.client.V1JobSpec( + template=k8s.client.V1PodTemplateSpec(spec=pod_spec), + backoff_limit=0, # Do not retry the Job on failure. + # Kubernetes TTL controller will handle Job/Pod cleanup. + ttl_seconds_after_finished=600, # Garbage collect after 10 minutes. + ) + + # Assemble and return the final Job object. + annotations = { + "adk.agent.google.com/invocation-id": invocation_context.invocation_id + } + return k8s.client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=k8s.client.V1ObjectMeta( + name=job_name, annotations=annotations + ), + spec=job_spec, + ) + + def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: + """Uses the watch API to efficiently wait for job completion.""" + watch = Watch() + try: + for event in watch.stream( + self._batch_v1.list_namespaced_job, + namespace=self.namespace, + field_selector=f"metadata.name={job_name}", + timeout_seconds=self.timeout_seconds, + ): + job = event["object"] + if job.status.succeeded: + watch.stop() + logger.info(f"Job '{job_name}' succeeded.") + logs = self._get_pod_logs(job_name) + return CodeExecutionResult(stdout=logs) + if job.status.failed: + watch.stop() + logger.error(f"Job '{job_name}' failed.") + logs = self._get_pod_logs(job_name) + return CodeExecutionResult(stderr=f"Job failed. Logs:\n{logs}") + + # If the loop finishes without returning, the watch timed out. + raise TimeoutError( + f"Job '{job_name}' did not complete within {self.timeout_seconds}s." + ) + finally: + watch.stop() + + def _get_pod_logs(self, job_name: str) -> str: + """Retrieves logs from the pod created by the specified job. + + Raises: + RuntimeError: If the pod cannot be found or logs cannot be fetched. + """ + try: + pods = self._core_v1.list_namespaced_pod( + namespace=self.namespace, + label_selector=f"job-name={job_name}", + limit=1, + ) + if not pods.items: + raise RuntimeError( + f"Could not find Pod for Job '{job_name}' to retrieve logs." + ) + + pod_name = pods.items[0].metadata.name + return self._core_v1.read_namespaced_pod_log( + name=pod_name, namespace=self.namespace + ) + except ApiException as e: + raise RuntimeError( + f"API error retrieving logs for job '{job_name}': {e.reason}" + ) from e + + def _create_code_configmap(self, name: str, code: str) -> None: + """Creates a ConfigMap to hold the Python code.""" + body = k8s.client.V1ConfigMap( + metadata=k8s.client.V1ObjectMeta(name=name), data={"code.py": code} + ) + self._core_v1.create_namespaced_config_map( + namespace=self.namespace, body=body + ) + + def _add_owner_reference( + self, owner_job: k8s.client.V1Job, configmap_name: str + ) -> None: + """Patches the ConfigMap to be owned by the Job for auto-cleanup.""" + owner_reference = k8s.client.V1OwnerReference( + api_version=owner_job.api_version, + kind=owner_job.kind, + name=owner_job.metadata.name, + uid=owner_job.metadata.uid, + controller=True, + ) + patch_body = {"metadata": {"ownerReferences": [owner_reference.to_dict()]}} + + try: + self._core_v1.patch_namespaced_config_map( + name=configmap_name, + namespace=self.namespace, + body=patch_body, + ) + logger.info( + f"Set Job '{owner_job.metadata.name}' as owner of ConfigMap" + f" '{configmap_name}'." + ) + except ApiException as e: + logger.warning( + f"Failed to set ownerReference on ConfigMap '{configmap_name}'. " + f"Manual cleanup is required. Reason: {e.reason}" + ) diff --git a/tests/unittests/code_executors/test_gke_code_executor.py b/tests/unittests/code_executors/test_gke_code_executor.py new file mode 100644 index 0000000000..5ef99792f3 --- /dev/null +++ b/tests/unittests/code_executors/test_gke_code_executor.py @@ -0,0 +1,227 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import MagicMock +from unittest.mock import patch + +from google.adk.agents.invocation_context import InvocationContext +from google.adk.code_executors.code_execution_utils import CodeExecutionInput +from google.adk.code_executors.gke_code_executor import GkeCodeExecutor +from kubernetes import client +from kubernetes import config +from kubernetes.client.rest import ApiException +import pytest + + +@pytest.fixture +def mock_invocation_context() -> InvocationContext: + """Fixture for a mock InvocationContext.""" + mock = MagicMock(spec=InvocationContext) + mock.invocation_id = "test-invocation-123" + return mock + + +@pytest.fixture(autouse=True) +def mock_k8s_config(): + """Fixture for auto-mocking Kubernetes config loading.""" + with patch( + "google.adk.code_executors.gke_code_executor.config" + ) as mock_config: + # Simulate fallback from in-cluster to kubeconfig + mock_config.ConfigException = config.ConfigException + mock_config.load_incluster_config.side_effect = config.ConfigException + yield mock_config + + +@pytest.fixture +def mock_k8s_clients(): + """Fixture for mock Kubernetes API clients.""" + with patch( + "google.adk.code_executors.gke_code_executor.client" + ) as mock_client_class: + mock_batch_v1 = MagicMock(spec=client.BatchV1Api) + mock_core_v1 = MagicMock(spec=client.CoreV1Api) + mock_client_class.BatchV1Api.return_value = mock_batch_v1 + mock_client_class.CoreV1Api.return_value = mock_core_v1 + yield { + "batch_v1": mock_batch_v1, + "core_v1": mock_core_v1, + } + + +class TestGkeCodeExecutor: + """Unit tests for the GkeCodeExecutor.""" + + def test_init_defaults(self): + """Tests that the executor initializes with correct default values.""" + executor = GkeCodeExecutor() + assert executor.namespace == "default" + assert executor.image == "python:3.11-slim" + assert executor.timeout_seconds == 300 + assert executor.cpu_requested == "200m" + assert executor.mem_limit == "512Mi" + + def test_init_with_overrides(self): + """Tests that class attributes can be overridden at instantiation.""" + executor = GkeCodeExecutor( + namespace="test-ns", + image="custom-python:latest", + timeout_seconds=60, + cpu_limit="1000m", + ) + assert executor.namespace == "test-ns" + assert executor.image == "custom-python:latest" + assert executor.timeout_seconds == 60 + assert executor.cpu_limit == "1000m" + + @patch("google.adk.code_executors.gke_code_executor.Watch") + def test_execute_code_success( + self, + mock_watch, + mock_k8s_clients, + mock_invocation_context, + ): + """Tests the happy path for successful code execution.""" + # Setup Mocks + mock_job = MagicMock() + mock_job.status.succeeded = True + mock_job.status.failed = None + mock_watch.return_value.stream.return_value = [{"object": mock_job}] + + mock_pod_list = MagicMock() + mock_pod_list.items = [MagicMock()] + mock_pod_list.items[0].metadata.name = "test-pod-name" + mock_k8s_clients["core_v1"].list_namespaced_pod.return_value = mock_pod_list + mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( + "hello world" + ) + + # Execute + executor = GkeCodeExecutor() + code_input = CodeExecutionInput(code='print("hello world")') + result = executor.execute_code(mock_invocation_context, code_input) + + # Assert + assert result.stdout == "hello world" + assert result.stderr == "" + mock_k8s_clients[ + "core_v1" + ].create_namespaced_config_map.assert_called_once() + mock_k8s_clients["batch_v1"].create_namespaced_job.assert_called_once() + mock_k8s_clients["core_v1"].patch_namespaced_config_map.assert_called_once() + mock_k8s_clients["core_v1"].read_namespaced_pod_log.assert_called_once() + + @patch("google.adk.code_executors.gke_code_executor.Watch") + def test_execute_code_job_failed( + self, + mock_watch, + mock_k8s_clients, + mock_invocation_context, + ): + """Tests the path where the Kubernetes Job fails.""" + mock_job = MagicMock() + mock_job.status.succeeded = None + mock_job.status.failed = True + mock_watch.return_value.stream.return_value = [{"object": mock_job}] + mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( + "Traceback...\nValueError: failure" + ) + + executor = GkeCodeExecutor() + result = executor.execute_code( + mock_invocation_context, CodeExecutionInput(code="fail") + ) + + assert result.stdout == "" + assert "Job failed. Logs:" in result.stderr + assert "ValueError: failure" in result.stderr + + def test_execute_code_api_exception( + self, mock_k8s_clients, mock_invocation_context + ): + """Tests handling of an ApiException from the K8s client.""" + mock_k8s_clients["core_v1"].create_namespaced_config_map.side_effect = ( + ApiException(reason="Test API Error") + ) + executor = GkeCodeExecutor() + result = executor.execute_code( + mock_invocation_context, CodeExecutionInput(code="...") + ) + + assert result.stdout == "" + assert "Kubernetes API error: Test API Error" in result.stderr + + @patch("google.adk.code_executors.gke_code_executor.Watch") + def test_execute_code_timeout( + self, + mock_watch, + mock_k8s_clients, + mock_invocation_context, + ): + """Tests the case where the job watch times out.""" + mock_watch.return_value.stream.return_value = ( + [] + ) # Empty stream simulates timeout + mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = ( + "Still running..." + ) + + executor = GkeCodeExecutor(timeout_seconds=1) + result = executor.execute_code( + mock_invocation_context, CodeExecutionInput(code="...") + ) + + assert result.stdout == "" + assert "Executor timed out" in result.stderr + assert "did not complete within 1s" in result.stderr + assert "Pod Logs:\nStill running..." in result.stderr + + def test_create_job_manifest_structure(self, mock_invocation_context): + """Tests the correctness of the generated Job manifest.""" + executor = GkeCodeExecutor(namespace="test-ns", image="test-img:v1") + job = executor._create_job_manifest( + "test-job", "test-cm", mock_invocation_context + ) + + # Check top-level properties + assert isinstance(job, client.V1Job) + assert job.api_version == "batch/v1" + assert job.kind == "Job" + assert job.metadata.name == "test-job" + assert job.spec.backoff_limit == 0 + assert job.spec.ttl_seconds_after_finished == 600 + + # Check pod template properties + pod_spec = job.spec.template.spec + assert pod_spec.restart_policy == "Never" + assert pod_spec.runtime_class_name == "gvisor" + assert len(pod_spec.tolerations) == 1 + assert pod_spec.tolerations[0].value == "gvisor" + assert len(pod_spec.volumes) == 1 + assert pod_spec.volumes[0].name == "code-volume" + assert pod_spec.volumes[0].config_map.name == "test-cm" + + # Check container properties + container = pod_spec.containers[0] + assert container.name == "code-runner" + assert container.image == "test-img:v1" + assert container.command == ["python3", "/app/code.py"] + + # Check security context + sec_context = container.security_context + assert sec_context.run_as_non_root is True + assert sec_context.run_as_user == 1001 + assert sec_context.allow_privilege_escalation is False + assert sec_context.read_only_root_filesystem is True + assert sec_context.capabilities.drop == ["ALL"]