From c9849153d6b79c1281299202b5bcb99027240223 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Wed, 18 Feb 2026 14:06:22 +0000 Subject: [PATCH 1/2] fix: flaky Agent to Agent tests --- .../apps/common/AgentExecutorProducer.java | 61 ++++++++++--------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java index b82041143..1db5f2d78 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java @@ -3,10 +3,7 @@ import static io.a2a.server.ServerCallContext.TRANSPORT_KEY; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Produces; @@ -176,49 +173,56 @@ private void handleAgentToAgentTest(RequestContext context, AgentEmitter agentEm /** * Handles delegation by forwarding to another agent via client. + *

+ * Uses blocking client call (streaming=false) which should return the final task state + * synchronously without requiring async callbacks and latches. This simplified approach + * avoids race conditions between event consumption and callback invocation. */ private void handleDelegation(String userInput, TransportProtocol transportProtocol, AgentEmitter agentEmitter) { // Strip "delegate:" prefix String delegatedContent = userInput.substring("delegate:".length()).trim(); - // Create client for same transport + // Create client for same transport (streaming=false for blocking behavior) try (Client client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol)) { agentEmitter.startWork(); - // Set up consumer to capture task result - CountDownLatch latch = new CountDownLatch(1); - AtomicReference resultRef = new AtomicReference<>(); - AtomicReference errorRef = new AtomicReference<>(); - - BiConsumer consumer = - AgentToAgentClientFactory.createTaskCaptureConsumer(resultRef, latch); + // Store the result task from blocking call + AtomicReference taskRef = new AtomicReference<>(); // Delegate to another agent (new task on same server) // Add a marker so the receiving agent knows to complete the task Message delegatedMessage = A2A.toUserMessage("#a2a-delegated#" + delegatedContent); - client.sendMessage(delegatedMessage, List.of(consumer), error -> { - errorRef.set(error); - latch.countDown(); - }); - // Wait for response - if (!latch.await(30, TimeUnit.SECONDS)) { - agentEmitter.fail(new InternalError("Timeout waiting for delegated response")); - return; - } + // Blocking call should return final task synchronously + client.sendMessage(delegatedMessage, List.of((event, card) -> { + if (event instanceof TaskEvent te) { + taskRef.set(te.getTask()); + } else if (event instanceof TaskUpdateEvent tue) { + taskRef.set(tue.getTask()); + } + }), null); - Task delegatedResult = resultRef.get(); + // Blocking call should have completed before returning + Task delegatedResult = taskRef.get(); - // Check for error only if we didn't get a successful result - // (errors can occur after completion due to stream cleanup) - if (delegatedResult == null && errorRef.get() != null) { - agentEmitter.fail(new InternalError("Delegation failed: " + errorRef.get().getMessage())); + if (delegatedResult == null) { + agentEmitter.fail(new InternalError("No result received from blocking delegation call")); return; } - if (delegatedResult == null) { - agentEmitter.fail(new InternalError("No result received from delegation")); + // DIAGNOSTIC: Check if task is actually final + // If blocking call returns non-final task, it indicates a server-side race condition + if (!delegatedResult.status().state().isFinal()) { + String diagnostic = String.format( + "RACE CONDITION DETECTED: Blocking call returned non-final task! " + + "State: %s, TaskId: %s, Artifacts: %d. " + + "This indicates DefaultRequestHandler wait logic failed to synchronize with MainEventBusProcessor.", + delegatedResult.status().state(), + delegatedResult.id(), + delegatedResult.artifacts() != null ? delegatedResult.artifacts().size() : 0); + System.err.println(diagnostic); // Also print to stderr for CI visibility + agentEmitter.fail(new InternalError(diagnostic)); return; } @@ -234,9 +238,6 @@ private void handleDelegation(String userInput, TransportProtocol transportProto agentEmitter.complete(); } catch (A2AClientException e) { agentEmitter.fail(new InternalError("Failed to create client: " + e.getMessage())); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - agentEmitter.fail(new InternalError("Interrupted while waiting for response")); } } From c48c3a2b361aecd4eae3787535d53ab44babbf86 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Wed, 18 Feb 2026 16:17:22 +0000 Subject: [PATCH 2/2] Switch test client to blocking as well --- .../java/io/a2a/server/apps/common/AbstractA2AServerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index 7ecd64cac..75ba229ea 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -2491,7 +2491,7 @@ public void testAgentToAgentDelegation() throws Exception { BiConsumer delegationConsumer = AgentToAgentClientFactory.createTaskCaptureConsumer(delegationResultRef, delegationLatch); - getClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> { + getNonStreamingClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> { delegationErrorRef.set(error); delegationLatch.countDown(); });