Skip to content

Commit 417947a

Browse files
Vando Pereiraennuite
authored andcommitted
ARROW-17785: [Java][FlightSQL] Add comprehensive tests for CloseSession error suppression
Add unit and integration tests for the error suppression functionality in ArrowFlightSqlClientHandler: - ArrowFlightSqlClientHandlerTest: 18 unit tests covering error detection, logging, and exception handling logic using Mockito and reflection - ArrowFlightSqlClientHandlerIntegrationTest: 4 integration tests with real FlightServer to validate error suppression in realistic scenarios Tests verify that benign gRPC shutdown errors (UNAVAILABLE and INTERNAL with GOAWAY) are properly suppressed while genuine failures are correctly propagated as exceptions.
1 parent fff1e4b commit 417947a

File tree

2 files changed

+516
-0
lines changed

2 files changed

+516
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.driver.jdbc.client;
19+
20+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
21+
import static org.junit.jupiter.api.Assertions.assertThrows;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
24+
import ch.qos.logback.classic.Level;
25+
import ch.qos.logback.classic.Logger;
26+
import ch.qos.logback.classic.spi.ILoggingEvent;
27+
import ch.qos.logback.core.read.ListAppender;
28+
import java.sql.SQLException;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import org.apache.arrow.driver.jdbc.FlightServerTestExtension;
31+
import org.apache.arrow.flight.CallStatus;
32+
import org.apache.arrow.flight.CloseSessionRequest;
33+
import org.apache.arrow.flight.CloseSessionResult;
34+
import org.apache.arrow.flight.FlightProducer.CallContext;
35+
import org.apache.arrow.flight.FlightProducer.StreamListener;
36+
import org.apache.arrow.flight.FlightRuntimeException;
37+
import org.apache.arrow.flight.sql.NoOpFlightSqlProducer;
38+
import org.apache.arrow.memory.BufferAllocator;
39+
import org.apache.arrow.memory.RootAllocator;
40+
import org.apache.arrow.util.AutoCloseables;
41+
import org.junit.jupiter.api.AfterAll;
42+
import org.junit.jupiter.api.AfterEach;
43+
import org.junit.jupiter.api.BeforeAll;
44+
import org.junit.jupiter.api.BeforeEach;
45+
import org.junit.jupiter.api.Test;
46+
import org.junit.jupiter.api.extension.RegisterExtension;
47+
import org.slf4j.LoggerFactory;
48+
49+
/** Integration tests for {@link ArrowFlightSqlClientHandler} error suppression functionality. */
50+
public class ArrowFlightSqlClientHandlerIntegrationTest {
51+
52+
/** Custom producer that can simulate various error conditions during close operations. */
53+
public static class ErrorSimulatingFlightSqlProducer extends NoOpFlightSqlProducer {
54+
55+
private final AtomicBoolean simulateUnavailableOnClose = new AtomicBoolean(false);
56+
private final AtomicBoolean simulateGoAwayOnClose = new AtomicBoolean(false);
57+
private final AtomicBoolean simulateNonBenignOnClose = new AtomicBoolean(false);
58+
59+
public void setSimulateUnavailableOnClose(boolean simulate) {
60+
simulateUnavailableOnClose.set(simulate);
61+
}
62+
63+
public void setSimulateGoAwayOnClose(boolean simulate) {
64+
simulateGoAwayOnClose.set(simulate);
65+
}
66+
67+
public void setSimulateNonBenignOnClose(boolean simulate) {
68+
simulateNonBenignOnClose.set(simulate);
69+
}
70+
71+
@Override
72+
public void closeSession(CloseSessionRequest request, CallContext context, StreamListener<CloseSessionResult> listener) {
73+
if (simulateUnavailableOnClose.get()) {
74+
listener.onError(CallStatus.UNAVAILABLE.withDescription("Service unavailable during shutdown").toRuntimeException());
75+
return;
76+
}
77+
78+
if (simulateGoAwayOnClose.get()) {
79+
listener.onError(CallStatus.INTERNAL.withDescription("Connection closed after GOAWAY").toRuntimeException());
80+
return;
81+
}
82+
83+
if (simulateNonBenignOnClose.get()) {
84+
listener.onError(CallStatus.UNAUTHENTICATED.withDescription("Authentication failed").toRuntimeException());
85+
return;
86+
}
87+
88+
// Normal successful close - just complete successfully
89+
listener.onCompleted();
90+
}
91+
}
92+
93+
private static final ErrorSimulatingFlightSqlProducer PRODUCER = new ErrorSimulatingFlightSqlProducer();
94+
95+
@RegisterExtension
96+
public static final FlightServerTestExtension FLIGHT_SERVER_TEST_EXTENSION =
97+
FlightServerTestExtension.createStandardTestExtension(PRODUCER);
98+
99+
private static BufferAllocator allocator;
100+
private Logger logger;
101+
private ListAppender<ILoggingEvent> logAppender;
102+
103+
@BeforeAll
104+
public static void setup() {
105+
allocator = new RootAllocator(Long.MAX_VALUE);
106+
}
107+
108+
@AfterAll
109+
public static void tearDown() throws Exception {
110+
AutoCloseables.close(PRODUCER, allocator);
111+
}
112+
113+
@BeforeEach
114+
public void setUp() {
115+
// Set up logging capture
116+
logger = (Logger) LoggerFactory.getLogger(ArrowFlightSqlClientHandler.class);
117+
logAppender = new ListAppender<>();
118+
logAppender.start();
119+
logger.addAppender(logAppender);
120+
logger.setLevel(Level.DEBUG);
121+
122+
// Reset producer state
123+
PRODUCER.setSimulateUnavailableOnClose(false);
124+
PRODUCER.setSimulateGoAwayOnClose(false);
125+
PRODUCER.setSimulateNonBenignOnClose(false);
126+
}
127+
128+
@AfterEach
129+
public void tearDownLogging() {
130+
if (logger != null && logAppender != null) {
131+
logger.detachAppender(logAppender);
132+
}
133+
}
134+
135+
@Test
136+
public void testClose_WithCatalog_UnavailableError_SuppressesException() throws Exception {
137+
// Arrange
138+
PRODUCER.setSimulateUnavailableOnClose(true);
139+
140+
try (ArrowFlightSqlClientHandler client = new ArrowFlightSqlClientHandler.Builder()
141+
.withHost(FLIGHT_SERVER_TEST_EXTENSION.getHost())
142+
.withPort(FLIGHT_SERVER_TEST_EXTENSION.getPort())
143+
.withBufferAllocator(allocator)
144+
.withEncryption(false)
145+
.withCatalog("test-catalog") // This triggers CloseSession RPC
146+
.build()) {
147+
148+
// Act & Assert - close() should not throw despite server error
149+
assertDoesNotThrow(() -> client.close());
150+
}
151+
152+
// Verify error was logged as suppressed
153+
assertTrue(logAppender.list.stream()
154+
.anyMatch(event -> event.getFormattedMessage().contains("closing Flight SQL session")));
155+
}
156+
157+
@Test
158+
public void testClose_WithCatalog_GoAwayError_SuppressesException() throws Exception {
159+
// Arrange
160+
PRODUCER.setSimulateGoAwayOnClose(true);
161+
162+
try (ArrowFlightSqlClientHandler client = new ArrowFlightSqlClientHandler.Builder()
163+
.withHost(FLIGHT_SERVER_TEST_EXTENSION.getHost())
164+
.withPort(FLIGHT_SERVER_TEST_EXTENSION.getPort())
165+
.withBufferAllocator(allocator)
166+
.withEncryption(false)
167+
.withCatalog("test-catalog")
168+
.build()) {
169+
170+
// Act & Assert
171+
assertDoesNotThrow(() -> client.close());
172+
}
173+
174+
// Verify error was logged as suppressed
175+
assertTrue(logAppender.list.stream()
176+
.anyMatch(event -> event.getFormattedMessage().contains("closing Flight SQL session")));
177+
}
178+
179+
@Test
180+
public void testClose_WithCatalog_NonBenignError_ThrowsSQLException() throws Exception {
181+
// Arrange
182+
PRODUCER.setSimulateNonBenignOnClose(true);
183+
184+
ArrowFlightSqlClientHandler client = new ArrowFlightSqlClientHandler.Builder()
185+
.withHost(FLIGHT_SERVER_TEST_EXTENSION.getHost())
186+
.withPort(FLIGHT_SERVER_TEST_EXTENSION.getPort())
187+
.withBufferAllocator(allocator)
188+
.withEncryption(false)
189+
.withCatalog("test-catalog")
190+
.build();
191+
192+
// Act & Assert - non-benign errors should be thrown
193+
SQLException thrown = assertThrows(SQLException.class, () -> client.close());
194+
assertTrue(thrown.getMessage().contains("Failed to close Flight SQL session"));
195+
assertTrue(thrown.getCause() instanceof FlightRuntimeException);
196+
}
197+
198+
@Test
199+
public void testClose_WithoutCatalog_NoCloseSessionCall() throws Exception {
200+
// Arrange - no catalog means no CloseSession RPC
201+
PRODUCER.setSimulateUnavailableOnClose(true); // This won't be triggered
202+
203+
try (ArrowFlightSqlClientHandler client = new ArrowFlightSqlClientHandler.Builder()
204+
.withHost(FLIGHT_SERVER_TEST_EXTENSION.getHost())
205+
.withPort(FLIGHT_SERVER_TEST_EXTENSION.getPort())
206+
.withBufferAllocator(allocator)
207+
.withEncryption(false)
208+
// No catalog set
209+
.build()) {
210+
211+
// Act & Assert - should close successfully without any CloseSession RPC
212+
assertDoesNotThrow(() -> client.close());
213+
}
214+
215+
// Verify no CloseSession-related logging occurred
216+
assertTrue(logAppender.list.stream()
217+
.noneMatch(event -> event.getFormattedMessage().contains("closing Flight SQL session")));
218+
}
219+
}

0 commit comments

Comments
 (0)