Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified launchable/jar/exe_deploy.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* Accepts T, buffers them, and writes them out as a batch.
*/
abstract class ChunkStreamer<T> implements Consumer<T>, Closeable {
abstract class ChunkStreamer<T> implements FlushableConsumer<T>, Closeable {
/**
* Encapsulation of how batches are sent.
*/
Expand Down Expand Up @@ -43,7 +42,8 @@ public void close() throws IOException {
flush();
}

private void flush() throws IOException {
@Override
public void flush() throws IOException {
if (spool.isEmpty()) {
return;
}
Expand Down
131 changes: 114 additions & 17 deletions src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.launchableinc.ingest.commits;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -44,23 +45,25 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;

import static com.google.common.collect.ImmutableList.*;
import static java.util.Arrays.*;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Arrays.stream;

/**
* Compares what commits the local repository and the remote repository have, then send delta over.
Expand Down Expand Up @@ -171,11 +174,88 @@ public void transfer(URL service, Authenticator authenticator, boolean enableTim
if (dryRun) {
return;
}
handleError(url, client.execute(request));
handleError(url, client.execute(request)).close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
},
new TreeReceiver() {
private final List<VirtualFile> files = new ArrayList<>();

/**
 * Serializes the spooled {@code files} as the JSON request body:
 * {@code {"tree":[{"path":...,"blob":...},...]}}.
 *
 * @param os destination stream; a generator is opened and closed over it.
 * @throws IOException if writing to {@code os} fails.
 */
private void writeJsonTo(OutputStream os) throws IOException {
try (JsonGenerator w = new JsonFactory().createGenerator(os)) {
w.setCodec(objectMapper);
w.writeStartObject();
w.writeArrayFieldStart("tree");

// One entry per spooled file: its repository path and the Git blob ID of its content.
// (was misleadingly named 'commit'; these are files, not commits)
for (VirtualFile file : files) {
w.writeStartObject();
w.writeFieldName("path");
w.writeString(file.path());
w.writeFieldName("blob");
w.writeString(file.blob().name());
w.writeEndObject();
}

w.writeEndArray();
w.writeEndObject();
}
}
/**
 * Sends the spooled tree (path + blob ID pairs) to {@code collect/tree} and returns
 * the subset of spooled files the server asked for. The spool is cleared afterwards,
 * even when the request fails.
 */
@Override
public Collection<VirtualFile> response() {
try {
URL url = new URL(service, "collect/tree");
HttpPost request = new HttpPost(url.toExternalForm());
request.setHeader("Content-Type", "application/json");
request.setHeader("Content-Encoding", "gzip");
// Body is produced lazily at send time, gzip-compressed on the fly.
request.setEntity(new EntityTemplate(raw -> {
try (OutputStream os = new GZIPOutputStream(raw)) {
writeJsonTo(os);
}
}));

if (outputAuditLog()) {
System.err.printf(
"AUDIT:launchable:%ssend request method:post path:%s headers:%s args:",
dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders()));
writeJsonTo(System.err);
System.err.println();
}

// even in dry run, this method needs to execute in order to show what files we'll be collecting
try (CloseableHttpResponse response = handleError(url, client.execute(request));
JsonParser parser = new JsonFactory().createParser(response.getEntity().getContent())) {
// Server replies with an array of paths it wants; map them back to spooled files.
return select(objectMapper.readValue(parser, String[].class));
}
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
// Reset the spool so the next round starts clean.
files.clear();
}
}

/**
 * Maps the paths named in the server's response back to the spooled files,
 * preserving the response order and silently dropping paths we never offered.
 */
private List<VirtualFile> select(String[] response) {
// Index the spooled files by path for constant-time lookup.
Map<String, VirtualFile> byPath = new HashMap<>();
files.forEach(spooled -> byPath.put(spooled.path(), spooled));

List<VirtualFile> wanted = new ArrayList<>();
for (String path : response) {
VirtualFile match = byPath.get(path);
if (match != null) {
wanted.add(match);
}
}
return wanted;
}

@Override
public void accept(VirtualFile f) {
// Spool only; the batch is offered to the server when response() runs.
files.add(f);
}
},
(ContentProducer files) -> {
try {
URL url = new URL(service, "collect/files");
Expand Down Expand Up @@ -211,7 +291,7 @@ public void transfer(URL service, Authenticator authenticator, boolean enableTim
if (dryRun) {
return;
}
handleError(url, client.execute(request));
handleError(url, client.execute(request)).close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -256,13 +336,13 @@ private ImmutableList<ObjectId> getAdvertisedRefs(HttpResponse response) throws
* chunk size.
*/
public void transfer(
Collection<ObjectId> advertised, IOConsumer<ContentProducer> commitSender, IOConsumer<ContentProducer> fileSender, int chunkSize)
Collection<ObjectId> advertised, IOConsumer<ContentProducer> commitSender, TreeReceiver treeReceiver, IOConsumer<ContentProducer> fileSender, int chunkSize)
throws IOException {
ByRepository r = new ByRepository(root, rootName);
try (CommitChunkStreamer cs = new CommitChunkStreamer(commitSender, chunkSize);
FileChunkStreamer fs = new FileChunkStreamer(fileSender, chunkSize);
ProgressReportingConsumer<VirtualFile> fsr = new ProgressReportingConsumer<>(fs, VirtualFile::path, Duration.ofSeconds(3))) {
r.transfer(advertised, cs, fsr);
r.transfer(advertised, cs, treeReceiver, fsr);
}
}

Expand Down Expand Up @@ -323,7 +403,7 @@ final class ByRepository implements AutoCloseable {
*
* @param commitReceiver Receives commits that should be sent, one by one.
*/
public void transfer(Collection<ObjectId> advertised, Consumer<JSCommit> commitReceiver, Consumer<VirtualFile> fileReceiver)
public void transfer(Collection<ObjectId> advertised, Consumer<JSCommit> commitReceiver, TreeReceiver treeReceiver, FlushableConsumer<VirtualFile> fileReceiver)
throws IOException {
try (RevWalk walk = new RevWalk(git); TreeWalk treeWalk = new TreeWalk(git)) {
// walk reverse topological order, so that older commits get added to the server earlier.
Expand Down Expand Up @@ -368,7 +448,11 @@ public void transfer(Collection<ObjectId> advertised, Consumer<JSCommit> commitR
}
}

collectFiles(treeWalk, fileReceiver);
// record all the necessary BLOBs first, before attempting to record its commit.
// this way, if the file collection fails, the server won't see this commit, so the future
// "record commit" invocation will retry the file collection, thereby making the behavior idempotent.
collectFiles(treeWalk, treeReceiver, fileReceiver);
fileReceiver.flush();

// walk the commits, transform them, and send them to the commitReceiver
for (RevCommit c : walk) {
Expand Down Expand Up @@ -399,7 +483,7 @@ That is, find submodules that are available in the working tree (thus `!isBare()
if (subRepo != null) {
try {
try (ByRepository br = new ByRepository(subRepo, name + "/" + swalk.getModulesPath())) {
br.transfer(advertised, commitReceiver, fileReceiver);
br.transfer(advertised, commitReceiver, treeReceiver, fileReceiver);
}
} catch (ConfigInvalidException e) {
throw new IOException("Invalid Git submodule configuration: " + git.getDirectory(), e);
Expand All @@ -411,15 +495,24 @@ That is, find submodules that are available in the working tree (thus `!isBare()
}
}

private void collectFiles(TreeWalk treeWalk, Consumer<VirtualFile> receiver) throws IOException {
if (!collectFiles) return;
/**
* treeWalk contains the HEAD (the interesting commit) at the 0th position, then all the commits
* the server advertised in the 1st, 2nd, ...
* Our goal here is to find all the files that the server hasn't seen yet. We'll send them to the tree receiver,
* which further responds with the actual files we need to send to the server.
*/
private void collectFiles(TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer<VirtualFile> fileReceiver) throws IOException {
if (!collectFiles) {
return;
}


int c = treeWalk.getTreeCount();

OUTER:
while (treeWalk.next()) {
ObjectId head = treeWalk.getObjectId(0);
for (int i=1; i<c; i++) {
for (int i = 1; i < c; i++) {
if (head.equals(treeWalk.getObjectId(i))) {
// file at the head is identical to one of the uninteresting commits,
// meaning we have already seen this file/directory on the server.
Expand All @@ -431,16 +524,20 @@ private void collectFiles(TreeWalk treeWalk, Consumer<VirtualFile> receiver) thr
if (treeWalk.isSubtree()) {
treeWalk.enterSubtree();
} else {
if ((treeWalk.getFileMode(0).getBits()&FileMode.TYPE_MASK)==FileMode.TYPE_FILE) {
if ((treeWalk.getFileMode(0).getBits() & FileMode.TYPE_MASK) == FileMode.TYPE_FILE) {
GitFile f = new GitFile(name, treeWalk.getPathString(), head, objectReader);
// to avoid excessive data transfer, skip files that are too big
if (f.size()<1024*1024 && f.isText()) {
receiver.accept(f);
filesSent++;
if (f.size() < 1024 * 1024 && f.isText()) {
treeReceiver.accept(f);
}
}
}
}

for (VirtualFile f : treeReceiver.response()) {
fileReceiver.accept(f);
filesSent++;
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ protected void writeTo(List<VirtualFile> files, OutputStream os) throws IOExcept
for (VirtualFile f : files) {
TarArchiveEntry e = new TarArchiveEntry(f.path());
e.setSize(f.size());
e.setGroupName(f.blob().name()); // HACK - reuse the group name field to store the blob ID
tar.putArchiveEntry(e);
f.writeTo(tar);
tar.closeArchiveEntry();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.launchableinc.ingest.commits;

import java.io.IOException;
import java.util.function.Consumer;

/**
 * A consumer that spools the items it accepts and processes them in bulk.
 */
public interface FlushableConsumer<T> extends Consumer<T> {
/**
 * Process all items that have been accepted so far.
 *
 * @throws IOException if the bulk processing fails, e.g. while writing a batch out.
 */
void flush() throws IOException;
}
5 changes: 5 additions & 0 deletions src/main/java/com/launchableinc/ingest/commits/GitFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public String path() {
return path;
}

@Override
public ObjectId blob() {
// ID of the Git blob object holding this file's content.
return blob;
}

public long size() throws IOException {
return open().getSize();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.launchableinc.ingest.commits;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -13,44 +14,61 @@
* Given a slow {@link Consumer} that goes over a large number of items,
* provide a progress report to show that the work is still in progress.
*/
class ProgressReportingConsumer<T> implements Consumer<T>, AutoCloseable {
private final Consumer<T> base;
private final List<T> pool = new ArrayList<>();
private final Function<T,String> printer;
private final Duration reportInterval;

ProgressReportingConsumer(Consumer<T> base, Function<T,String> printer, Duration reportInterval) {
this.base = base;
this.printer = printer;
this.reportInterval = reportInterval;
}
class ProgressReportingConsumer<T> implements FlushableConsumer<T>, AutoCloseable {
private final FlushableConsumer<T> base;
private final List<T> pool = new ArrayList<>();
private final Function<T, String> printer;
private final Duration reportInterval;
private int round = 1;

ProgressReportingConsumer(FlushableConsumer<T> base, Function<T, String> printer, Duration reportInterval) {
this.base = base;
this.printer = printer;
this.reportInterval = reportInterval;
}

@Override
public void accept(T t) {
pool.add(t);
@Override
public void accept(T t) {
// Buffer only; delivery to the base consumer happens in flush().
pool.add(t);
}

/**
 * Drains the buffered items into the base consumer, printing a progress line at most
 * once per {@code reportInterval}, then flushes the base consumer and advances the round.
 *
 * @throws IOException if the base consumer fails while accepting or flushing.
 */
@Override
public void flush() throws IOException {
Instant nextReportTime = now().plus(reportInterval);
// Width of the largest index, so progress counters line up.
int width = String.valueOf(pool.size()).length();
int i = 0;
for (T x : pool) {
i++;
// Rate-limit progress output to one line per reportInterval.
if (now().isAfter(nextReportTime)) {
System.err.printf("%s%s/%d: %s%n", round(), pad(i, width), pool.size(), printer.apply(x));
nextReportTime = now().plus(reportInterval);
}
base.accept(x);
}
pool.clear();
base.flush();
round++;
}

@Override
public void close() {
Instant nextReportTime = now().plus(reportInterval);
int width = String.valueOf(pool.size()).length();
int i = 0;
for (T x : pool) {
i++;
if (now().isAfter(nextReportTime)) {
System.err.printf("%s/%d: %s%n", pad(i, width), pool.size(), printer.apply(x));
nextReportTime = now().plus(reportInterval);
}
base.accept(x);
}
pool.clear();
/** Progress-line prefix naming the flush round; empty in the common single-round case. */
private String round() {
// Most of the time there's only one round, so don't clutter the output with a prefix.
return round == 1 ? "" : String.format("#%d ", round);
}

@Override
public void close() throws IOException {
// Deliver anything still buffered before shutting down.
flush();
}

static String pad(int i, int width) {
String s = String.valueOf(i);
while (s.length() < width) {
s = " " + s;
}
return s;
/**
 * Left-pads the decimal form of {@code i} with spaces to at least {@code width}
 * characters, used to align progress counters. Returns the bare number unchanged
 * when it is already wide enough (or when {@code width <= 0}).
 */
static String pad(int i, int width) {
String s = String.valueOf(i);
if (s.length() >= width) {
// Also covers width <= 0, matching the old loop's behavior exactly.
return s;
}
// Formatter width specifier space-pads on the left, avoiding the old
// O(width^2) repeated string concatenation.
return String.format("%" + width + "s", s);
}
}
Loading
Loading