From e5b3d273333a9026c865c826d2035f05a71079b8 Mon Sep 17 00:00:00 2001 From: Yukihiro Okada Date: Mon, 4 Aug 2025 12:40:30 +0900 Subject: [PATCH] Fix TestTDClient.testBulkImport to mitigate the failure --- .../com/treasuredata/client/TestTDClient.java | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/src/test/java/com/treasuredata/client/TestTDClient.java b/src/test/java/com/treasuredata/client/TestTDClient.java index 241a9c1e..46000e08 100644 --- a/src/test/java/com/treasuredata/client/TestTDClient.java +++ b/src/test/java/com/treasuredata/client/TestTDClient.java @@ -1247,12 +1247,38 @@ public void testBulkImport() bs = client.getBulkImportSession(session); } - // Check the data - TDTable imported = client.listTables(SAMPLE_DB).stream().filter(input -> { - return input.getName().equals(bulkImportTable); - }).findFirst().get(); + // Wait for data propagation after commit completion + logger.info("Bulk import committed, waiting for data propagation"); + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); - assertEquals(numRowsInPart * 2, imported.getRowCount()); + // Check the data with retry logic + final int expectedRowCount = numRowsInPart * 2; + logger.info("Expected row count: {}, Valid parts: {}, numRowsInPart: {}", expectedRowCount, numValidParts, numRowsInPart); + + TDTable imported = null; + long rowCountCheckDeadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); + BackOff rowCountBackoff = new ExponentialBackOff(); + + while (System.currentTimeMillis() < rowCountCheckDeadline) { + imported = client.listTables(SAMPLE_DB).stream().filter(input -> { + return input.getName().equals(bulkImportTable); + }).findFirst().get(); + + logger.info("Current row count: {}, expected: {}", imported.getRowCount(), expectedRowCount); + + if (imported.getRowCount() == expectedRowCount) { + break; + } + else if (imported.getRowCount() > 0) { + logger.warn("Row count mismatch: expected {}, got {}", expectedRowCount, imported.getRowCount()); + break; + } + + logger.debug("Row count still 0, retrying after backoff"); + Thread.sleep(rowCountBackoff.nextWaitTimeMillis()); + } + + assertEquals(expectedRowCount, imported.getRowCount()); List columns = imported.getColumns(); logger.info(columns.stream().map(TDColumn::toString).collect(Collectors.joining(", "))); assertEquals(2, columns.size()); // event, description, (time)