diff --git a/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java b/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java index e94b6452c..3ac660f2b 100644 --- a/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java +++ b/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java @@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.google.common.io.CharStreams; -import java.util.Collections; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.http.Header; @@ -53,6 +52,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -63,8 +63,8 @@ import java.util.function.Supplier; import java.util.zip.GZIPOutputStream; -import static com.google.common.collect.ImmutableList.toImmutableList; -import static java.util.Arrays.stream; +import static com.google.common.collect.ImmutableList.*; +import static java.util.Arrays.*; /** * Compares what commits the local repository and the remote repository have, then send delta over. @@ -73,6 +73,10 @@ public class CommitGraphCollector { private static final Logger logger = LoggerFactory.getLogger(CommitGraphCollector.class); static final ObjectMapper objectMapper = new ObjectMapper(); private static final int HTTP_TIMEOUT_MILLISECONDS = 15_000; + /** + * Repository header is sent using this reserved file name + */ + static final String HEADER_FILE = ".launchable"; private final String rootName; @@ -157,150 +161,70 @@ public void transfer(URL service, Authenticator authenticator, boolean enableTim // every time a new stream is needed, supply ByteArrayOutputStream, and when the data is all // written, turn around and ship that over transfer( - advertised, - (ContentProducer commits) -> { - try { - URL url = new URL(service, "collect"); - HttpPost request = new HttpPost(url.toExternalForm()); - request.setHeader("Content-Type", "application/json"); - request.setHeader("Content-Encoding", "gzip"); - request.setEntity(new EntityTemplate(os -> commits.writeTo(new GZIPOutputStream(os)))); - - if (outputAuditLog()) { - System.err.printf( - "AUDIT:launchable:%ssend request method:post path:%s headers:%s" - + " args:", - dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders())); - commits.writeTo(System.err); - System.err.println(); - } - if (dryRun) { - return; - } - handleError(url, client.execute(request)).close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }, - new TreeReceiver() { - private final List files = new ArrayList<>(); - - private void writeJsonTo(OutputStream os) throws IOException { - try (JsonGenerator w = new JsonFactory().createGenerator(os)) { - w.setCodec(objectMapper); - w.writeStartObject(); - w.writeArrayFieldStart("tree"); - - for (VirtualFile commit : files) { - w.writeStartObject(); - w.writeFieldName("path"); - w.writeString(commit.path()); - w.writeFieldName("blob"); - w.writeString(commit.blob().name()); - w.writeEndObject(); - } - - w.writeEndArray(); - w.writeEndObject(); - } - } - @Override - public Collection response() { - try { - URL url = new URL(service, "collect/tree"); - HttpPost request = new HttpPost(url.toExternalForm()); - request.setHeader("Content-Type", "application/json"); - request.setHeader("Content-Encoding", "gzip"); - request.setEntity(new EntityTemplate(raw -> { - try (OutputStream os = new GZIPOutputStream(raw)) { - writeJsonTo(os); - } - })); - - if (outputAuditLog()) { - System.err.printf( - "AUDIT:launchable:%ssend request method:post path:%s headers:%s args:", - dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders())); - writeJsonTo(System.err); - System.err.println(); - } - - // even in dry run, this method needs to execute in order to show what files we'll be collecting - try (CloseableHttpResponse response = handleError(url, client.execute(request)); - JsonParser parser = new JsonFactory().createParser(response.getEntity().getContent())) { - return select(objectMapper.readValue(parser, String[].class)); - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } finally { - files.clear(); - } - } - - private List select(String[] response) { - Map filesByPath = new HashMap<>(); - for (VirtualFile f : files) { - filesByPath.put(f.path(), f); - } - - List selected = new ArrayList<>(); - for (String path : response) { - VirtualFile f = filesByPath.get(path); - if (f!=null) { - selected.add(f); - } - } + advertised, + (ContentProducer commits) -> sendCommits(service, client, commits), + new TreeReceiverImpl(service, client), + (ContentProducer files) -> sendFiles(service, client, files), + 256); + } + } - return selected; - } + private void sendCommits(URL service, CloseableHttpClient client, ContentProducer commits) throws IOException { + URL url = new URL(service, "collect"); + HttpPost request = new HttpPost(url.toExternalForm()); + request.setHeader("Content-Type", "application/json"); + request.setHeader("Content-Encoding", "gzip"); + request.setEntity(new EntityTemplate(os -> commits.writeTo(new GZIPOutputStream(os)))); + + if (outputAuditLog()) { + System.err.printf( + "AUDIT:launchable:%ssend request method:post path:%s headers:%s" + + " args:", + dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders())); + commits.writeTo(System.err); + System.err.println(); + } + if (dryRun) { + return; + } + handleError(url, client.execute(request)).close(); + } - @Override - public void accept(VirtualFile f) { - files.add(f); - } - }, - (ContentProducer files) -> { - try { - URL url = new URL(service, "collect/files"); - HttpPost request = new HttpPost(url.toExternalForm()); - request.setHeader("Content-Type", "application/octet-stream"); - // no content encoding, since .tar.gz is considered content - request.setEntity(new EntityTemplate(os -> files.writeTo(new GZIPOutputStream(os)))); - - if (outputAuditLog()) { - System.err.printf( - "AUDIT:launchable:%ssend request method:post path:%s headers:%s args:", - dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders())); - - // TODO: inefficient to buffer everything in memory just to read it back - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - files.writeTo(baos); - TarArchiveInputStream tar = - new TarArchiveInputStream( - new ByteArrayInputStream(baos.toByteArray()), - "UTF-8"); - TarArchiveEntry entry; - boolean first = true; - while ((entry = tar.getNextTarEntry()) != null) { - System.err.printf(entry.getName()); - if (first) { - first = false; - } else { - System.err.print(", "); - } - } - System.err.println(); - } - if (dryRun) { - return; - } - handleError(url, client.execute(request)).close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }, - 256); + private void sendFiles(URL service, CloseableHttpClient client, ContentProducer files) throws IOException { + URL url = new URL(service, "collect/files"); + HttpPost request = new HttpPost(url.toExternalForm()); + request.setHeader("Content-Type", "application/octet-stream"); + // no content encoding, since .tar.gz is considered content + request.setEntity(new EntityTemplate(os -> files.writeTo(new GZIPOutputStream(os)))); + + if (outputAuditLog()) { + System.err.printf( + "AUDIT:launchable:%ssend request method:post path:%s headers:%s args:", + dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders())); + + // TODO: inefficient to buffer everything in memory just to read it back + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + files.writeTo(baos); + TarArchiveInputStream tar = + new TarArchiveInputStream( + new ByteArrayInputStream(baos.toByteArray()), + "UTF-8"); + TarArchiveEntry entry; + boolean first = true; + while ((entry = tar.getNextTarEntry()) != null) { + System.err.printf(entry.getName()); + if (first) { + first = false; + } else { + System.err.print(", "); + } + } + System.err.println(); + } + if (dryRun) { + return; } + handleError(url, client.execute(request)).close(); } private void honorControlHeaders(HttpResponse response) { @@ -393,6 +317,7 @@ public void collectFiles(boolean collectFiles) { /** Process commits per repository. */ final class ByRepository implements AutoCloseable { + /** Names that uniquely identifies this Git repository among other Git repositories collected for the workspace. */ private final String name; private final Repository git; @@ -465,7 +390,7 @@ public void transfer(Collection advertised, Consumer commitR // record all the necessary BLOBs first, before attempting to record its commit. // this way, if the file collection fails, the server won't see this commit, so the future // "record commit" invocation will retry the file collection, thereby making the behavior idempotent. - collectFiles(treeWalk, treeReceiver, fileReceiver); + collectFiles(start, treeWalk, treeReceiver, fileReceiver); fileReceiver.flush(); // walk the commits, transform them, and send them to the commitReceiver @@ -515,7 +440,7 @@ That is, find submodules that are available in the working tree (thus `!isBare() * Our goal here is to find all the files that the server hasn't seen yet. We'll send them to the tree receiver, * which further responds with the actual files we need to send to the server. */ - private void collectFiles(TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer fileReceiver) throws IOException { + private void collectFiles(RevCommit start, TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer fileReceiver) throws IOException { if (!collectFiles) { return; } @@ -541,7 +466,7 @@ private void collectFiles(TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer if ((treeWalk.getFileMode(0).getBits() & FileMode.TYPE_MASK) == FileMode.TYPE_FILE) { GitFile f = new GitFile(name, treeWalk.getPathString(), head, objectReader); // to avoid excessive data transfer, skip files that are too big - if (f.size() < 1024 * 1024 && f.isText()) { + if (f.size() < 1024 * 1024 && f.isText() && !f.path.equals(HEADER_FILE)) { treeReceiver.accept(f); } } @@ -551,11 +476,44 @@ private void collectFiles(TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer // Note(Konboi): To balance the order, since words like "test" and "spec" tend to appear // toward the end in alphabetical sorting. List files = new ArrayList<>(treeReceiver.response()); - Collections.shuffle(files); - for (VirtualFile f : files) { - fileReceiver.accept(f); + if (!files.isEmpty()) { + fileReceiver.accept(buildHeader(start)); filesSent++; + + Collections.shuffle(files); + for (VirtualFile f : files) { + fileReceiver.accept(f); + filesSent++; + } + } + } + + /** + * Creates a per repository "header" file as a {@link VirtualFile}. + * Currently, this is just the list of files in the repository. + */ + private VirtualFile buildHeader(RevCommit start) throws IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + try (JsonGenerator w = new JsonFactory().createGenerator(os)) { + w.setCodec(objectMapper); + w.writeStartObject(); + w.writeArrayFieldStart("tree"); + + try (TreeWalk tw = new TreeWalk(git)) { + tw.addTree(start.getTree()); + tw.setRecursive(true); + + while (tw.next()) { + w.writeStartObject(); + w.writeStringField("path", tw.getPathString()); + w.writeEndObject(); + } + } + + w.writeEndArray(); + w.writeEndObject(); } + return VirtualFile.from(name, HEADER_FILE, ObjectId.zeroId(), os.toByteArray()); } @@ -628,4 +586,90 @@ public void close() { objectReader.close(); } } + + private class TreeReceiverImpl implements TreeReceiver { + private final List files = new ArrayList<>(); + private final URL service; + private final CloseableHttpClient client; + + public TreeReceiverImpl(URL service, CloseableHttpClient client) { + this.service = service; + this.client = client; + } + + private void writeJsonTo(OutputStream os) throws IOException { + try (JsonGenerator w = new JsonFactory().createGenerator(os)) { + w.setCodec(objectMapper); + w.writeStartObject(); + w.writeArrayFieldStart("tree"); + + for (VirtualFile commit : files) { + w.writeStartObject(); + w.writeFieldName("path"); + w.writeString(commit.path()); + w.writeFieldName("blob"); + w.writeString(commit.blob().name()); + w.writeEndObject(); + } + + w.writeEndArray(); + w.writeEndObject(); + } + } + + @Override + public Collection response() { + try { + URL url = new URL(service, "collect/tree"); + HttpPost request = new HttpPost(url.toExternalForm()); + request.setHeader("Content-Type", "application/json"); + request.setHeader("Content-Encoding", "gzip"); + request.setEntity(new EntityTemplate(raw -> { + try (OutputStream os = new GZIPOutputStream(raw)) { + writeJsonTo(os); + } + })); + + if (outputAuditLog()) { + System.err.printf( + "AUDIT:launchable:%ssend request method:post path:%s headers:%s args:", + dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders())); + writeJsonTo(System.err); + System.err.println(); + } + + // even in dry run, this method needs to execute in order to show what files we'll be collecting + try (CloseableHttpResponse response = handleError(url, client.execute(request)); + JsonParser parser = new JsonFactory().createParser(response.getEntity().getContent())) { + return select(objectMapper.readValue(parser, String[].class)); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + files.clear(); + } + } + + private List select(String[] response) { + Map filesByPath = new HashMap<>(); + for (VirtualFile f : files) { + filesByPath.put(f.path(), f); + } + + List selected = new ArrayList<>(); + for (String path : response) { + VirtualFile f = filesByPath.get(path); + if (f!=null) { + selected.add(f); + } + } + + return selected; + } + + @Override + public void accept(VirtualFile f) { + files.add(f); + } + } } diff --git a/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java b/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java index 104649ab7..7eb8c701d 100644 --- a/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java +++ b/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java @@ -7,8 +7,22 @@ * Consumers that spool items it accepts and process them in bulk. */ public interface FlushableConsumer extends Consumer { - /** - * Process all items that have been accepted so far. - */ - void flush() throws IOException; + /** + * Process all items that have been accepted so far. + */ + void flush() throws IOException; + + static FlushableConsumer of(Consumer c) { + return new FlushableConsumer() { + @Override + public void flush() throws IOException { + // noop + } + + @Override + public void accept(T t) { + c.accept(t); + } + }; + } } diff --git a/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java b/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java index b8126eba5..030867403 100644 --- a/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java +++ b/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java @@ -23,4 +23,34 @@ public interface VirtualFile { long size() throws IOException; void writeTo(OutputStream os) throws IOException; + + static VirtualFile from(String repo, String path, ObjectId blob, byte[] payload) { + return new VirtualFile() { + + @Override + public String repo() { + return repo; + } + + @Override + public String path() { + return path; + } + + @Override + public ObjectId blob() { + return blob; + } + + @Override + public long size() { + return payload.length; + } + + @Override + public void writeTo(OutputStream os) throws IOException { + os.write(payload); + } + }; + } } diff --git a/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java b/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java index a03b3e75a..b069bee94 100644 --- a/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java +++ b/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java @@ -1,11 +1,13 @@ package com.launchableinc.ingest.commits; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.NullOutputStream; import org.apache.http.entity.ContentProducer; +import org.eclipse.jgit.api.CommitCommand; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.PersonIdent; @@ -26,7 +28,8 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.util.Collection; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static com.google.common.truth.Truth.*; @@ -87,7 +90,7 @@ public void bareRepo() throws Exception { try (Repository r = Git.open(barerepoDir).getRepository()) { CommitGraphCollector cgc = collectCommit(r, ImmutableList.of()); assertThat(cgc.getCommitsSent()).isEqualTo(1); - assertThat(cgc.getFilesSent()).isEqualTo(1); + assertThat(cgc.getFilesSent()).isEqualTo(2); // header + .gitmodules } } @@ -118,7 +121,7 @@ public void chunking() throws Exception { 2); } assertThat(councCommitChunks[0]).isEqualTo(2); - assertThat(countFilesChunks[0]).isEqualTo(1); // a and sub/x, 2 files, 1 chunk + assertThat(countFilesChunks[0]).isEqualTo(3); // header, a, .gitmodules, and header, sub/x, 5 files, 3 chunks } private void assertValidTar(ContentProducer content) throws IOException { @@ -129,8 +132,8 @@ private void assertValidTar(ContentProducer content) throws IOException { } } - private void assertValidJson(ContentProducer content) throws IOException { - new ObjectMapper().readTree(read(content)); + private JsonNode assertValidJson(ContentProducer content) throws IOException { + return new ObjectMapper().readTree(read(content)); } private InputStream read(ContentProducer content) throws IOException { @@ -163,6 +166,37 @@ private CommitGraphCollector collectCommit(Repository r, List advertis return cgc; } + @Test + public void header() throws Exception { + setupRepos(); + try (Git mainrepo = Git.open(mainrepoDir)) { + addCommitInSubRepo(mainrepo); + + List files = new ArrayList<>(); + + CommitGraphCollector cgc = new CommitGraphCollector("test", mainrepo.getRepository()); + cgc.collectFiles(true); + cgc.new ByRepository(mainrepo.getRepository(), "main") + .transfer(Collections.emptyList(), c -> {}, + new PassThroughTreeReceiverImpl(), + FlushableConsumer.of(files::add)); + + // header for the main repo, 'gitmodules', header for the sub repo, 'a', and 'x' in the sub repo + assertThat(files).hasSize(5); + VirtualFile header = files.get(2); + assertThat(header.path()).isEqualTo(CommitGraphCollector.HEADER_FILE); + JsonNode tree = assertValidJson(header::writeTo).get("tree"); + assertThat(tree.isArray()).isTrue(); + + List paths = new ArrayList<>(); + for (JsonNode i : tree) { + paths.add(i.get("path").asText()); + } + + assertThat(paths).containsExactly("a", "x"); + } + } + /** * Initialize a repository with a submodule. * @@ -172,12 +206,13 @@ private PersonIdent setupRepos() throws Exception { PersonIdent ident; try (Git subrepo = Git.init().setDirectory(subrepoDir).call()) { Files.writeString(subrepoDir.toPath().resolve("a"), ""); - RevCommit c = subrepo.commit().setAll(true).setMessage("sub").call(); + subrepo.add().addFilepattern("a").call(); + RevCommit c = commit(subrepo).setMessage("sub").call(); ident = c.getCommitterIdent(); } try (Git mainrepo = Git.init().setDirectory(mainrepoDir).call()) { mainrepo.submoduleAdd().setPath("sub").setURI(subrepoDir.toURI().toString()).call(); - mainrepo.commit().setAll(true).setMessage("created a submodule").call(); + commit(mainrepo).setMessage("created a submodule").call(); } return ident; } @@ -186,7 +221,13 @@ private void addCommitInSubRepo(Git mainrepo) throws Exception { try (Git submodrepo = Git.wrap(SubmoduleWalk.getSubmoduleRepository(mainrepo.getRepository(), "sub"))) { Files.writeString(mainrepoDir.toPath().resolve("sub").resolve("x"), ""); - submodrepo.commit().setAll(true).setMessage("added x").call(); + submodrepo.add().addFilepattern("x").call(); + commit(submodrepo).setMessage("added x").call(); } } + + private CommitCommand commit(Git r) { + return r.commit().setAll(true).setSign(false); + } + } diff --git a/src/test/java/com/launchableinc/ingest/commits/CommitIngesterTest.java b/src/test/java/com/launchableinc/ingest/commits/CommitIngesterTest.java index fccd4fe2f..54df8289f 100644 --- a/src/test/java/com/launchableinc/ingest/commits/CommitIngesterTest.java +++ b/src/test/java/com/launchableinc/ingest/commits/CommitIngesterTest.java @@ -7,6 +7,8 @@ import java.net.InetSocketAddress; import java.net.URL; import java.nio.file.Files; + +import org.eclipse.jgit.api.CommitCommand; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.revwalk.RevCommit; @@ -32,12 +34,12 @@ public void specifySubmodule() throws Exception { RevCommit subCommit; try (Git subrepo = Git.init().setDirectory(subrepoDir).call()) { Files.writeString(subrepoDir.toPath().resolve("a"), ""); - subCommit = subrepo.commit().setAll(true).setMessage("sub").call(); + subCommit = commit(subrepo).setMessage("sub").call(); } RevCommit mainCommit; try (Git mainrepo = Git.init().setDirectory(mainrepoDir).call()) { mainrepo.submoduleAdd().setPath("sub").setURI(subrepoDir.toURI().toString()).call(); - mainCommit = mainrepo.commit().setAll(true).setMessage("created a submodule").call(); + mainCommit = commit(mainrepo).setMessage("created a submodule").call(); } mockServerClient @@ -73,4 +75,8 @@ public void specifySubmodule() throws Exception { commitIngester.launchableToken = "v1:testorg/testws:dummy-token"; commitIngester.run(); } + + private CommitCommand commit(Git r) { + return r.commit().setAll(true).setSign(false); + } } diff --git a/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java b/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java index 52b73d527..3c655a95c 100644 --- a/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java +++ b/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java @@ -13,7 +13,7 @@ public class ProgressReportingConsumerTest { @Test public void basic() throws IOException { List done = new ArrayList<>(); - try (ProgressReportingConsumer x = new ProgressReportingConsumer<>(flushableConsumer(s -> {done.add(s);sleep();}), String::valueOf, Duration.ofMillis(100))) { + try (ProgressReportingConsumer x = new ProgressReportingConsumer<>(FlushableConsumer.of(s -> {done.add(s);sleep();}), String::valueOf, Duration.ofMillis(100))) { for (int i = 0; i < 100; i++) { x.accept("item " + i); } @@ -28,18 +28,4 @@ private static void sleep() { throw new UnsupportedOperationException(); } } - - private FlushableConsumer flushableConsumer(Consumer c) { - return new FlushableConsumer() { - @Override - public void flush() throws IOException { - // noop - } - - @Override - public void accept(T t) { - c.accept(t); - } - }; - } }