From c0db3b8cdba66eb6872315af7c9665fd08e2cfdb Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 5 Dec 2025 18:06:15 -0800 Subject: [PATCH 1/2] fix-gcs-batch-delete-fixture --- .../gcs/GoogleCloudStorageHttpHandler.java | 61 +++-- .../java/fixture/gcs/MultipartContent.java | 253 ++++++++++++++++++ .../java/fixture/gcs/MultipartUpload.java | 156 +---------- .../GoogleCloudStorageHttpHandlerTests.java | 19 +- ...dTests.java => MultipartContentTests.java} | 89 +++--- 5 files changed, 369 insertions(+), 209 deletions(-) create mode 100644 test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartContent.java rename test/fixtures/gcs-fixture/src/test/java/fixture/gcs/{MultipartUploadTests.java => MultipartContentTests.java} (50%) diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index 7a888afac206e..20444c97000b0 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -24,13 +24,14 @@ import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.URLDecoder; import java.util.HashMap; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static fixture.gcs.MockGcsBlobStore.failAndThrow; @@ -153,22 +154,24 @@ public void handle(final HttpExchange exchange) throws IOException { } } else if (Regex.simpleMatch("POST /batch/storage/v1", request)) { - // Batch https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch - final String uri = "/storage/v1/b/" + bucket + "/o/"; - final StringBuilder batch = new StringBuilder(); - for (String line : Streams.readAllLines(requestBody.streamInput())) { - if (line.isEmpty() || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) { - batch.append(line).append("\r\n"); - } else if (line.startsWith("DELETE")) { - final String name = line.substring(line.indexOf(uri) + uri.length(), line.lastIndexOf(" HTTP")); - if (Strings.hasText(name)) { - mockGcsBlobStore.deleteBlob(URLDecoder.decode(name, UTF_8)); - batch.append("HTTP/1.1 204 NO_CONTENT").append("\r\n"); - batch.append("\r\n"); - } - } + // https://docs.cloud.google.com/storage/docs/batch#http + final var boundary = MultipartContent.Reader.getBoundary(exchange); + final var batchReader = MultipartContent.Reader.readStream(boundary, requestBody.streamInput()); + final var responseStream = new ByteArrayOutputStream(); + final var batchWriter = new MultipartContent.Writer(boundary, responseStream); + while (batchReader.hasNext()) { + final var batchItem = batchReader.next(); + final var contentId = batchItem.headers().get("content-id"); + // only deletes are supported in batch + final var objectName = parseBatchItemDeleteObject(bucket, batchItem.content()); + mockGcsBlobStore.deleteBlob(objectName); + final var responsePartContent = "HTTP/1.1 204 No Content\r\n\r\n"; + final var responsePartHeaders = Map.of("content-type", "application/http", "content-id", "response-" + contentId); + batchWriter.write(MultipartContent.Part.of(responsePartHeaders, responsePartContent)); } - byte[] response = batch.toString().getBytes(UTF_8); + batchWriter.end(); + + byte[] response = responseStream.toByteArray(); exchange.getResponseHeaders().add("Content-Type", exchange.getRequestHeaders().getFirst("Content-Type")); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); @@ -255,6 +258,32 @@ private void writeBlobVersionAsJson(HttpExchange exchange, MockGcsBlobStore.Blob } } + // Example of DELETE batch item status line + // DELETE http://127.0.0.1:49177/storage/v1/b/bucket/o/test/tests-vQzflxz2Swa_bhmlM6gtyA/data-5odMgVMYTbKAI6DxS0qi-A.dat HTTP/1.1"; + static final Pattern BATCH_ITEM_HTTP_LINE = Pattern.compile( + "(?\\w+) (.+)/storage/v1/b/(?.+)/o/(?.+) HTTP/1\\.1" + ); + + static String parseBatchItemDeleteObject(String bucket, BytesReference bytes) { + final var s = bytes.utf8ToString(); + return s.lines().findFirst().map(line -> { + var matcher = BATCH_ITEM_HTTP_LINE.matcher(line); + if (matcher.find() == false) { + throw new IllegalStateException("Cannot parse batch item HTTP line: " + line); + } + var method = matcher.group("method"); + if (method.equals("DELETE") == false) { + throw new IllegalStateException("Expected DELETE item, found " + line); + } + var _bucket = matcher.group("bucket"); + if (bucket.equals(_bucket) == false) { + throw new IllegalStateException("Bucket does not match expected: " + bucket + ", got: " + _bucket); + } + return URLDecoder.decode(matcher.group("object"), UTF_8); + + }).orElseThrow(); + } + record ListBlobsResponse(String bucket, MockGcsBlobStore.PageOfBlobs pageOfBlobs) implements ToXContent { @Override diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartContent.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartContent.java new file mode 100644 index 0000000000000..647201c07a1e2 --- /dev/null +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartContent.java @@ -0,0 +1,253 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package fixture.gcs; + +import com.sun.net.httpserver.HttpExchange; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Locale; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.zip.GZIPInputStream; + +/** + * Multipart content + * + * Every part has own headers and content. Parts are separated by dash-boundary(--boundary) delimiter, + * and boundary is defined in the HTTP header Content-Type, + * like this {@code multipart/related; boundary=__END_OF_PART__4914cd49-4065-44f6-9846-ce805fe1e77f__}. + * Last part, close-delimiter, is dashed from both sides {@code --boundary--}. + * Part headers are separated from the content by double CRLF. + * More details here rfc2046. + * + *
+ *     {@code
+ * --boundary CRLF // with headers and content
+ * header: value CRLF
+ * header: value CRLF
+ * CRLF
+ * content CRLF // no headers
+ * --boundary CRLF
+ * CRLF
+ * content CRLF
+ * --boundary CRLF // no headers and no content
+ * CRLF
+ * --boundary--
+ *     }
+ * 
+ */ +public class MultipartContent { + static final byte[] CRLF = new byte[] { '\r', '\n' }; + static final byte[] DOUBLE_DASH = new byte[] { '-', '-' }; + + record Part(Map headers, BytesReference content) { + public static Part of(Map headers, String content) { + return new Part(headers, new BytesArray(content)); + } + } + + static class Writer { + private final OutputStream output; + private final byte[] boundary; + private boolean done; + + Writer(String boundary, OutputStream output) { + this.output = output; + this.boundary = boundary.getBytes(StandardCharsets.UTF_8); + this.done = false; + } + + void write(Part part) throws IOException { + if (done) { + throw new IllegalStateException("cannot write part after close-delimiter"); + } + output.write(DOUBLE_DASH); + output.write(boundary); + output.write(CRLF); + for (var header : part.headers.entrySet()) { + output.write(header.getKey().getBytes(StandardCharsets.UTF_8)); + output.write(": ".getBytes(StandardCharsets.UTF_8)); + output.write(header.getValue().getBytes(StandardCharsets.UTF_8)); + output.write(CRLF); + } + output.write(CRLF); + if (part.content != null && part.content.length() > 0) { + var ref = part.content.toBytesRef(); + output.write(ref.bytes, ref.offset, ref.length); + output.write(CRLF); + } + } + + void end() throws IOException { + output.write(DOUBLE_DASH); + output.write(boundary); + output.write(DOUBLE_DASH); + output.close(); + done = true; + } + } + + static class Reader implements Iterator { + + static final Pattern BOUNDARY_HEADER_PATTERN = Pattern.compile("multipart/\\w+; boundary=\\\"?(.*)\\\"?"); + + private final InputStream input; + private final byte[] delimiter; + private boolean done; + + Reader(byte[] delimiter, InputStream input) { + this.input = input; + this.delimiter = delimiter; + } + + public static Reader readGzipStream(HttpExchange exchange, InputStream gzipInput) throws IOException { + return readGzipStream(getBoundary(exchange), gzipInput); + } + + public static Reader readGzipStream(String boundary, InputStream gzipInput) throws IOException { + return readStream(boundary, new GZIPInputStream(gzipInput)); + } + + public static String getBoundary(HttpExchange exchange) { + var m = BOUNDARY_HEADER_PATTERN.matcher(exchange.getRequestHeaders().getFirst("Content-Type")); + if (m.matches() == false) { + throw new IllegalStateException("boundary header is not present"); + } + return m.group(1); + } + + // for testing + static Reader readStream(String boundary, InputStream input) throws IOException { + byte[] dashBoundary = ("--" + boundary).getBytes(); + // read first boundary + skipUntilDelimiter(input, dashBoundary); + if (readCloseDelimiterOrCRLF(input)) { + throw new IllegalStateException("multipart content must have at least one part"); + } + var delimiter = ("\r\n--" + boundary).getBytes(); + return new Reader(delimiter, input); + } + + /** + * Must call after reading body-part-delimiter to see if there are more parts. + * If there are no parts, a closing double dash is expected, otherwise CRLF. + */ + static boolean readCloseDelimiterOrCRLF(InputStream is) throws IOException { + var d1 = is.read(); + var d2 = is.read(); + if (d1 == '-' && d2 == '-') { + return true; + } else if (d1 == '\r' && d2 == '\n') { + return false; + } else { + throw new IllegalStateException("expect '--' or CRLF, got " + d1 + " " + d2); + } + } + + /** + * Read bytes from stream into buffer until reach given delimiter. The delimiter is consumed too. + */ + static BytesReference readUntilDelimiter(InputStream is, byte[] delimiter) throws IOException { + assert delimiter.length > 0; + var out = new ByteArrayOutputStream(1024); + var delimiterMatchLen = 0; + while (true) { + var c = is.read(); + if (c == -1) { + throw new IllegalStateException("expected delimiter, but reached end of stream "); + } + var b = (byte) c; + out.write(b); + if (delimiter[delimiterMatchLen] == b) { + delimiterMatchLen++; + if (delimiterMatchLen >= delimiter.length) { + var bytes = out.toByteArray(); + return new BytesArray(bytes, 0, bytes.length - delimiter.length); + } + } else { + if (delimiter[0] == b) { + delimiterMatchLen = 1; + } else { + delimiterMatchLen = 0; + } + } + } + } + + /** + * Discard bytes from stream until reach given delimiter. The delimiter is consumed too. + */ + static void skipUntilDelimiter(InputStream is, byte[] delimiter) throws IOException { + assert delimiter.length > 0; + var delimiterMatchLen = 0; + while (true) { + var c = is.read(); + if (c == -1) { + throw new IllegalStateException("expected delimiter, but reached end of stream "); + } + var b = (byte) c; + if (delimiter[delimiterMatchLen] == b) { + delimiterMatchLen++; + if (delimiterMatchLen >= delimiter.length) { + return; + } + } else { + if (delimiter[0] == b) { + delimiterMatchLen = 1; + } else { + delimiterMatchLen = 0; + } + } + } + } + + @Override + public boolean hasNext() { + return done == false; + } + + @Override + public Part next() { + if (done) { + return null; + } + try { + final var partBytes = readUntilDelimiter(input, delimiter); + done = readCloseDelimiterOrCRLF(input); + if (partBytes.length() == 0) { + return new Part(Map.of(), BytesArray.EMPTY); + } + + var partStream = partBytes.streamInput(); + + final var headers = new HashMap(); + BytesReference headerLine; + while ((headerLine = readUntilDelimiter(partStream, CRLF)).length() > 0) { + final var split = headerLine.utf8ToString().split(": ", 2); + headers.put(split[0].toLowerCase(Locale.ROOT), split[1]); + } + + return new Part(headers, new BytesArray(partStream.readAllBytes())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + +} diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java index 9dba250a207b5..514ebff84d69e 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java @@ -11,65 +11,27 @@ import com.sun.net.httpserver.HttpExchange; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Iterator; import java.util.regex.Pattern; -import java.util.zip.GZIPInputStream; public record MultipartUpload(String bucket, String name, String generation, String crc32, String md5, BytesReference content) { - static final byte[] BODY_PART_HEADERS_DELIMITER = new byte[] { '\r', '\n', '\r', '\n' }; static final Pattern METADATA_PATTERN = Pattern.compile("\"(bucket|name|generation|crc32c|md5Hash)\":\"([^\"]*)\""); - static final Pattern BOUNDARY_HEADER_PATTERN = Pattern.compile("multipart/\\w+; boundary=\\\"?(.*)\\\"?"); /** * Reads HTTP content of MultipartUpload. First part is always json metadata, followed by binary parts. - * Every part has own headers and content. Parts are separated by dash-boundary(--boundary) delimiter, - * and boundary is defined in the HTTP header Content-Type, - * like this {@code multipart/related; boundary=__END_OF_PART__4914cd49-4065-44f6-9846-ce805fe1e77f__}. - * Last part, close-delimiter, is dashed from both sides {@code --boundary--}. - * Part headers are separated from the content by double CRLF. - * More details here rfc2046. - * - *
-     *     {@code
-     * --boundary CRLF
-     * headers CRLF
-     * CRLF
-     * content CRLF
-     * --boundary CRLF
-     * headers CRLF
-     * CRLF
-     * content CRLF
-     * --boundary--
-     *     }
-     * 
*/ public static MultipartUpload parseBody(HttpExchange exchange, InputStream gzipInput) throws IOException { - var m = BOUNDARY_HEADER_PATTERN.matcher(exchange.getRequestHeaders().getFirst("Content-Type")); - if (m.matches() == false) { - throw new IllegalStateException("boundary header is not present"); - } - var boundary = m.group(1); - try (var input = new GZIPInputStream(gzipInput)) { - return parseBody(boundary, input); - } - } - - // for tests - static MultipartUpload parseBody(String boundary, InputStream input) throws IOException { - var reader = new MultipartContentReader(boundary, input); + final var reader = MultipartContent.Reader.readGzipStream(exchange, gzipInput); // read first body-part - blob metadata json - var metadataBytes = reader.next(); - var match = METADATA_PATTERN.matcher(metadataBytes.utf8ToString()); + final var firstPart = reader.next(); + final var match = METADATA_PATTERN.matcher(firstPart.content().utf8ToString()); String bucket = "", name = "", gen = "", crc = "", md5 = ""; while (match.find()) { switch (match.group(1)) { @@ -82,119 +44,13 @@ static MultipartUpload parseBody(String boundary, InputStream input) throws IOEx } // read and combine remaining parts - var blobParts = new ArrayList(); + final var blobParts = new ArrayList(); while (reader.hasNext()) { - blobParts.add(reader.next()); + blobParts.add(reader.next().content()); } - var compositeBuf = CompositeBytesReference.of(blobParts.toArray(new BytesReference[0])); + final var compositeBuf = CompositeBytesReference.of(blobParts.toArray(new BytesReference[0])); return new MultipartUpload(bucket, name, gen, crc, md5, compositeBuf); } - /** - * Must call after reading body-part-delimiter to see if there are more parts. - * If there are no parts, a closing double dash is expected, otherwise CRLF. - */ - static boolean readCloseDelimiterOrCRLF(InputStream is) throws IOException { - var d1 = is.read(); - var d2 = is.read(); - if (d1 == '-' && d2 == '-') { - return true; - } else if (d1 == '\r' && d2 == '\n') { - return false; - } else { - throw new IllegalStateException("expect '--' or CRLF, got " + d1 + " " + d2); - } - } - - /** - * Read bytes from stream into buffer until reach given delimiter. The delimiter is consumed too. - */ - static BytesReference readUntilDelimiter(InputStream is, byte[] delimiter) throws IOException { - assert delimiter.length > 0; - var out = new ByteArrayOutputStream(1024); - var delimiterMatchLen = 0; - while (true) { - var c = is.read(); - if (c == -1) { - throw new IllegalStateException("expected delimiter, but reached end of stream "); - } - var b = (byte) c; - out.write(b); - if (delimiter[delimiterMatchLen] == b) { - delimiterMatchLen++; - if (delimiterMatchLen >= delimiter.length) { - var bytes = out.toByteArray(); - return new BytesArray(bytes, 0, bytes.length - delimiter.length); - } - } else { - if (delimiter[0] == b) { - delimiterMatchLen = 1; - } else { - delimiterMatchLen = 0; - } - } - } - } - - /** - * Discard bytes from stream until reach given delimiter. The delimiter is consumed too. - */ - static void skipUntilDelimiter(InputStream is, byte[] delimiter) throws IOException { - assert delimiter.length > 0; - var delimiterMatchLen = 0; - while (true) { - var c = is.read(); - if (c == -1) { - throw new IllegalStateException("expected delimiter, but reached end of stream "); - } - var b = (byte) c; - if (delimiter[delimiterMatchLen] == b) { - delimiterMatchLen++; - if (delimiterMatchLen >= delimiter.length) { - return; - } - } else { - if (delimiter[0] == b) { - delimiterMatchLen = 1; - } else { - delimiterMatchLen = 0; - } - } - } - } - - /** - * Multipart content iterator. - */ - static class MultipartContentReader implements Iterator { - private final InputStream input; - private final byte[] bodyPartDelimiter; - private boolean done; - - MultipartContentReader(String boundary, InputStream input) throws IOException { - this.input = input; - this.bodyPartDelimiter = ("\r\n--" + boundary).getBytes(); - byte[] dashBoundary = ("--" + boundary).getBytes(); - skipUntilDelimiter(input, dashBoundary); - readCloseDelimiterOrCRLF(input); - } - - @Override - public boolean hasNext() { - return done == false; - } - - @Override - public BytesReference next() { - try { - skipUntilDelimiter(input, BODY_PART_HEADERS_DELIMITER); - BytesReference buf = readUntilDelimiter(input, bodyPartDelimiter); - done = readCloseDelimiterOrCRLF(input); - return buf; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } } diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java index 2185f495ea81f..0540d05467463 100644 --- a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java +++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java @@ -106,24 +106,19 @@ public void testSimpleObjectOperations() { assertEquals( new TestHttpResponse(RestStatus.OK, """ --$boundary - Content-Length: 168 - Content-Type: application/http - content-id: 1 - content-transfer-encoding: binary + content-type: application/http + content-id: response-1 - HTTP/1.1 204 NO_CONTENT + HTTP/1.1 204 No Content - - - --$boundary-- - """.replace("\n", "\r\n").replace("$boundary", boundary)), + --$boundary--""".replace("\n", "\r\n").replace("$boundary", boundary)), handleRequest( handler, "POST", "/batch/storage/v1", createBatchDeleteRequest(bucket, boundary, blobName), - Headers.of("Content-Type", "mixed/multipart") + Headers.of("Content-Type", "multipart/mixed; boundary=" + boundary) ) ); assertEquals( @@ -133,7 +128,7 @@ public void testSimpleObjectOperations() { "POST", "/batch/storage/v1", createBatchDeleteRequest(bucket, boundary, blobName), - Headers.of("Content-Type", "mixed/multipart") + Headers.of("Content-Type", "multipart/mixed; boundary=" + boundary) ).restStatus() ); @@ -847,7 +842,7 @@ private static String createBatchDeleteRequest(String bucketName, String boundar content-transfer-encoding: binary %s - """.replace("$boundary", boundary); + """.replace("\n", "\r\n").replace("$boundary", boundary); StringBuilder builder = new StringBuilder(); AtomicInteger contentId = new AtomicInteger(); Arrays.stream(paths).forEach(p -> { diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartContentTests.java similarity index 50% rename from test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java rename to test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartContentTests.java index 4b7fde0e6391c..80d7f03f62a75 100644 --- a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java +++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartContentTests.java @@ -12,58 +12,85 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; -import org.elasticsearch.common.io.stream.ByteArrayStreamInput; import org.elasticsearch.test.ESTestCase; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.stream.IntStream; +import static fixture.gcs.MultipartContent.Reader.readUntilDelimiter; +import static fixture.gcs.MultipartContent.Reader.skipUntilDelimiter; import static java.nio.charset.StandardCharsets.UTF_8; import static org.elasticsearch.common.bytes.BytesReferenceTestUtils.equalBytes; -public class MultipartUploadTests extends ESTestCase { +public class MultipartContentTests extends ESTestCase { // produces content that does not contain boundary - static String randomPartContent(int len, String boundary) { + static BytesReference randomPartContent(int len, String boundary) { assert len > 0 && boundary.isEmpty() == false; var content = randomAlphanumericOfLength(len); var replacement = boundary.getBytes(UTF_8); replacement[0]++; // change single char to make it different from original - return content.replace(boundary, Arrays.toString(replacement)); + content = content.replace(boundary, Arrays.toString(replacement)); + return new BytesArray(content); } - public void testGenericMultipart() throws IOException { - var boundary = randomAlphanumericOfLength(between(1, 70)); - var part1 = "plain text\nwith line break"; - var part2 = ""; - var part3 = randomPartContent(between(1, 1024), boundary); - var strInput = """ - --$boundary\r - \r - \r - $part1\r - --$boundary\r - X-Header: x-man\r - \r - $part2\r - --$boundary\r - Content-Type: application/octet-stream\r - \r - $part3\r - --$boundary--""".replace("$boundary", boundary).replace("$part1", part1).replace("$part2", part2).replace("$part3", part3); - - var reader = new MultipartUpload.MultipartContentReader(boundary, new ByteArrayStreamInput(strInput.getBytes())); - assertEquals(part1, reader.next().utf8ToString()); - assertEquals(part2, reader.next().utf8ToString()); - assertEquals(part3, reader.next().utf8ToString()); - assertFalse(reader.hasNext()); + static Map randomHeaders() { + final var headers = new HashMap(); + final var numberOfHeaders = between(0, 10); + for (var headerNum = 0; headerNum < numberOfHeaders; headerNum++) { + headers.put("x-" + randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT), randomAlphanumericOfLength(between(1, 100))); + } + return headers; + } + + static MultipartContent.Part randomPart(String boundary) { + return new MultipartContent.Part(randomHeaders(), randomPartContent(between(1, 1024), boundary)); + } + + public void testEmptyPart() throws IOException { + final var boundary = randomAlphanumericOfLength(between(1, 70)); + final var output = new ByteArrayOutputStream(); + final var writer = new MultipartContent.Writer(boundary, output); + final var emptyPart = new MultipartContent.Part(Map.of(), BytesArray.EMPTY); + writer.write(emptyPart); + writer.end(); + final var reader = MultipartContent.Reader.readStream(boundary, new ByteArrayInputStream(output.toByteArray())); + assertEquals(emptyPart, reader.next()); + } + + public void testWriteAndReadParts() throws IOException { + final var boundary = randomAlphanumericOfLength(between(1, 70)); + final var output = new ByteArrayOutputStream(); + final var writer = new MultipartContent.Writer(boundary, output); + final var writeParts = IntStream.range(0, 100).mapToObj(i -> randomPart(boundary)).toList(); + for (var part : writeParts) { + writer.write(part); + } + writer.end(); + + final var input = new ByteArrayInputStream(output.toByteArray()); + final var reader = MultipartContent.Reader.readStream(boundary, input); + final var readParts = new ArrayList(); + while (reader.hasNext()) { + readParts.add(reader.next()); + } + for (int i = 0; i < writeParts.size(); i++) { + assertEquals(writeParts.get(i), readParts.get(i)); + } } public void testReadUntilDelimiter() throws IOException { for (int run = 0; run < 100; run++) { var delimitedContent = DelimitedContent.randomContent(); var inputStream = delimitedContent.toBytesReference().streamInput(); - var readBytes = MultipartUpload.readUntilDelimiter(inputStream, delimitedContent.delimiter); + var readBytes = readUntilDelimiter(inputStream, delimitedContent.delimiter); assertThat(readBytes, equalBytes(new BytesArray(delimitedContent.before))); var readRemaining = inputStream.readAllBytes(); assertArrayEquals(delimitedContent.after, readRemaining); @@ -74,7 +101,7 @@ public void testSkipUntilDelimiter() throws IOException { for (int run = 0; run < 100; run++) { var delimitedContent = DelimitedContent.randomContent(); var inputStream = delimitedContent.toBytesReference().streamInput(); - MultipartUpload.skipUntilDelimiter(inputStream, delimitedContent.delimiter); + skipUntilDelimiter(inputStream, delimitedContent.delimiter); var readRemaining = inputStream.readAllBytes(); assertArrayEquals(delimitedContent.after, readRemaining); } From a73b30333fcb43105c15c3d1d85f6ca17c7e709f Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 12 Dec 2025 11:42:07 -0800 Subject: [PATCH 2/2] headers order --- .../fixture/gcs/GoogleCloudStorageHttpHandler.java | 8 +++++++- .../src/main/java/fixture/gcs/MultipartContent.java | 12 ++++++------ .../test/java/fixture/gcs/MultipartContentTests.java | 10 +++++----- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index 20444c97000b0..fae8d1f7a09fb 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.net.URLDecoder; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; @@ -165,8 +166,13 @@ public void handle(final HttpExchange exchange) throws IOException { // only deletes are supported in batch final var objectName = parseBatchItemDeleteObject(bucket, batchItem.content()); mockGcsBlobStore.deleteBlob(objectName); + final var responsePartHeaders = new LinkedHashMap() { + { + put("content-type", "application/http"); + put("content-id", "response-" + contentId); + } + }; final var responsePartContent = "HTTP/1.1 204 No Content\r\n\r\n"; - final var responsePartHeaders = Map.of("content-type", "application/http", "content-id", "response-" + contentId); batchWriter.write(MultipartContent.Part.of(responsePartHeaders, responsePartContent)); } batchWriter.end(); diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartContent.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartContent.java index 647201c07a1e2..1f96ccae921b3 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartContent.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartContent.java @@ -19,10 +19,10 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; -import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Locale; -import java.util.Map; +import java.util.SequencedMap; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; @@ -56,8 +56,8 @@ public class MultipartContent { static final byte[] CRLF = new byte[] { '\r', '\n' }; static final byte[] DOUBLE_DASH = new byte[] { '-', '-' }; - record Part(Map headers, BytesReference content) { - public static Part of(Map headers, String content) { + record Part(SequencedMap headers, BytesReference content) { + public static Part of(SequencedMap headers, String content) { return new Part(headers, new BytesArray(content)); } } @@ -231,12 +231,12 @@ public Part next() { final var partBytes = readUntilDelimiter(input, delimiter); done = readCloseDelimiterOrCRLF(input); if (partBytes.length() == 0) { - return new Part(Map.of(), BytesArray.EMPTY); + return new Part(new LinkedHashMap<>(), BytesArray.EMPTY); } var partStream = partBytes.streamInput(); - final var headers = new HashMap(); + final var headers = new LinkedHashMap(); BytesReference headerLine; while ((headerLine = readUntilDelimiter(partStream, CRLF)).length() > 0) { final var split = headerLine.utf8ToString().split(": ", 2); diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartContentTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartContentTests.java index 80d7f03f62a75..181130b1ec6e8 100644 --- a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartContentTests.java +++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartContentTests.java @@ -19,9 +19,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Locale; -import java.util.Map; +import java.util.SequencedMap; import java.util.stream.IntStream; import static fixture.gcs.MultipartContent.Reader.readUntilDelimiter; @@ -41,8 +41,8 @@ static BytesReference randomPartContent(int len, String boundary) { return new BytesArray(content); } - static Map randomHeaders() { - final var headers = new HashMap(); + static SequencedMap randomHeaders() { + final var headers = new LinkedHashMap(); final var numberOfHeaders = between(0, 10); for (var headerNum = 0; headerNum < numberOfHeaders; headerNum++) { headers.put("x-" + randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT), randomAlphanumericOfLength(between(1, 100))); @@ -58,7 +58,7 @@ public void testEmptyPart() throws IOException { final var boundary = randomAlphanumericOfLength(between(1, 70)); final var output = new ByteArrayOutputStream(); final var writer = new MultipartContent.Writer(boundary, output); - final var emptyPart = new MultipartContent.Part(Map.of(), BytesArray.EMPTY); + final var emptyPart = new MultipartContent.Part(new LinkedHashMap<>(), BytesArray.EMPTY); writer.write(emptyPart); writer.end(); final var reader = MultipartContent.Reader.readStream(boundary, new ByteArrayInputStream(output.toByteArray()));