-
Notifications
You must be signed in to change notification settings - Fork 27
[PECOBLR-1131] Fix incorrect refetching of expired CloudFetch links when using Thrift protocol. #1066
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PECOBLR-1131] Fix incorrect refetching of expired CloudFetch links when using Thrift protocol. #1066
Changes from all commits
e2000ad
84e129d
7d198ec
581b98d
fbedf44
58eaec5
60fc546
0794307
0e633e3
070069e
4d79420
13b33a0
51fca9e
ece4152
fdb10bd
eca4f03
25b7e5d
933fbfc
6bd358f
ba249e9
c649d57
44453e8
3420835
9eacee8
ddf4ed7
386736e
b99bc72
0ad607d
262b0c7
5e6ef1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -215,15 +215,18 @@ private void triggerNextBatchDownload() { | |
| return; | ||
| } | ||
|
|
||
| // Calculate row offset for this batch | ||
| final long batchStartRowOffset = getChunkStartRowOffset(batchStartIndex); | ||
|
|
||
| LOGGER.info("Starting batch download from index {}", batchStartIndex); | ||
| currentDownloadTask = | ||
| CompletableFuture.runAsync( | ||
| () -> { | ||
| try { | ||
| // rowOffset is 0 here as this service is used by RemoteChunkProvider (SEA-only) | ||
| // which fetches by chunkIndex, not rowOffset | ||
| ChunkLinkFetchResult result = | ||
| session.getDatabricksClient().getResultChunks(statementId, batchStartIndex, 0); | ||
| session | ||
| .getDatabricksClient() | ||
| .getResultChunks(statementId, batchStartIndex, batchStartRowOffset); | ||
| LOGGER.info( | ||
| "Retrieved {} links for batch starting at {} for statement id {}", | ||
| result.getChunkLinks().size(), | ||
|
|
@@ -419,6 +422,28 @@ private void prepareNewBatchDownload(long startIndex) { | |
| isDownloadChainStarted.set(false); | ||
| } | ||
|
|
||
| /** | ||
| * Gets the start row offset for a given chunk index. | ||
| * | ||
| * @param chunkIndex the chunk index to get the row offset for | ||
| * @return the start row offset for the chunk | ||
| */ | ||
| private long getChunkStartRowOffset(long chunkIndex) { | ||
| T chunk = chunkIndexToChunksMap.get(chunkIndex); | ||
| if (chunk == null) { | ||
| // Should never happen. | ||
| throw new IllegalStateException( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's throw
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could create a new exception stating |
||
| "Chunk not found in map for index " | ||
| + chunkIndex | ||
| + ". " | ||
| + "Total chunks: " | ||
| + totalChunks | ||
| + ", StatementId: " | ||
| + statementId); | ||
| } | ||
| return chunk.getStartRowOffset(); | ||
| } | ||
|
|
||
| private boolean isChunkLinkExpired(ExternalLink link) { | ||
| if (link == null || link.getExpiration() == null) { | ||
| LOGGER.warn("Link or expiration is null, assuming link is expired"); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -94,7 +94,6 @@ public static void verifySuccessStatus(TStatus status, String errorContext, Stri | |
| "Error thrift response received [%s] for statementId [%s]", | ||
| errorContext, statementId) | ||
| : String.format("Error thrift response received [%s]", errorContext); | ||
| LOGGER.error(errorMessage); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why removing the log? We cannot rely on application that it will be logging correctly |
||
| throw new DatabricksHttpException(errorMessage, status.getSqlState()); | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's log this