Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified smart_tests/jar/exe_deploy.jar
Binary file not shown.
305 changes: 185 additions & 120 deletions src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.launchableinc.ingest.commits;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static java.time.Instant.now;

/**
* Given multiple concurrent slow {@link Consumer}s, each g oing over a large
* number of items in parallel,
* provide a progress report to show that the work is still in progress.
*/
class ProgressReporter<T> {
private final Function<T, String> printer;
private final Duration reportInterval;
private Instant nextReportTime;

/**
* Number of items that need to be processed, across all consumers.
*/
private final AtomicInteger workload = new AtomicInteger();
/**
* Number of items that have already been processed, across all consumers.
*/
private final AtomicInteger completed = new AtomicInteger();

ProgressReporter(Function<T, String> printer, Duration reportInterval) {
this.printer = printer;
this.reportInterval = reportInterval;
this.nextReportTime = now().plus(reportInterval);
}

/**
* Deals with one serial stream of work.
*/
class Consumer implements FlushableConsumer<T>, AutoCloseable {
private final FlushableConsumer<T> base;
private final List<T> pool = new ArrayList<>();

Consumer(FlushableConsumer<T> base) {
this.base = base;
}

@Override
public void accept(T t) {
pool.add(t);
workload.incrementAndGet();
}

@Override
public void flush() throws IOException {
for (T x : pool) {
synchronized (ProgressReporter.this) {
if (now().isAfter(nextReportTime)) {
print(completed.get(), workload.get(), x);
nextReportTime = now().plus(reportInterval);
}
}
base.accept(x);
completed.incrementAndGet();
}
pool.clear();
base.flush();
}

@Override
public void close() throws IOException {
flush();
}
}

Consumer newConsumer(FlushableConsumer<T> base) {
return new Consumer(base);
}

protected void print(int c, int w, T x) {
int width = String.valueOf(w).length();
System.err.printf("%s/%d: %s%n", pad(c, width), w, printer.apply(x));
}

static String pad(int i, int width) {
String s = String.valueOf(i);
while (s.length() < width) {
s = " " + s;
}
return s;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
MainTest.class,
FileChunkStreamerTest.class,
SSLBypassTest.class,
ProgressReportingConsumerTest.class
ProgressReporterTest.class
})
public class AllTests {}
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,13 @@ public void header() throws Exception {
CommitGraphCollector cgc = new CommitGraphCollector("test", mainrepo.getRepository());
cgc.collectFiles(true);
cgc.new ByRepository(mainrepo.getRepository(), "main")
.transfer(Collections.emptyList(), c -> {},
.collectFiles(Collections.emptyList(),
new PassThroughTreeReceiverImpl(),
FlushableConsumer.of(files::add));

// header for the main repo, 'gitmodules', header for the sub repo, 'a', and 'x' in the sub repo
assertThat(files).hasSize(5);
VirtualFile header = files.get(2);
// header for the main repo, 'gitmodules'
assertThat(files).hasSize(2);
VirtualFile header = files.get(0);
assertThat(header.path()).isEqualTo(CommitGraphCollector.HEADER_FILE);
JsonNode tree = assertValidJson(header::writeTo).get("tree");
assertThat(tree.isArray()).isTrue();
Expand All @@ -193,7 +193,7 @@ cgc.new ByRepository(mainrepo.getRepository(), "main")
paths.add(i.get("path").asText());
}

assertThat(paths).containsExactly("a", "x");
assertThat(paths).containsExactly(".gitmodules", "sub");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.launchableinc.ingest.commits;

import org.junit.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static com.google.common.truth.Truth.*;
import static java.util.Collections.*;

public class ProgressReporterTest {
ProgressReporter<String> pr = new ProgressReporter<String>(String::valueOf, Duration.ofMillis(100)) {
int cc = 0;
int ww = 0;
@Override
protected void print(int c, int w, String x) {
super.print(c, w, x);

// ensure numbers are monotonically increasing
assertThat(c).isAtLeast(cc);
assertThat(w).isAtLeast(ww);
cc = c;
ww = w;
}
};

/**
* Tests the most important bit -- that all items are processed.
*/
@Test
public void serial() throws Exception {
List<String> done = new ArrayList<>();
try (ProgressReporter<String>.Consumer x = pr.newConsumer(FlushableConsumer.of(s -> {
done.add(s);
sleep();
}))) {
for (int i = 0; i < 100; i++) {
x.accept("item " + i);
}
}
assertThat(done.size()).isEqualTo(100);
}

/**
* Perform work in parallel and make sure they all do get processed.
*/
@Test
public void parallel() throws Exception {
Set<String> done = synchronizedSet(new HashSet<>());

ExecutorService es = Executors.newFixedThreadPool(10);
List<Future<?>> all = new ArrayList<>();
for (int i=0; i<10; i++) {
final int ii = i;
all.add(es.submit(() -> {
try (ProgressReporter<String>.Consumer x = pr.newConsumer(FlushableConsumer.of(s -> {done.add(s);sleep();}))) {
for (int j = 0; j < 100; j++) {
x.accept("item " + (ii*100+j));
}
return null;
}
}));
}
for (Future<?> f : all) {
f.get();
}
es.shutdown();

assertThat(done.size()).isEqualTo(1000);
for (int i=0; i<1000; i++) {
assertThat(done).contains("item " + i);
}
}

private static void sleep() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new UnsupportedOperationException();
}
}

@Test
public void pad() {
assertThat(ProgressReporter.pad(5,3)).isEqualTo(" 5");
assertThat(ProgressReporter.pad(15,3)).isEqualTo(" 15");
assertThat(ProgressReporter.pad(1234,3)).isEqualTo("1234");
}
}

This file was deleted.

Loading