Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,17 @@ private void processArrowStreamAsync(
}

} catch (Exception e) {
throw BigQueryException.translateAndThrow(e);
if (e instanceof InterruptedException
|| e.getCause() instanceof InterruptedException
|| e instanceof com.google.api.gax.rpc.CancelledException) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I think the changes make sense, I just have a few questions:

  1. Can you remind me why do we need check the type for Gax's CancelledException here?
  2. We do check the for the Thread's interrupted status in L1064-1065. The subsequent logic calls the Reader's processRows (
    } catch (RuntimeException | InterruptedException e) {
    throw BigQueryException.translateAndThrow(e);
    ) and I believe we also should set Thread.currentThread().interrupt() when we catch the InterruptedException there

IIUC the PR and the above correctly, then I don't think we need to explicitly catch for InterruptedException here as all interruption states should be set properly and handled cooperatively. Let me know your thoughts and if I see this incorrectly.

Copy link
Author

@SivamuruganP SivamuruganP Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review! Here is the reasoning behind these checks:

  1. Since this logic relies on gRPC (via the Arrow reader), canceling the context or closing the connection often manifests as a CancelledException rather than InterruptedException. If we don't catch it explicitly, it falls through to the else block and gets logged as an error.

  2. when connection.close() is called, the thread is usually blocked inside reader.processRows(). This blocking call immediately throws an exception upon interruption, effectively breaking out of the loop before execution can return to L1064.

Below is exception stack trace I get when running testConnectionTest locally. You can see it hits translateAndThrow instead of exiting the loop gracefully:

Exception in thread "pool-1-thread-1" com.google.cloud.bigquery.BigQueryException: java.lang.InterruptedException
	at com.google.cloud.bigquery.BigQueryException.translateAndThrow(BigQueryException.java:139)
	at com.google.cloud.bigquery.ConnectionImpl.lambda$processArrowStreamAsync$7(ConnectionImpl.java:1072)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.InterruptedException
	at java.base/java.util.concurrent.locks.ReentrantLock$Sync.lockInterruptibly(ReentrantLock.java:159)
	at java.base/java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:372)
	at java.base/java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:417)
	at com.google.api.gax.rpc.QueuingResponseObserver.getNext(QueuingResponseObserver.java:70)
	at com.google.api.gax.rpc.ServerStreamIterator.hasNext(ServerStreamIterator.java:96)
	at com.google.cloud.bigquery.ConnectionImpl.lambda$processArrowStreamAsync$7(ConnectionImpl.java:1063)
	... 3 more

// Log silently and let it fall through to 'finally' for cleanup.
// This is the "graceful shutdown".
logger.log(
Level.INFO, "Background thread interrupted (Connection Closed). Stopping.");
Thread.currentThread().interrupt();
} else {
throw BigQueryException.translateAndThrow(e);
}
} finally { // logic needed for graceful shutdown
// marking end of stream
try {
Expand Down
Loading