From bd446fe23e328d1b05ab3139ff8fbc7815220238 Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Mon, 17 Feb 2025 20:45:05 +0100 Subject: [PATCH 1/5] Add logic and tests for ListRuns --- .../com/databricks/sdk/mixin/JobsExt.java | 39 ++++ .../com/databricks/sdk/mixin/JobsExtTest.java | 189 ++++++++++++++++++ 2 files changed, 228 insertions(+) diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java index 8dfd0a7a8..39fd871cd 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java @@ -3,6 +3,7 @@ import com.databricks.sdk.core.ApiClient; import com.databricks.sdk.service.jobs.*; import java.util.Collection; +import java.util.Iterator; public class JobsExt extends JobsAPI { @@ -14,6 +15,44 @@ public JobsExt(JobsService mock) { super(mock); } + /** + * List job runs. + * + *

Retrieve a list of runs. If the run has multiple pages of tasks, job_clusters, parameters or + * repair_history, it will paginate through all pages and aggregate the results. + */ + public Iterable listRuns(ListRunsRequest request) { + // fetch runs with limited elements in top level arrays + Iterable runsList = super.listRuns(request); + + if (!request.getExpandTasks()) { + return runsList; + } + + Iterator iterator = runsList.iterator(); + return () -> + new Iterator() { + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public BaseRun next() { + BaseRun run = iterator.next(); + // fully fetch all top level arrays for the run + GetRunRequest getRunRequest = new GetRunRequest().setRunId(run.getRunId()); + Run fullRun = getRun(getRunRequest); + run.setTasks(fullRun.getTasks()); + run.setJobClusters(fullRun.getJobClusters()); + run.setJobParameters(fullRun.getJobParameters()); + run.setRepairHistory(fullRun.getRepairHistory()); + run.setHasMore(false); + return run; + } + }; + } + /** * Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the * response contract. diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java index 46b04d912..f5ed5be2e 100644 --- a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java @@ -202,4 +202,193 @@ private void addJobEnvironments(Job job, String... 
environmentKeys) { } job.getSettings().setEnvironments(environments); } + + @Test + public void testListRunsWithoutExpandTasks() { + JobsService service = Mockito.mock(JobsService.class); + BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L); + BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L); + BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L); + BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L); + + List runsOnFirstPage = new ArrayList<>(); + runsOnFirstPage.add(run1); + runsOnFirstPage.add(run2); + List runsOnSecondPage = new ArrayList<>(); + runsOnSecondPage.add(run3); + runsOnSecondPage.add(run4); + when(service.listRuns(any())) + .thenReturn(new ListRunsResponse().setRuns(runsOnFirstPage)) + .thenReturn(new ListRunsResponse().setRuns(runsOnSecondPage)); + JobsExt jobsExt = new JobsExt(service); + + ListRunsRequest request = new ListRunsRequest().setExpandTasks(false); + Iterable runsList = jobsExt.listRuns(request); + + List expectedRunsList = new ArrayList<>(); + expectedRunsList.add(run1); + expectedRunsList.add(run2); + expectedRunsList.add(run3); + expectedRunsList.add(run4); + for (BaseRun run : runsList) { + BaseRun expectedRun = + expectedRunsList.stream() + .filter(e -> e.getRunId().equals(run.getRunId())) + .findFirst() + .orElse(null); + assertEquals(expectedRun, run); + } + verify(service, times(0)).getRun(any()); + } + + @Test + public void testListRuns() { + JobsService service = Mockito.mock(JobsService.class); + BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L).setHasMore(true); + addTasks(run1, 101L, 102L); + BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L).setHasMore(true); + addTasks(run2, 201L, 202L); + BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L); + addTasks(run3, 301L); + BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L).setHasMore(true); + addTasks(run4, 401L, 402L); + + List runsOnFirstPage = new ArrayList<>(); + runsOnFirstPage.add(run1); + runsOnFirstPage.add(run2); + 
runsOnFirstPage.add(run3); + ListRunsResponse listRunsResponse1 = + new ListRunsResponse().setRuns(runsOnFirstPage).setNextPageToken("page2token"); + List runsOnSecondPage = new ArrayList<>(); + runsOnSecondPage.add(run4); + ListRunsResponse listRunsResponse2 = new ListRunsResponse().setRuns(runsOnSecondPage); + + when(service.listRuns(any())).thenReturn(listRunsResponse1).thenReturn(listRunsResponse2); + + // runs/get?run_id=100 + Run getRun1_page1 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page2token"); + addTasks(getRun1_page1, 101L, 102L); + Run getRun1_page2 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page3token"); + addTasks(getRun1_page2, 103L, 104L); + Run getRun1_page3 = new Run().setRunId(100L).setJobId(1L); + addTasks(getRun1_page3, 105L); + + // runs/get?run_id=200 + Run getRun2_page1 = new Run().setRunId(200L).setJobId(2L).setNextPageToken("run2_page2token"); + addTasks(getRun2_page1, 201L, 202L); + Run getRun2_page2 = new Run().setRunId(200L).setJobId(2L); + addTasks(getRun2_page2, 203L); + + // runs/get?run_id=300 + Run getRun3_page1 = new Run().setRunId(300L).setJobId(3L); + addTasks(getRun3_page1, 301L); + + // runs/get?run_id=400 + Run getRun4_page1 = new Run().setRunId(400L).setJobId(4L).setNextPageToken("run4_page2token"); + addTasks(getRun4_page1, 401L, 402L); + Run getRun4_page2 = new Run().setRunId(400L).setJobId(4L); + addTasks(getRun4_page2, 403L, 404L); + + doReturn(getRun1_page1) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 100L + && request.getPageToken() == null)); + doReturn(getRun1_page2) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 100L + && "run1_page2token".equals(request.getPageToken()))); + doReturn(getRun1_page3) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 100L + && "run1_page3token".equals(request.getPageToken()))); + 
doReturn(getRun2_page1) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 200L + && request.getPageToken() == null)); + doReturn(getRun2_page2) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 200L + && "run2_page2token".equals(request.getPageToken()))); + doReturn(getRun3_page1) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 300L + && request.getPageToken() == null)); + doReturn(getRun4_page1) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 400L + && request.getPageToken() == null)); + doReturn(getRun4_page2) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 400L + && "run4_page2token".equals(request.getPageToken()))); + + JobsExt jobsExt = new JobsExt(service); + ListRunsRequest request = new ListRunsRequest().setExpandTasks(true); + Iterable runsList = jobsExt.listRuns(request); + + BaseRun expectedRun1 = new BaseRun().setRunId(100L).setJobId(1L).setHasMore(false); + addTasks(expectedRun1, 101L, 102L, 103L, 104L, 105L); + BaseRun expectedRun2 = new BaseRun().setRunId(200L).setJobId(2L).setHasMore(false); + addTasks(expectedRun2, 201L, 202L, 203L); + BaseRun expectedRun3 = new BaseRun().setRunId(300L).setJobId(3L).setHasMore(false); + addTasks(expectedRun3, 301L); + BaseRun expectedRun4 = new BaseRun().setRunId(400L).setJobId(4L).setHasMore(false); + addTasks(expectedRun4, 401L, 402L, 403L, 404L); + List expectedRunsList = new ArrayList<>(); + expectedRunsList.add(expectedRun1); + expectedRunsList.add(expectedRun2); + expectedRunsList.add(expectedRun3); + expectedRunsList.add(expectedRun4); + for (BaseRun run : runsList) { + BaseRun expectedRun = + expectedRunsList.stream() + .filter(e -> e.getRunId().equals(run.getRunId())) + .findFirst() + .orElse(null); + assertEquals(expectedRun, run); + } + // 3 getRun calls for run 100, 
2 getRun calls for run 200, 1 getRun call for run 300, 2 getRun + // calls for run 400 + verify(service, times(8)).getRun(any()); + } + + private void addTasks(BaseRun run, long... taskRunIds) { + Collection tasks = new ArrayList<>(); + for (long runId : taskRunIds) { + tasks.add(new RunTask().setRunId(runId)); + } + run.setTasks(tasks); + } } From ec57687d25ac7f0b8ef5583ecfa5d135b6fc075e Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Thu, 20 Feb 2025 13:43:48 +0100 Subject: [PATCH 2/5] Only call runs/get when has_more is present --- .../com/databricks/sdk/mixin/JobsExt.java | 20 ++++++++++++------- .../com/databricks/sdk/mixin/JobsExtTest.java | 4 ++-- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java index 39fd871cd..ca74b2861 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java @@ -40,13 +40,19 @@ public boolean hasNext() { @Override public BaseRun next() { BaseRun run = iterator.next(); - // fully fetch all top level arrays for the run - GetRunRequest getRunRequest = new GetRunRequest().setRunId(run.getRunId()); - Run fullRun = getRun(getRunRequest); - run.setTasks(fullRun.getTasks()); - run.setJobClusters(fullRun.getJobClusters()); - run.setJobParameters(fullRun.getJobParameters()); - run.setRepairHistory(fullRun.getRepairHistory()); + + // The has_more field is only present in run with 100+ tasks, that is served from Jobs API 2.2. + // Extra tasks and other fields need to be fetched only when has_more is true. 
+ if (run.getHasMore() != null && run.getHasMore()) { + // fully fetch all top level arrays for the run + GetRunRequest getRunRequest = new GetRunRequest().setRunId(run.getRunId()); + Run fullRun = getRun(getRunRequest); + run.setTasks(fullRun.getTasks()); + run.setJobClusters(fullRun.getJobClusters()); + run.setJobParameters(fullRun.getJobParameters()); + run.setRepairHistory(fullRun.getRepairHistory()); + } + // Set the has_more field to false to indicate that there are no more tasks and other fields to fetch. run.setHasMore(false); return run; } diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java index f5ed5be2e..534b5b791 100644 --- a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java @@ -379,9 +379,9 @@ public void testListRuns() { .orElse(null); assertEquals(expectedRun, run); } - // 3 getRun calls for run 100, 2 getRun calls for run 200, 1 getRun call for run 300, 2 getRun + // 3 getRun calls for run 100, 2 getRun calls for run 200, 0 getRun call for run 300, 2 getRun // calls for run 400 - verify(service, times(8)).getRun(any()); + verify(service, times(7)).getRun(any()); } private void addTasks(BaseRun run, long... 
taskRunIds) { From 190ed99052c6b14fc581817e3917ba4f1a75e27b Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Thu, 20 Feb 2025 15:11:44 +0100 Subject: [PATCH 3/5] Set the has_more field to null --- .../src/main/java/com/databricks/sdk/mixin/JobsExt.java | 6 ++++-- .../test/java/com/databricks/sdk/mixin/JobsExtTest.java | 8 ++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java index ca74b2861..ea95df777 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java @@ -52,8 +52,10 @@ public BaseRun next() { run.setJobParameters(fullRun.getJobParameters()); run.setRepairHistory(fullRun.getRepairHistory()); } - // Set the has_more field to false to indicate that there are no more tasks and other fields to fetch. - run.setHasMore(false); + // Set the has_more fields to null. + // This field in Jobs API 2.2 is useful for pagination. It indicates if there are more than 100 tasks or job_clusters in the run. + // This function hides pagination details from the user. So the field does not play useful role here. 
+ run.setHasMore(null); return run; } }; diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java index 534b5b791..fb35fa184 100644 --- a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java @@ -358,13 +358,13 @@ public void testListRuns() { ListRunsRequest request = new ListRunsRequest().setExpandTasks(true); Iterable runsList = jobsExt.listRuns(request); - BaseRun expectedRun1 = new BaseRun().setRunId(100L).setJobId(1L).setHasMore(false); + BaseRun expectedRun1 = new BaseRun().setRunId(100L).setJobId(1L); addTasks(expectedRun1, 101L, 102L, 103L, 104L, 105L); - BaseRun expectedRun2 = new BaseRun().setRunId(200L).setJobId(2L).setHasMore(false); + BaseRun expectedRun2 = new BaseRun().setRunId(200L).setJobId(2L); addTasks(expectedRun2, 201L, 202L, 203L); - BaseRun expectedRun3 = new BaseRun().setRunId(300L).setJobId(3L).setHasMore(false); + BaseRun expectedRun3 = new BaseRun().setRunId(300L).setJobId(3L); addTasks(expectedRun3, 301L); - BaseRun expectedRun4 = new BaseRun().setRunId(400L).setJobId(4L).setHasMore(false); + BaseRun expectedRun4 = new BaseRun().setRunId(400L).setJobId(4L); addTasks(expectedRun4, 401L, 402L, 403L, 404L); List expectedRunsList = new ArrayList<>(); expectedRunsList.add(expectedRun1); From d9ab27ceb529c578e41c63b5ab509e3e02e4d1c4 Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Thu, 20 Feb 2025 16:44:31 +0100 Subject: [PATCH 4/5] Update changelog --- NEXT_CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 584443e52..e534d3d79 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -9,6 +9,7 @@ ### Documentation ### Internal Changes +* Update Jobs ListRuns API to support paginated responses ([#410](https://github.com/databricks/databricks-sdk-java/pull/410)). * 
Introduce automated tagging ([#409](https://github.com/databricks/databricks-sdk-java/pull/409)). * Update Jobs GetJob API to support paginated responses ([#403](https://github.com/databricks/databricks-sdk-java/pull/403)). * Update Jobs GetRun API to support paginated responses ([#402](https://github.com/databricks/databricks-sdk-java/pull/402)). From b5051069808ac5d98e62f82fe41fcf8905d69ad0 Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Thu, 20 Feb 2025 16:47:08 +0100 Subject: [PATCH 5/5] fmt --- .../src/main/java/com/databricks/sdk/mixin/JobsExt.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java index ea95df777..1bd143efe 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java @@ -41,7 +41,8 @@ public boolean hasNext() { public BaseRun next() { BaseRun run = iterator.next(); - // The has_more field is only present in run with 100+ tasks, that is served from Jobs API 2.2. + // The has_more field is only present in run with 100+ tasks, that is served from Jobs + // API 2.2. // Extra tasks and other fields need to be fetched only when has_more is true. if (run.getHasMore() != null && run.getHasMore()) { // fully fetch all top level arrays for the run @@ -53,8 +54,10 @@ public BaseRun next() { run.setRepairHistory(fullRun.getRepairHistory()); } // Set the has_more fields to null. - // This field in Jobs API 2.2 is useful for pagination. It indicates if there are more than 100 tasks or job_clusters in the run. - // This function hides pagination details from the user. So the field does not play useful role here. + // This field in Jobs API 2.2 is useful for pagination. It indicates if there are more + // than 100 tasks or job_clusters in the run. 
+ // This function hides pagination details from the user. So the field does not play + // a useful role here. run.setHasMore(null); return run; }