diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index d6d7ed93b4..86384f41c4 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -1544,6 +1544,14 @@ boolean localReset(final H2Error error) throws IOException { return localReset(error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode()); } + @Override + public void terminate() { + try { + localReset(H2Error.INTERNAL_ERROR); + } catch (final IOException ignore) { + } + } + @Override public boolean cancel() { try { diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2StreamChannel.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2StreamChannel.java index d200745c54..e30a17cb89 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2StreamChannel.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2StreamChannel.java @@ -43,4 +43,6 @@ interface H2StreamChannel extends DataStreamChannel, CapacityChannel, Cancellabl void push(List
headers, AsyncPushProducer pushProducer) throws HttpException, IOException; + void terminate(); + } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamHandler.java index efdef785e6..55316d0a94 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamHandler.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamHandler.java @@ -133,6 +133,11 @@ public void pushPromise( commitPromise(promise, pushProducer); } + @Override + public void terminateExchange() { + terminate(); + } + }; this.httpProcessor = httpProcessor; this.connMetrics = connMetrics; @@ -206,6 +211,10 @@ private void commitPromise( connMetrics.incrementRequestCount(); } + private void terminate() { + outputChannel.terminate(); + } + @Override public void consumePromise(final List
headers) throws HttpException, IOException { throw new ProtocolException("Unexpected message promise"); diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushH2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushH2StreamHandler.java index e13bdf3769..6e4fc9d89b 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushH2StreamHandler.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushH2StreamHandler.java @@ -195,6 +195,10 @@ private void commitPromise( connMetrics.incrementRequestCount(); } + private void terminate() { + outputChannel.terminate(); + } + @Override public void produceOutput() throws HttpException, IOException { switch (responseState) { @@ -219,6 +223,11 @@ public void pushPromise( commitPromise(promise, pushProducer); } + @Override + public void terminateExchange() { + terminate(); + } + }, context); break; case BODY: diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/ClassicH2RequestExecutionExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/ClassicH2RequestExecutionExample.java new file mode 100644 index 0000000000..dedc4056df --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/ClassicH2RequestExecutionExample.java @@ -0,0 +1,155 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.Future; + +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.apache.hc.core5.http.nio.AsyncClientEndpoint; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncResponseConsumer; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.frame.RawFrame; +import org.apache.hc.core5.http2.impl.nio.H2StreamListener; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.util.Timeout; + +/** + * Example of HTTP/2 request execution with a classic I/O API compatibility bridge + * that enables the use of standard {@link java.io.InputStream} / {@link java.io.OutputStream} + * based data consumers / producers. + *

> + * Execution of individual message exchanges is performed at the current thread. + */ +@Experimental +public class ClassicH2RequestExecutionExample { + + public static void main(final String[] args) throws Exception { + + // Create and start requester + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .build(); + + final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap() + .setH2Config(h2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setStreamListener(new H2StreamListener() { + + @Override + public void onHeaderInput(final HttpConnection connection, final int streamId, final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i)); + } + } + + @Override + public void onHeaderOutput(final HttpConnection connection, final int streamId, final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i)); + } + } + + @Override + public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) { + } + + @Override + public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) { + } + + @Override + public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + } + + @Override + public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + } + + }) + .create(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP requester shutting down"); + requester.close(CloseMode.GRACEFUL); + })); + requester.start(); + + final HttpHost target = new HttpHost("nghttp2.org"); + final Future future = requester.connect(target, Timeout.ofDays(5)); + final AsyncClientEndpoint clientEndpoint = future.get(); + + final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"}; + + for (final String requestUri: requestUris) { + final ClassicHttpRequest request = ClassicRequestBuilder.get() + .setHttpHost(target) + .setPath(requestUri) + .build(); + + final ClassicToAsyncRequestProducer requestProducer = new ClassicToAsyncRequestProducer(request, Timeout.ofMinutes(5)); + final ClassicToAsyncResponseConsumer responseConsumer = new ClassicToAsyncResponseConsumer(Timeout.ofMinutes(5)); + + clientEndpoint.execute(requestProducer, responseConsumer, null); + + requestProducer.blockWaiting().execute(); + try (ClassicHttpResponse response = responseConsumer.blockWaiting()) { + System.out.println(requestUri + " -> " + response.getCode()); + final HttpEntity entity = response.getEntity(); + if (entity != null) { + final ContentType contentType = ContentType.parse(entity.getContentType()); + final Charset charset = ContentType.getCharset(contentType, StandardCharsets.UTF_8); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(entity.getContent(), charset))) { + String line; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + } + } + } + } + + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/ClassicH2ServerExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/ClassicH2ServerExample.java new file mode 100644 index 0000000000..d3d2607580 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/ClassicH2ServerExample.java @@ -0,0 +1,188 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.concurrent.DefaultThreadFactory; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.ProtocolException; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.impl.routing.RequestRouter; +import org.apache.hc.core5.http.io.HttpRequestHandler; +import org.apache.hc.core5.http.io.entity.EntityTemplate; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncServerExchangeHandler; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.frame.RawFrame; +import org.apache.hc.core5.http2.impl.nio.H2StreamListener; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.URIBuilder; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.util.TimeValue; + +/** + * Example of asynchronous embedded HTTP/2 server with a classic I/O API compatibility + * bridge that enables the use of standard {@link java.io.InputStream} / {@link java.io.OutputStream} + * based data consumers / producers. + *

> + * Execution of individual message exchanges is delegated to an {@link java.util.concurrent.Executor} + * backed by a pool of threads. + */ +@Experimental +public class ClassicH2ServerExample { + + public static void main(final String[] args) throws Exception { + int port = 8080; + if (args.length >= 1) { + port = Integer.parseInt(args[0]); + } + + final IOReactorConfig config = IOReactorConfig.custom() + .setSoTimeout(15, TimeUnit.SECONDS) + .setTcpNoDelay(true) + .build(); + + final ExecutorService executorService = Executors.newFixedThreadPool( + 25, + new DefaultThreadFactory("worker-pool", true)); + + final HttpRequestHandler requestHandler = (request, response, context) -> { + try { + final HttpEntity requestEntity = request.getEntity(); + if (requestEntity != null) { + EntityUtils.consume(requestEntity); + } + final Map queryParams = new URIBuilder(request.getUri()).getQueryParams().stream() + .collect(Collectors.toMap( + NameValuePair::getName, + NameValuePair::getValue, + (s, s2) -> s)); + final int n = Integer.parseInt(queryParams.getOrDefault("n", "10")); + final String p = queryParams.getOrDefault("pattern", "huh?"); + final HttpEntity responseEntity = new EntityTemplate( + ContentType.TEXT_PLAIN.withCharset(StandardCharsets.UTF_8), + outputStream -> { + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) { + for (int i = 0; i < n; i++) { + writer.write(p); + writer.write("\n"); + } + } + }); + response.setEntity(responseEntity); + } catch (final URISyntaxException ex) { + throw new ProtocolException("Invalid request URI", ex); + } catch (final NumberFormatException ex) { + throw new ProtocolException("Invalid query parameter", ex); + } + }; + + final RequestRouter requestRouter = RequestRouter.builder() + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .addRoute(RequestRouter.LOCAL_AUTHORITY, "*", requestHandler) + .build(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setIOReactorConfig(config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setStreamListener(new H2StreamListener() { + + @Override + public void onHeaderInput(final HttpConnection connection, final int streamId, final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i)); + } + } + + @Override + public void onHeaderOutput(final HttpConnection connection, final int streamId, final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i)); + } + } + + @Override + public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) { + } + + @Override + public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) { + } + + @Override + public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + } + + @Override + public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + } + + }) + .setRequestRouter((request, context) -> { + final HttpRequestHandler handler = requestRouter.resolve(request, context); + return () -> new ClassicToAsyncServerExchangeHandler( + executorService, + handler, + e -> e.printStackTrace(System.out)); + }) + .setExceptionCallback(e -> e.printStackTrace(System.out)) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP server shutting down"); + server.close(CloseMode.GRACEFUL); + executorService.shutdownNow(); + })); + + server.start(); + final Future future = server.listen(new InetSocketAddress(port), URIScheme.HTTP); + final ListenerEndpoint listenerEndpoint = future.get(); + System.out.print("Listening on " + listenerEndpoint.getAddress()); + server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE)); + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ConscriptRequestExecutionExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ConscriptRequestExecutionExample.java index e33f4e248a..8edea1d6be 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ConscriptRequestExecutionExample.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ConscriptRequestExecutionExample.java @@ -114,12 +114,13 @@ public void onOutputFlowControl(final HttpConnection connection, final int strea requester.start(); final HttpHost target = new HttpHost("https", "nghttp2.org", 443); + final Future future = requester.connect(target, Timeout.ofDays(5)); + final AsyncClientEndpoint clientEndpoint = future.get(); + final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"}; final CountDownLatch latch = new CountDownLatch(requestUris.length); for (final String requestUri: requestUris) { - final Future future = requester.connect(target, Timeout.ofDays(5)); - final AsyncClientEndpoint clientEndpoint = future.get(); clientEndpoint.execute( AsyncRequestBuilder.get() .setHttpHost(target) diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2RequestExecutionExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2RequestExecutionExample.java index 3653baeebe..4df72137f8 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2RequestExecutionExample.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2RequestExecutionExample.java @@ -105,12 +105,13 @@ public void onOutputFlowControl(final HttpConnection connection, final int strea requester.start(); final HttpHost target = new HttpHost("nghttp2.org"); + final Future future = requester.connect(target, Timeout.ofSeconds(5)); + final AsyncClientEndpoint clientEndpoint = future.get(); + final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"}; final CountDownLatch latch = new CountDownLatch(requestUris.length); for (final String requestUri: requestUris) { - final Future future = requester.connect(target, Timeout.ofSeconds(5)); - final AsyncClientEndpoint clientEndpoint = future.get(); clientEndpoint.execute( AsyncRequestBuilder.get() .setHttpHost(target) diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingHttp1StreamListener.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingHttp1StreamListener.java index 72796c2c77..48195baad6 100644 --- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingHttp1StreamListener.java +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingHttp1StreamListener.java @@ -34,11 +34,10 @@ import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.impl.Http1StreamListener; -import org.apache.hc.core5.http.message.RequestLine; import org.apache.hc.core5.http.message.StatusLine; import org.apache.hc.core5.testing.classic.LoggingSupport; -import org.slf4j.LoggerFactory; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LoggingHttp1StreamListener implements Http1StreamListener { @@ -61,7 +60,7 @@ private LoggingHttp1StreamListener(final Type type) { public void onRequestHead(final HttpConnection connection, final HttpRequest request) { if (headerLog.isDebugEnabled()) { final String idRequestDirection = LoggingSupport.getId(connection) + requestDirection; - headerLog.debug("{}{}", idRequestDirection, new RequestLine(request)); + headerLog.debug("{}{} {}", idRequestDirection, request.getMethod(), request.getRequestUri()); for (final Iterator

it = request.headerIterator(); it.hasNext(); ) { headerLog.debug("{}{}", idRequestDirection, it.next()); } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/Result.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/Result.java similarity index 98% rename from httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/Result.java rename to httpcore5-testing/src/test/java/org/apache/hc/core5/testing/Result.java index 94c050c4f1..817bdc5572 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/Result.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/Result.java @@ -24,7 +24,7 @@ * . * */ -package org.apache.hc.core5.testing.compatibility; +package org.apache.hc.core5.testing; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpResponse; diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/classic/ClassicHttpCompatTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/classic/ClassicHttpCompatTest.java index 3b8cd1bbc7..87651a688e 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/classic/ClassicHttpCompatTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/classic/ClassicHttpCompatTest.java @@ -46,8 +46,8 @@ import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.testing.Result; import org.apache.hc.core5.testing.compatibility.ContainerImages; -import org.apache.hc.core5.testing.compatibility.Result; import org.apache.hc.core5.testing.compatibility.TLSTestContexts; import org.apache.hc.core5.testing.extension.classic.HttpRequesterResource; import org.apache.hc.core5.util.Timeout; diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/AsyncHttp2CompatTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/AsyncHttp2CompatTest.java index e1f2fa5777..64c1d616c8 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/AsyncHttp2CompatTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/AsyncHttp2CompatTest.java @@ -53,7 +53,7 @@ import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; import org.apache.hc.core5.reactor.IOReactorConfig; -import org.apache.hc.core5.testing.compatibility.Result; +import org.apache.hc.core5.testing.Result; import org.apache.hc.core5.testing.compatibility.TLSTestContexts; import org.apache.hc.core5.testing.extension.nio.H2AsyncRequesterResource; import org.apache.hc.core5.util.Timeout; diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/AsyncHttpCompatTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/AsyncHttpCompatTest.java index d87c5f12bd..e8509f69ff 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/AsyncHttpCompatTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/AsyncHttpCompatTest.java @@ -46,8 +46,8 @@ import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; import org.apache.hc.core5.http.nio.support.BasicRequestProducer; import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.testing.Result; import org.apache.hc.core5.testing.compatibility.ContainerImages; -import org.apache.hc.core5.testing.compatibility.Result; import org.apache.hc.core5.util.Timeout; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/HttpBinAsyncCompatTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/HttpBinAsyncCompatTest.java index 1e699f5659..cf5aca906a 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/HttpBinAsyncCompatTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/nio/HttpBinAsyncCompatTest.java @@ -59,7 +59,7 @@ import org.apache.hc.core5.http2.impl.nio.bootstrap.H2AsyncRequester; import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; import org.apache.hc.core5.reactor.IOReactorConfig; -import org.apache.hc.core5.testing.compatibility.Result; +import org.apache.hc.core5.testing.Result; import org.apache.hc.core5.testing.extension.nio.H2AsyncRequesterResource; import org.apache.hc.core5.util.Timeout; import org.junit.jupiter.api.Assertions; diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/classic/ExecutorResource.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/classic/ExecutorResource.java new file mode 100644 index 0000000000..30330014d4 --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/classic/ExecutorResource.java @@ -0,0 +1,57 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.testing.extension.classic; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class ExecutorResource implements AfterEachCallback { + + private final ExecutorService executorService; + + public ExecutorResource(final ExecutorService executorService) { + this.executorService = executorService; + } + + public ExecutorResource(final int nThreads) { + this.executorService = Executors.newFixedThreadPool(nThreads); + } + + public ExecutorService getExecutorService() { + return executorService; + } + + @Override + public void afterEach(final ExtensionContext extensionContext) throws Exception { + executorService.shutdownNow(); + } + +} diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/ClassicToAsyncHttp1TransportTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/ClassicToAsyncHttp1TransportTest.java new file mode 100644 index 0000000000..3842b1e72a --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/ClassicToAsyncHttp1TransportTest.java @@ -0,0 +1,101 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.testing.nio; + +import java.net.InetSocketAddress; +import java.util.Random; +import java.util.concurrent.Future; + +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncResponseConsumer; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +abstract class ClassicToAsyncHttp1TransportTest extends ClassicToAsyncTransportTest { + + public ClassicToAsyncHttp1TransportTest(final URIScheme scheme, final HttpVersion version) { + super(scheme, version); + } + + @ValueSource(ints = {0, 2048, 10240}) + @ParameterizedTest(name = "{displayName}; content length: {0}") + void test_request_handling_no_keep_alive(final int contentSize) throws Exception { + final HttpAsyncServer server = serverResource.start(); + registerHandler("/echo", () -> new EchoHandler(1024)); + + final Future future = server.listen(new InetSocketAddress(0), scheme); + final ListenerEndpoint listener = future.get(); + final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); + final HttpAsyncRequester requester = clientResource.start(); + + final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort()); + + final int n = 10; + + for (int i = 0; i < n; i++) { + final byte[] temp = new byte[contentSize]; + new Random(System.currentTimeMillis()).nextBytes(temp); + + final ClassicHttpRequest request = ClassicRequestBuilder.post() + .setHttpHost(target) + .setPath("/echo/") + .addHeader(HttpHeaders.CONNECTION, "Close") + .setEntity(new ByteArrayEntity(temp, ContentType.DEFAULT_BINARY)) + .build(); + + final ClassicToAsyncRequestProducer requestProducer = new ClassicToAsyncRequestProducer(request, TIMEOUT); + final ClassicToAsyncResponseConsumer responseConsumer = new ClassicToAsyncResponseConsumer(TIMEOUT); + + requester.execute(requestProducer, responseConsumer, TIMEOUT, null); + + requestProducer.blockWaiting().execute(); + + try (ClassicHttpResponse response = responseConsumer.blockWaiting()) { + Assertions.assertEquals(200, response.getCode()); + final byte[] bytes = EntityUtils.toByteArray(response.getEntity()); + Assertions.assertNotNull(bytes); + Assertions.assertArrayEquals(temp, bytes); + } + } + } + +} diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/ClassicToAsyncIntegrationTests.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/ClassicToAsyncIntegrationTests.java new file mode 100644 index 0000000000..44a89ce7fa --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/ClassicToAsyncIntegrationTests.java @@ -0,0 +1,77 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.testing.nio; + +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.URIScheme; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; + +class ClassicToAsyncIntegrationTests { + + @Nested + @DisplayName("Classic over async transport (HTTP/1.1)") + class ClassicOverAsyncTransport extends ClassicToAsyncHttp1TransportTest { + + public ClassicOverAsyncTransport() { + super(URIScheme.HTTP, HttpVersion.HTTP_1_1); + } + + } + + @Nested + @DisplayName("Classic over async transport (HTTP/1.1, TLS)") + class ClassicOverAsyncTransportTls extends ClassicToAsyncHttp1TransportTest { + + public ClassicOverAsyncTransportTls() { + super(URIScheme.HTTPS, HttpVersion.HTTP_1_1); + } + + } + + @Nested + @DisplayName("Classic over async transport (HTTP/2)") + class ClassicOverAsyncTransportH2 extends ClassicToAsyncTransportTest { + + public ClassicOverAsyncTransportH2() { + super(URIScheme.HTTP, HttpVersion.HTTP_2); + } + + } + + @Nested + @DisplayName("Classic over async transport (HTTP/2, TLS)") + class ClassicOverAsyncTransportH2Tls extends ClassicToAsyncTransportTest { + + public ClassicOverAsyncTransportH2Tls() { + super(URIScheme.HTTPS, HttpVersion.HTTP_2); + } + + } + +} diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/ClassicToAsyncTransportTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/ClassicToAsyncTransportTest.java new file mode 100644 index 0000000000..e9c82c0c8a --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/ClassicToAsyncTransportTest.java @@ -0,0 +1,391 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.testing.nio; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Queue; +import java.util.StringTokenizer; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.RequestNotExecutedException; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.impl.routing.RequestRouter; +import org.apache.hc.core5.http.io.HttpRequestHandler; +import org.apache.hc.core5.http.io.entity.EntityTemplate; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.apache.hc.core5.http.message.BasicHttpRequest; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncResponseConsumer; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncServerExchangeHandler; +import org.apache.hc.core5.http.support.BasicRequestBuilder; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; +import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.testing.Result; +import org.apache.hc.core5.testing.SSLTestContexts; +import org.apache.hc.core5.testing.extension.classic.ExecutorResource; +import org.apache.hc.core5.testing.extension.nio.H2AsyncRequesterResource; +import org.apache.hc.core5.testing.extension.nio.H2AsyncServerResource; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +abstract class ClassicToAsyncTransportTest { + + static final Timeout TIMEOUT = Timeout.ofMinutes(1); + + final URIScheme scheme; + @RegisterExtension + final H2AsyncServerResource serverResource; + @RegisterExtension + final H2AsyncRequesterResource clientResource; + @RegisterExtension + final ExecutorResource executorResource; + + public ClassicToAsyncTransportTest(final URIScheme scheme, final HttpVersion version) { + this.scheme = scheme; + this.serverResource = new H2AsyncServerResource(); + this.serverResource.configure(bootstrap -> bootstrap + .setVersionPolicy(version.lessEquals(HttpVersion.HTTP_1_1) ? HttpVersionPolicy.FORCE_HTTP_1 : HttpVersionPolicy.FORCE_HTTP_2) + .setTlsStrategy(new H2ServerTlsStrategy(SSLTestContexts.createServerSSLContext())) + .setIOReactorConfig( + IOReactorConfig.custom() + .setSoTimeout(TIMEOUT) + .build()) + .setRequestRouter(RequestRouter.>builder() + .addRoute(RequestRouter.LOCAL_AUTHORITY, "*", () -> new EchoHandler(2048)) + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .build()) + ); + this.clientResource = new H2AsyncRequesterResource(); + this.clientResource.configure(bootstrap -> bootstrap + .setVersionPolicy(version.lessEquals(HttpVersion.HTTP_1_1) ? HttpVersionPolicy.FORCE_HTTP_1 : HttpVersionPolicy.FORCE_HTTP_2) + .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext())) + .setIOReactorConfig(IOReactorConfig.custom() + .setSoTimeout(TIMEOUT) + .build()) + ); + this.executorResource = new ExecutorResource(5); + } + + void registerHandler( + final String pathPattern, + final Supplier requestHandlerSupplier) { + serverResource.configure(bootstrap -> bootstrap + .setRequestRouter(RequestRouter.>builder() + .addRoute(RequestRouter.LOCAL_AUTHORITY, pathPattern, requestHandlerSupplier) + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .build()) + ); + } + + void registerHandler(final String pathPattern, final HttpRequestHandler requestHandler) { + registerHandler(pathPattern, () -> new ClassicToAsyncServerExchangeHandler( + executorResource.getExecutorService(), + requestHandler, + LoggingExceptionCallback.INSTANCE)); + } + + @Test + void test_request_execution() throws Exception { + final HttpAsyncServer server = serverResource.start(); + registerHandler("/echo", () -> new EchoHandler(1024)); + + final Future future = server.listen(new InetSocketAddress(0), scheme); + final ListenerEndpoint listener = future.get(); + final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); + final HttpAsyncRequester requester = clientResource.start(); + + final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort()); + + final int n = 10; + + for (int i = 0; i < n; i++) { + final ClassicHttpRequest request1 = ClassicRequestBuilder.post() + .setHttpHost(target) + .setPath("/echo") + .setEntity(new EntityTemplate( + -1, + ContentType.TEXT_PLAIN.withCharset(StandardCharsets.UTF_8), + null, + outputStream -> { + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) { + for (int ii = 0; ii < 500; ii++) { + writer.write("0123456789abcdef"); + writer.newLine(); + } + } + })) + .build(); + final ClassicToAsyncRequestProducer requestProducer = new ClassicToAsyncRequestProducer(request1, 16, TIMEOUT); + final ClassicToAsyncResponseConsumer responseConsumer = new ClassicToAsyncResponseConsumer(16, TIMEOUT); + + requester.execute(requestProducer, responseConsumer, TIMEOUT, null); + + requestProducer.blockWaiting().execute(); + + try (ClassicHttpResponse response = responseConsumer.blockWaiting()) { + final HttpEntity entity = response.getEntity(); + final ContentType contentType = ContentType.parse(entity.getContentType()); + final Charset charset = ContentType.getCharset(contentType, StandardCharsets.UTF_8); + + try (final InputStream inputStream = entity.getContent()) { + final StringBuilder buffer = new StringBuilder(); + final byte[] tmp = new byte[16]; + int l; + while ((l = inputStream.read(tmp)) != -1) { + buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l))); + } + final StringTokenizer t1 = new StringTokenizer(buffer.toString(), "\r\n"); + while (t1.hasMoreTokens()) { + Assertions.assertEquals("0123456789abcdef", t1.nextToken()); + } + } + } + } + } + + @ParameterizedTest(name = "method {0}") + @ValueSource(strings = {"GET", "POST", "HEAD"}) + void test_request_handling(final String method) throws Exception { + registerHandler("/hello", (request, response, context) -> { + final HttpEntity requestEntity = request.getEntity(); + if (requestEntity != null) { + EntityUtils.consume(requestEntity); + } + final ContentType contentType = ContentType.TEXT_PLAIN.withCharset(StandardCharsets.UTF_8); + final Charset charset = contentType.getCharset(); + final HttpEntity responseEntity = new EntityTemplate( + contentType, + outputStream -> { + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) { + for (int i = 0; i < 500; i++) { + writer.write("0123456789abcdef\r\n"); + } + } + }); + response.setEntity(responseEntity); + }); + + final HttpAsyncServer server = serverResource.start(); + final Future future = server.listen(new InetSocketAddress(0), scheme); + final ListenerEndpoint listener = future.get(); + final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); + final HttpAsyncRequester requester = clientResource.start(); + + final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort()); + + final int n = 10; + + final CountDownLatch countDownLatch = new CountDownLatch(n); + final Queue> resultQueue = new ConcurrentLinkedQueue<>(); + + for (int i = 0; i < n; i++) { + final BasicHttpRequest request1 = BasicRequestBuilder.create(method) + .setHttpHost(target) + .setPath("/hello") + .build(); + final AsyncEntityProducer entityProducer = Method.POST.isSame(method) ? + new MultiLineEntityProducer("xxxxxxxxxxxx", 250) : null; + + requester.execute( + new BasicRequestProducer(request1, entityProducer), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + TIMEOUT, + new FutureCallback>() { + + @Override + public void completed(final Message responseMessage) { + resultQueue.add(new Result<>( + request1, + responseMessage.getHead(), + responseMessage.getBody())); + countDownLatch.countDown(); + } + + @Override + public void failed(final Exception ex) { + resultQueue.add(new Result<>(request1, ex)); + countDownLatch.countDown(); + } + + @Override + public void cancelled() { + failed(new RequestNotExecutedException()); + } + + }); + } + + Assertions.assertTrue(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), "Request executions have not completed in time"); + for (final Result result : resultQueue) { + if (result.isOK()) { + Assertions.assertNotNull(result.response); + Assertions.assertEquals(HttpStatus.SC_OK, result.response.getCode(), "Response message returned non 200 status"); + if (Method.HEAD.isSame(method)) { + Assertions.assertNull(result.content); + } else { + Assertions.assertNotNull(result.content); + final StringTokenizer t1 = new StringTokenizer(result.content, "\r\n"); + while (t1.hasMoreTokens()) { + Assertions.assertEquals("0123456789abcdef", t1.nextToken()); + } + } + } else { + Assertions.fail(result.exception); + } + } + } + + @Test + void test_request_handling_full_streaming() throws Exception { + registerHandler("/echo", (request, response, context) -> { + final HttpEntity requestEntity = request.getEntity(); + final ContentType contentType = requestEntity != null ? ContentType.parseLenient(requestEntity.getContentType()) : ContentType.TEXT_PLAIN; + final Charset charset = contentType.getCharset(StandardCharsets.UTF_8); + final HttpEntity responseEntity; + if (requestEntity != null) { + responseEntity = new EntityTemplate( + contentType, + outputStream -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(requestEntity.getContent())); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) { + String line; + while ((line = reader.readLine()) != null) { + writer.write(line); + writer.newLine(); + } + } + }); + } else { + responseEntity = null; + } + response.setEntity(responseEntity); + }); + + final HttpAsyncServer server = serverResource.start(); + final Future future = server.listen(new InetSocketAddress(0), scheme); + final ListenerEndpoint listener = future.get(); + final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); + final HttpAsyncRequester requester = clientResource.start(); + + final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort()); + + final int n = 10; + + final CountDownLatch countDownLatch = new CountDownLatch(n); + final Queue> resultQueue = new ConcurrentLinkedQueue<>(); + + for (int i = 0; i < n; i++) { + final BasicHttpRequest request1 = BasicRequestBuilder.post() + .setHttpHost(target) + .setPath("/echo") + .build(); + final AsyncEntityProducer entityProducer = new MultiLineEntityProducer("0123456789abcdef", 500); + + requester.execute( + new BasicRequestProducer(request1, entityProducer), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + TIMEOUT, + new FutureCallback>() { + + @Override + public void completed(final Message responseMessage) { + resultQueue.add(new Result<>( + request1, + responseMessage.getHead(), + responseMessage.getBody())); + countDownLatch.countDown(); + } + + @Override + public void failed(final Exception ex) { + resultQueue.add(new Result<>(request1, ex)); + countDownLatch.countDown(); + } + + @Override + public void cancelled() { + failed(new RequestNotExecutedException()); + } + + }); + } + + Assertions.assertTrue(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), "Request executions have not completed in time"); + for (final Result result : resultQueue) { + if (result.isOK()) { + Assertions.assertNotNull(result.response); + Assertions.assertEquals(HttpStatus.SC_OK, result.response.getCode(), "Response message returned non 200 status"); + Assertions.assertNotNull(result.content); + final StringTokenizer t1 = new StringTokenizer(result.content, "\r\n"); + while (t1.hasMoreTokens()) { + Assertions.assertEquals("0123456789abcdef", t1.nextToken()); + } + } else { + Assertions.fail(result.exception); + } + } + } + +} diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/HttpIntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/HttpIntegrationTest.java index 5fd703cd70..e50eec7322 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/HttpIntegrationTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/HttpIntegrationTest.java @@ -27,13 +27,10 @@ package org.apache.hc.core5.testing.nio; -import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.InterruptedIOException; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -46,13 +43,17 @@ import java.util.Queue; import java.util.Random; import java.util.StringTokenizer; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.EntityDetails; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HeaderElements; +import org.apache.hc.core5.http.HttpEntity; import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpHost; @@ -63,6 +64,11 @@ import org.apache.hc.core5.http.Method; import org.apache.hc.core5.http.ProtocolException; import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.routing.RequestRouter; +import org.apache.hc.core5.http.io.HttpRequestHandler; +import org.apache.hc.core5.http.io.entity.EntityTemplate; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; import org.apache.hc.core5.http.message.BasicHttpRequest; import org.apache.hc.core5.http.message.BasicHttpResponse; import org.apache.hc.core5.http.nio.AsyncEntityProducer; @@ -88,9 +94,9 @@ import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; import org.apache.hc.core5.http.nio.support.BasicResponseProducer; import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler; -import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer; -import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer; -import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncResponseConsumer; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncServerExchangeHandler; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.support.BasicRequestBuilder; import org.apache.hc.core5.testing.extension.ExecutorResource; @@ -524,47 +530,37 @@ void testSlowResponseConsumer() throws Exception { final Future connectFuture = client.connect(target, TIMEOUT); final ClientSessionEndpoint streamEndpoint = connectFuture.get(); - final BasicHttpRequest request = BasicRequestBuilder.get() + final ClassicHttpRequest request = ClassicRequestBuilder.get() .setHttpHost(target) .setPath("/") .build(); + final ClassicToAsyncResponseConsumer responseConsumer = new ClassicToAsyncResponseConsumer(16, TIMEOUT); - final Future> future1 = streamEndpoint.execute( + streamEndpoint.execute( new BasicRequestProducer(request, null), - new BasicResponseConsumer<>(new AbstractClassicEntityConsumer(16, executorResource.getExecutorService()) { - - @Override - protected String consumeData( - final ContentType contentType, final InputStream inputStream) throws IOException { - final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII); - - final StringBuilder buffer = new StringBuilder(); - try { - final byte[] tmp = new byte[16]; - int l; - while ((l = inputStream.read(tmp)) != -1) { - buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l))); - Thread.sleep(500); - } - } catch (final InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new InterruptedIOException(ex.getMessage()); - } - return buffer.toString(); - } - }), + responseConsumer, null); - final Message result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit()); - Assertions.assertNotNull(result1); - final HttpResponse response1 = result1.getHead(); - Assertions.assertNotNull(response1); - Assertions.assertEquals(200, response1.getCode()); - final String s1 = result1.getBody(); - Assertions.assertNotNull(s1); - final StringTokenizer t1 = new StringTokenizer(s1, "\r\n"); - while (t1.hasMoreTokens()) { - Assertions.assertEquals("0123456789abcd", t1.nextToken()); + final Random random = new Random(); + + try (ClassicHttpResponse response = responseConsumer.blockWaiting()) { + final HttpEntity entity = response.getEntity(); + final ContentType contentType = ContentType.parse(entity.getContentType()); + final Charset charset = ContentType.getCharset(contentType, StandardCharsets.UTF_8); + + try (final InputStream inputStream = entity.getContent()) { + final StringBuilder buffer = new StringBuilder(); + final byte[] tmp = new byte[16]; + int l; + while ((l = inputStream.read(tmp)) != -1) { + buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l))); + Thread.sleep(100 + random.nextInt(400)); + } + final StringTokenizer t1 = new StringTokenizer(buffer.toString(), "\r\n"); + while (t1.hasMoreTokens()) { + Assertions.assertEquals("0123456789abcd", t1.nextToken()); + } + } } } @@ -582,33 +578,37 @@ void testSlowRequestProducer() throws Exception { final Future connectFuture = client.connect(target, TIMEOUT); final ClientSessionEndpoint streamEndpoint = connectFuture.get(); - final BasicHttpRequest request1 = BasicRequestBuilder.post() + final Random random = new Random(); + + final ClassicHttpRequest request1 = ClassicRequestBuilder.post() .setHttpHost(target) .setPath("/echo") - .build(); - - final Future> future1 = streamEndpoint.execute( - new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, executorResource.getExecutorService()) { - - @Override - protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException { - final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII); - try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) { - for (int i = 0; i < 500; i++) { - if (i % 100 == 0) { - writer.flush(); - Thread.sleep(500); + .setEntity(new EntityTemplate( + -1, + ContentType.TEXT_PLAIN.withCharset(StandardCharsets.UTF_8), + null, + outputStream -> { + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) { + for (int i = 0; i < 500; i++) { + if (i % 100 == 0) { + writer.flush(); + Thread.sleep(100 + random.nextInt(400)); + } + writer.write("0123456789abcdef\r\n"); } - writer.write("0123456789abcdef\r\n"); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(ex.getMessage()); } - } catch (final InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new InterruptedIOException(ex.getMessage()); - } - } + })) + .build(); + final ClassicToAsyncRequestProducer requestProducer = new ClassicToAsyncRequestProducer(request1, 16, TIMEOUT); - }), + final Future> future1 = streamEndpoint.execute( + requestProducer, new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null); + requestProducer.blockWaiting().execute(); + final Message result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit()); Assertions.assertNotNull(result1); final HttpResponse response1 = result1.getHead(); @@ -627,53 +627,42 @@ void testSlowResponseProducer() throws Exception { final HttpTestServer server = server(); final HttpTestClient client = client(); - server.register("*", () -> new AbstractClassicServerExchangeHandler(2048, executorResource.getExecutorService()) { - - @Override - protected void handle( - final HttpRequest request, - final InputStream requestStream, - final HttpResponse response, - final OutputStream responseStream, - final HttpContext context) throws IOException, HttpException { + final Random random = new Random(); - if (!"/hello".equals(request.getPath())) { - response.setCode(HttpStatus.SC_NOT_FOUND); - return; - } - if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) { - response.setCode(HttpStatus.SC_NOT_IMPLEMENTED); - return; - } - if (requestStream == null) { - return; - } - final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE); - final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null; - final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII); - response.setCode(HttpStatus.SC_OK); - response.setHeader(h1); - try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset)); - final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) { - try { - String l; - int count = 0; - while ((l = reader.readLine()) != null) { - writer.write(l); - writer.write("\r\n"); - count++; - if (count % 500 == 0) { - Thread.sleep(500); + final HttpRequestHandler requestHandler = (request, response, context) -> { + final HttpEntity requestEntity = request.getEntity(); + if (requestEntity != null) { + EntityUtils.consume(requestEntity); + } + final HttpEntity responseEntity = new EntityTemplate( + ContentType.TEXT_PLAIN.withCharset(StandardCharsets.UTF_8), + outputStream -> { + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) { + for (int i = 0; i < 500; i++) { + if (i % 100 == 0) { + writer.flush(); + Thread.sleep(100 + random.nextInt(400)); + } + writer.write("0123456789abcdef\r\n"); } + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(ex.getMessage()); } - writer.flush(); - } catch (final InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new InterruptedIOException(ex.getMessage()); - } - } - } - }); + }); + response.setEntity(responseEntity); + }; + + final RequestRouter requestRouter = RequestRouter.builder() + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .addRoute(RequestRouter.LOCAL_AUTHORITY, "/hello", requestHandler) + .build(); + + server.register("*", () -> new ClassicToAsyncServerExchangeHandler( + Executors.newSingleThreadExecutor(), + requestRouter, + LoggingExceptionCallback.INSTANCE)); + final InetSocketAddress serverEndpoint = server.start(); final HttpHost target = target(serverEndpoint); @@ -681,13 +670,12 @@ protected void handle( final Future connectFuture = client.connect(target, TIMEOUT); final ClientSessionEndpoint streamEndpoint = connectFuture.get(); - final BasicHttpRequest request1 = BasicRequestBuilder.post() + final BasicHttpRequest request1 = BasicRequestBuilder.get() .setHttpHost(target) .setPath("/hello") .build(); - final Future> future1 = streamEndpoint.execute( - new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)), + new BasicRequestProducer(request1, null), new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null); final Message result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit()); Assertions.assertNotNull(result1); @@ -698,7 +686,7 @@ protected void handle( Assertions.assertNotNull(s1); final StringTokenizer t1 = new StringTokenizer(s1, "\r\n"); while (t1.hasMoreTokens()) { - Assertions.assertEquals("0123456789abcd", t1.nextToken()); + Assertions.assertEquals("0123456789abcdef", t1.nextToken()); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/BHttpConnectionBase.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/BHttpConnectionBase.java index df7c00bfd7..a300580379 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/BHttpConnectionBase.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/BHttpConnectionBase.java @@ -48,13 +48,13 @@ import org.apache.hc.core5.http.EndpointDetails; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpEntity; -import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpMessage; import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.config.Http1Config; import org.apache.hc.core5.http.impl.BasicEndpointDetails; import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics; import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics; +import org.apache.hc.core5.http.impl.io.support.IncomingHttpEntity; import org.apache.hc.core5.http.io.BHttpConnection; import org.apache.hc.core5.http.io.SessionInputBuffer; import org.apache.hc.core5.http.io.SessionOutputBuffer; @@ -186,9 +186,8 @@ HttpEntity createIncomingEntity( final long len) { return new IncomingHttpEntity( createContentInputStream(len, inBuffer, inputStream), - len >= 0 ? len : -1, len == ContentLengthStrategy.CHUNKED, - message.getFirstHeader(HttpHeaders.CONTENT_TYPE), - message.getFirstHeader(HttpHeaders.CONTENT_ENCODING)); + len, + message); } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/IncomingHttpEntity.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/support/IncomingHttpEntity.java similarity index 80% rename from httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/IncomingHttpEntity.java rename to httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/support/IncomingHttpEntity.java index 91edff19f8..06f7baad85 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/IncomingHttpEntity.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/support/IncomingHttpEntity.java @@ -25,7 +25,7 @@ * */ -package org.apache.hc.core5.http.impl.io; +package org.apache.hc.core5.http.impl.io.support; import java.io.IOException; import java.io.InputStream; @@ -34,26 +34,27 @@ import java.util.List; import java.util.Set; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.ContentLengthStrategy; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpMessage; import org.apache.hc.core5.http.io.entity.AbstractHttpEntity; import org.apache.hc.core5.io.Closer; -class IncomingHttpEntity implements HttpEntity { +@Internal +public class IncomingHttpEntity implements HttpEntity { private final InputStream content; private final long len; - private final boolean chunked; - private final Header contentType; - private final Header contentEncoding; + private final HttpMessage message; - IncomingHttpEntity(final InputStream content, final long len, final boolean chunked, final Header contentType, final Header contentEncoding) { + public IncomingHttpEntity(final InputStream content, final long len, final HttpMessage message) { this.content = content; this.len = len; - this.chunked = chunked; - this.contentType = contentType; - this.contentEncoding = contentEncoding; + this.message = message; } @Override @@ -63,22 +64,24 @@ public boolean isRepeatable() { @Override public boolean isChunked() { - return chunked; + return len == ContentLengthStrategy.CHUNKED; } @Override public long getContentLength() { - return len; + return len >= 0 ? len : -1; } @Override public String getContentType() { - return contentType != null ? contentType.getValue() : null; + final Header h = message.getFirstHeader(HttpHeaders.CONTENT_TYPE); + return h != null ? h.getValue() : null; } @Override public String getContentEncoding() { - return contentEncoding != null ? contentEncoding.getValue() : null; + final Header h = message.getFirstHeader(HttpHeaders.CONTENT_ENCODING); + return h != null ? h.getValue() : null; } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java index 30f0f577cf..83eaa2f1cc 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java @@ -116,6 +116,11 @@ public void close() { shutdownSession(CloseMode.IMMEDIATE); } + @Override + public void close(final CloseMode closeMode) { + shutdownSession(closeMode); + } + @Override public void submit( final HttpRequest request, diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java index 3d4bd88318..6f4c9b3ef4 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java @@ -31,12 +31,15 @@ import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpMessage; import org.apache.hc.core5.http.nio.ContentEncoder; +import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.util.Timeout; interface Http1StreamChannel extends ContentEncoder { void close(); + void close(CloseMode closeMode); + void activate() throws HttpException, IOException; void submit(OutgoingMessage messageHead, boolean endStream, final FlushMode flushMode) throws HttpException, IOException; diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java index 626771df5d..e4b1b6d6e3 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java @@ -127,6 +127,11 @@ public void close() { ServerHttp1StreamDuplexer.this.close(CloseMode.GRACEFUL); } + @Override + public void close(final CloseMode closeMode) { + ServerHttp1StreamDuplexer.this.close(closeMode); + } + @Override public void submit( final HttpResponse response, @@ -507,6 +512,10 @@ public void close() { channel.close(); } + public void close(final CloseMode closeMode) { + channel.close(closeMode); + } + @Override public void submit( final HttpResponse response, diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java index 5e2761873f..f0e238e7d6 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java @@ -64,6 +64,7 @@ import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpCoreContext; import org.apache.hc.core5.http.protocol.HttpProcessor; +import org.apache.hc.core5.io.CloseMode; class ServerHttp1StreamHandler implements ResourceHolder { @@ -141,6 +142,11 @@ public void pushPromise( commitPromise(); } + @Override + public void terminateExchange() { + terminate(); + } + @Override public String toString() { return super.toString() + " " + ServerHttp1StreamHandler.this; @@ -222,6 +228,10 @@ private void commitPromise() throws HttpException { throw new HttpException("HTTP/1.1 does not support server push"); } + private void terminate() { + outputChannel.close(CloseMode.IMMEDIATE); + } + void activateChannel() throws IOException, HttpException { outputChannel.activate(); } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/EntityTemplate.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/EntityTemplate.java index 448f3d8379..34fe98f276 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/EntityTemplate.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/EntityTemplate.java @@ -77,6 +77,23 @@ public EntityTemplate( this.callback = Args.notNull(callback, "I/O callback"); } + /** + * @since 5.4 + */ + public EntityTemplate( + final long contentLength, + final ContentType contentType, + final IOCallback callback) { + this(contentLength, contentType, null, callback); + } + + /** + * @since 5.4 + */ + public EntityTemplate(final ContentType contentType, final IOCallback callback) { + this(-1, contentType, null, callback); + } + @Override public long getContentLength() { return contentLength; diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/message/MessageSupport.java b/httpcore5/src/main/java/org/apache/hc/core5/http/message/MessageSupport.java index 6ddcb4a4ee..43a360dde8 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/message/MessageSupport.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/message/MessageSupport.java @@ -479,6 +479,16 @@ public static void addTrailerHeader(final HttpMessage message, final EntityDetai } } + /** + * @since 5.4 + */ + public static boolean canResponseHaveBody(final HttpResponse response) { + final int status = response.getCode(); + return status >= HttpStatus.SC_SUCCESS + && status != HttpStatus.SC_NO_CONTENT + && status != HttpStatus.SC_NOT_MODIFIED; + } + /** * @since 5.0 */ @@ -490,9 +500,7 @@ public static boolean canResponseHaveBody(final String method, final HttpRespons if (Method.CONNECT.isSame(method) && status == HttpStatus.SC_OK) { return false; } - return status >= HttpStatus.SC_SUCCESS - && status != HttpStatus.SC_NO_CONTENT - && status != HttpStatus.SC_NOT_MODIFIED; + return canResponseHaveBody(response); } private final static Set HOP_BY_HOP; diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ResponseChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ResponseChannel.java index 155af155ba..8a477634c2 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ResponseChannel.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/ResponseChannel.java @@ -30,6 +30,7 @@ import java.io.IOException; import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.http.EntityDetails; import org.apache.hc.core5.http.HttpException; @@ -82,4 +83,12 @@ public interface ResponseChannel { */ void pushPromise(HttpRequest promise, AsyncPushProducer responseProducer, HttpContext context) throws HttpException, IOException; + /** + * Terminates message exchange due to an internal error generating + * a response or its content. + */ + @Internal + default void terminateExchange() { + } + } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityConsumer.java index b8122dbd97..2952da1adf 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityConsumer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityConsumer.java @@ -52,7 +52,10 @@ * @param entity representation. * * @since 5.0 + * + * @deprecated Use {@link ClassicToAsyncResponseConsumer}. */ +@Deprecated public abstract class AbstractClassicEntityConsumer implements AsyncEntityConsumer { private enum State { IDLE, ACTIVE, COMPLETED } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityProducer.java index 7e38b60d12..0b512676ba 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityProducer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityProducer.java @@ -44,7 +44,10 @@ * processing is executed through an {@link Executor}. * * @since 5.0 + * + * @deprecated Use {@link ClassicToAsyncRequestProducer}. */ +@Deprecated public abstract class AbstractClassicEntityProducer implements AsyncEntityProducer { private enum State { IDLE, ACTIVE, COMPLETED } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java index 287f161888..6d93965361 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java @@ -63,7 +63,10 @@ * Blocking input / output processing is executed through an {@link Executor}. * * @since 5.0 + * + * @deprecated Use {@link ClassicToAsyncServerExchangeHandler}. */ +@Deprecated public abstract class AbstractClassicServerExchangeHandler implements AsyncServerExchangeHandler { private enum State { IDLE, ACTIVE, COMPLETED } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncRequestProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncRequestProducer.java new file mode 100644 index 0000000000..a1959d14de --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncRequestProducer.java @@ -0,0 +1,196 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http.nio.support.classic; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Asserts; +import org.apache.hc.core5.util.Timeout; + +/** + * @since 5.4 + */ +@Experimental +public class ClassicToAsyncRequestProducer implements AsyncRequestProducer { + + private final ClassicHttpRequest request; + private final int initialBufferSize; + private final Timeout timeout; + private final CountDownLatch countDownLatch; + private final AtomicReference bufferRef; + private final AtomicReference exceptionRef; + + private volatile boolean repeatable; + + public interface IORunnable { + + void execute() throws IOException; + + } + + public ClassicToAsyncRequestProducer(final ClassicHttpRequest request, final int initialBufferSize, final Timeout timeout) { + this.request = Args.notNull(request, "HTTP request"); + this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size"); + this.timeout = timeout; + this.countDownLatch = new CountDownLatch(1); + this.bufferRef = new AtomicReference<>(); + this.exceptionRef = new AtomicReference<>(); + } + + public ClassicToAsyncRequestProducer(final ClassicHttpRequest request, final Timeout timeout) { + this(request, ClassicToAsyncSupport.INITIAL_BUF_SIZE, timeout); + } + + void propagateException() throws IOException { + final Exception ex = exceptionRef.getAndSet(null); + if (ex != null) { + ClassicToAsyncSupport.rethrow(ex); + } + } + + public IORunnable blockWaiting() throws IOException, InterruptedException { + if (timeout == null) { + countDownLatch.await(); + } else { + if (!countDownLatch.await(timeout.getDuration(), timeout.getTimeUnit())) { + throw new InterruptedIOException("Timeout blocked waiting for output (" + timeout + ")"); + } + } + propagateException(); + final SharedOutputBuffer outputBuffer = bufferRef.get(); + return () -> { + final HttpEntity requestEntity = request.getEntity(); + if (requestEntity != null) { + try (final InternalOutputStream outputStream = new InternalOutputStream(outputBuffer)) { + requestEntity.writeTo(outputStream); + } + } + }; + } + + @Override + public void sendRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException { + final HttpEntity requestEntity = request.getEntity(); + final SharedOutputBuffer buffer = requestEntity != null ? new SharedOutputBuffer(initialBufferSize) : null; + bufferRef.set(buffer); + repeatable = requestEntity == null || requestEntity.isRepeatable(); + channel.sendRequest(request, requestEntity, null); + countDownLatch.countDown(); + } + + @Override + public boolean isRepeatable() { + return repeatable; + } + + @Override + public int available() { + final SharedOutputBuffer buffer = bufferRef.get(); + if (buffer != null) { + return buffer.length(); + } + return 0; + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + final SharedOutputBuffer buffer = bufferRef.get(); + if (buffer != null) { + buffer.flush(channel); + } + } + + @Override + public void failed(final Exception cause) { + try { + exceptionRef.set(cause); + } finally { + countDownLatch.countDown(); + } + } + + @Override + public void releaseResources() { + } + + class InternalOutputStream extends OutputStream { + + private final SharedOutputBuffer buffer; + + public InternalOutputStream(final SharedOutputBuffer buffer) { + Asserts.notNull(buffer, "Shared buffer"); + this.buffer = buffer; + } + + @Override + public void close() throws IOException { + propagateException(); + this.buffer.writeCompleted(timeout); + } + + @Override + public void flush() throws IOException { + propagateException(); + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + propagateException(); + this.buffer.write(b, off, len, timeout); + } + + @Override + public void write(final byte[] b) throws IOException { + propagateException(); + if (b == null) { + return; + } + this.buffer.write(b, 0, b.length, timeout); + } + + @Override + public void write(final int b) throws IOException { + propagateException(); + this.buffer.write(b, timeout); + } + + } + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncResponseConsumer.java new file mode 100644 index 0000000000..8c33f78625 --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncResponseConsumer.java @@ -0,0 +1,327 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http.nio.support.classic; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.io.entity.AbstractHttpEntity; +import org.apache.hc.core5.http.io.support.ClassicResponseBuilder; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.Closer; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Asserts; +import org.apache.hc.core5.util.Timeout; + +/** + * @since 5.4 + */ +@Experimental +public class ClassicToAsyncResponseConsumer implements AsyncResponseConsumer { + + static class ResponseData { + + final HttpResponse head; + final EntityDetails entityDetails; + + ResponseData(final HttpResponse head, + final EntityDetails entityDetails) { + this.head = head; + this.entityDetails = entityDetails; + } + + } + + private final int initialBufferSize; + private final Timeout timeout; + private final CountDownLatch countDownLatch; + private final AtomicReference responseRef; + private final AtomicReference> callbackRef; + private final AtomicReference bufferRef; + private final AtomicReference exceptionRef; + + public ClassicToAsyncResponseConsumer(final int initialBufferSize, final Timeout timeout) { + this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size"); + this.timeout = timeout; + this.countDownLatch = new CountDownLatch(1); + this.responseRef = new AtomicReference<>(); + this.callbackRef = new AtomicReference<>(); + this.bufferRef = new AtomicReference<>(); + this.exceptionRef = new AtomicReference<>(); + } + + public ClassicToAsyncResponseConsumer(final Timeout timeout) { + this(ClassicToAsyncSupport.INITIAL_BUF_SIZE, timeout); + } + + void propagateException() throws IOException { + final Exception ex = exceptionRef.getAndSet(null); + if (ex != null) { + ClassicToAsyncSupport.rethrow(ex); + } + } + + void fireComplete() throws IOException { + final FutureCallback callback = callbackRef.getAndSet(null); + if (callback != null) { + callback.completed(null); + } + } + + public ClassicHttpResponse blockWaiting() throws IOException, InterruptedException { + if (timeout == null) { + countDownLatch.await(); + } else { + if (!countDownLatch.await(timeout.getDuration(), timeout.getTimeUnit())) { + throw new InterruptedIOException("Timeout blocked waiting for input (" + timeout + ")"); + } + } + propagateException(); + final ResponseData r = responseRef.getAndSet(null); + Asserts.notNull(r, "HTTP response is missing"); + final SharedInputBuffer inputBuffer = bufferRef.get(); + return ClassicResponseBuilder.copy(r.head) + .setEntity(r.entityDetails != null ? + new IncomingHttpEntity(new InternalInputStream(inputBuffer), r.entityDetails) : + null) + .build(); + } + + @Override + public void consumeResponse(final HttpResponse asyncResponse, + final EntityDetails entityDetails, + final HttpContext context, + final FutureCallback resultCallback) throws HttpException, IOException { + callbackRef.set(resultCallback); + final ResponseData responseData = new ResponseData(asyncResponse, entityDetails); + responseRef.set(responseData); + if (entityDetails != null) { + bufferRef.set(new SharedInputBuffer(initialBufferSize)); + } else { + fireComplete(); + } + countDownLatch.countDown(); + } + + @Override + public void informationResponse(final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + } + + @Override + public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + final SharedInputBuffer buffer = bufferRef.get(); + if (buffer != null) { + buffer.updateCapacity(capacityChannel); + } + } + + @Override + public final void consume(final ByteBuffer src) throws IOException { + final SharedInputBuffer buffer = bufferRef.get(); + if (buffer != null) { + buffer.fill(src); + } + } + + @Override + public final void streamEnd(final List trailers) throws HttpException, IOException { + final SharedInputBuffer buffer = bufferRef.get(); + if (buffer != null) { + buffer.markEndStream(); + } + } + + @Override + public final void failed(final Exception cause) { + try { + exceptionRef.set(cause); + } finally { + countDownLatch.countDown(); + } + } + + @Override + public void releaseResources() { + } + + class InternalInputStream extends InputStream { + + private final SharedInputBuffer buffer; + + InternalInputStream(final SharedInputBuffer buffer) { + super(); + Args.notNull(buffer, "Input buffer"); + this.buffer = buffer; + } + + @Override + public int available() throws IOException { + propagateException(); + return this.buffer.length(); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + propagateException(); + if (len == 0) { + return 0; + } + final int bytesRead = this.buffer.read(b, off, len, timeout); + if (bytesRead == -1) { + fireComplete(); + } + return bytesRead; + } + + @Override + public int read(final byte[] b) throws IOException { + propagateException(); + if (b == null) { + return 0; + } + final int bytesRead = this.buffer.read(b, 0, b.length, timeout); + if (bytesRead == -1) { + fireComplete(); + } + return bytesRead; + } + + @Override + public int read() throws IOException { + propagateException(); + final int b = this.buffer.read(timeout); + if (b == -1) { + fireComplete(); + } + return b; + } + + @Override + public void close() throws IOException { + // read and discard the remainder of the message + final byte[] tmp = new byte[1024]; + do { + /* empty */ + } while (read(tmp) >= 0); + super.close(); + } + + } + + static class IncomingHttpEntity implements HttpEntity { + + private final InputStream content; + private final EntityDetails entityDetails; + + IncomingHttpEntity(final InputStream content, final EntityDetails entityDetails) { + this.content = content; + this.entityDetails = entityDetails; + } + + @Override + public boolean isRepeatable() { + return false; + } + + @Override + public boolean isChunked() { + return entityDetails.isChunked(); + } + + @Override + public long getContentLength() { + return entityDetails.getContentLength(); + } + + @Override + public String getContentType() { + return entityDetails.getContentType(); + } + + @Override + public String getContentEncoding() { + return entityDetails.getContentEncoding(); + } + + @Override + public InputStream getContent() throws IOException, IllegalStateException { + return content; + } + + @Override + public boolean isStreaming() { + return content != null; + } + + @Override + public void writeTo(final OutputStream outStream) throws IOException { + AbstractHttpEntity.writeTo(this, outStream); + } + + @Override + public Supplier> getTrailers() { + return null; + } + + @Override + public Set getTrailerNames() { + return Collections.emptySet(); + } + + @Override + public void close() throws IOException { + Closer.close(content); + } + + @Override + public String toString() { + return entityDetails.toString(); + } + + } + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncServerExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncServerExchangeHandler.java new file mode 100644 index 0000000000..b92a0ea5e9 --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncServerExchangeHandler.java @@ -0,0 +1,380 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http.nio.support.classic; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.function.Callback; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpRequestMapper; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.ProtocolException; +import org.apache.hc.core5.http.impl.io.support.IncomingHttpEntity; +import org.apache.hc.core5.http.io.HttpRequestHandler; +import org.apache.hc.core5.http.io.HttpServerRequestHandler; +import org.apache.hc.core5.http.io.support.BasicHttpServerRequestHandler; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.apache.hc.core5.http.nio.AsyncResponseProducer; +import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.ResponseChannel; +import org.apache.hc.core5.http.nio.support.BasicResponseProducer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.support.BasicResponseBuilder; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Asserts; + +/** + * {@link AsyncServerExchangeHandler} implementation that acts as a compatibility + * layer for classic {@link InputStream} / {@link OutputStream} based interfaces. + * Blocking input / output processing is executed through an {@link Executor}. + * + * @since 5.4 + */ +@Experimental +public class ClassicToAsyncServerExchangeHandler implements AsyncServerExchangeHandler { + + private final int initialBufferSize; + private final Executor executor; + private final HttpServerRequestHandler requestHandler; + private final Callback exceptionCallback; + private final AtomicBoolean responseCommitted; + private final AtomicReference responseProducerRef; + private final AtomicReference inputBufferRef; + private final AtomicReference outputBufferRef; + private final AtomicReference exceptionRef; + + public ClassicToAsyncServerExchangeHandler( + final int initialBufferSize, + final Executor executor, + final HttpServerRequestHandler requestHandler, + final Callback exceptionCallback) { + this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size"); + this.executor = Args.notNull(executor, "Executor"); + this.requestHandler = Args.notNull(requestHandler, "Request handler"); + this.exceptionCallback = exceptionCallback; + this.responseCommitted = new AtomicBoolean(); + this.responseProducerRef = new AtomicReference<>(); + this.inputBufferRef = new AtomicReference<>(); + this.outputBufferRef = new AtomicReference<>(); + this.exceptionRef = new AtomicReference<>(); + } + + public ClassicToAsyncServerExchangeHandler( + final Executor executor, + final HttpServerRequestHandler requestHandler, + final Callback exceptionCallback) { + this(ClassicToAsyncSupport.INITIAL_BUF_SIZE, executor, requestHandler, exceptionCallback); + } + + public ClassicToAsyncServerExchangeHandler( + final Executor executor, + final HttpRequestMapper handlerMapper, + final Callback exceptionCallback) { + this(ClassicToAsyncSupport.INITIAL_BUF_SIZE, executor, + new BasicHttpServerRequestHandler(handlerMapper), + exceptionCallback); + } + + public ClassicToAsyncServerExchangeHandler( + final Executor executor, + final HttpRequestHandler handler, + final Callback exceptionCallback) { + this(ClassicToAsyncSupport.INITIAL_BUF_SIZE, executor, + new BasicHttpServerRequestHandler((request, context) -> handler), + exceptionCallback); + } + + void propagateException() throws IOException { + final Exception ex = exceptionRef.getAndSet(null); + if (ex != null) { + ClassicToAsyncSupport.rethrow(ex); + } + } + + SharedInputBuffer inputBuffer() { + final SharedInputBuffer inputBuffer = inputBufferRef.get(); + Asserts.notNull(inputBuffer, "Input buffer"); + return inputBuffer; + } + + SharedOutputBuffer outputBuffer() { + final SharedOutputBuffer outputBuffer = outputBufferRef.get(); + Asserts.notNull(outputBuffer, "Output buffer"); + return outputBuffer; + } + + void abortInput() { + final SharedInputBuffer inputBuffer = inputBufferRef.get(); + if (inputBuffer != null) { + inputBuffer.abort(); + } + } + + void abortOutput() { + final SharedOutputBuffer outputBuffer = outputBufferRef.get(); + if (outputBuffer != null) { + outputBuffer.abort(); + } + } + + @Override + public final void handleRequest( + final HttpRequest request, + final EntityDetails entityDetails, + final ResponseChannel responseChannel, + final HttpContext context) throws HttpException, IOException { + if (entityDetails != null) { + final SharedInputBuffer inputBuffer = new SharedInputBuffer(initialBufferSize); + inputBufferRef.set(inputBuffer); + } + executor.execute(() -> { + try { + final ClassicHttpRequest cr = ClassicRequestBuilder.copy(request).build(); + if (entityDetails != null) { + cr.setEntity(new IncomingHttpEntity( + new InternalInputStream(inputBufferRef.get()), + entityDetails.getContentLength(), + request)); + } + + final HttpServerRequestHandler.ResponseTrigger trigger = new HttpServerRequestHandler.ResponseTrigger() { + + @Override + public void sendInformation(final ClassicHttpResponse response) throws HttpException, IOException { + responseChannel.sendInformation(response, context); + } + + @Override + public void submitResponse(final ClassicHttpResponse response) throws HttpException, IOException { + if (responseCommitted.compareAndSet(false, true)) { + final HttpEntity responseEntity = response.getEntity(); + final String method = request.getMethod(); + final boolean contentExpected = responseEntity != null && !Method.HEAD.isSame(method); + if (contentExpected) { + final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(initialBufferSize); + outputBufferRef.set(outputBuffer); + } + responseChannel.sendResponse(response, responseEntity, null); + if (contentExpected) { + responseEntity.writeTo(new InternalOutputStream(outputBufferRef.get())); + } + } else { + throw new IllegalStateException("Response has already been committed"); + } + } + + }; + try { + requestHandler.handle(cr, trigger, context); + } catch (HttpException | RuntimeException ex) { + if (responseCommitted.compareAndSet(false, true)) { + final AsyncResponseProducer responseProducer = handleError(ex); + responseProducerRef.set(responseProducer); + responseProducer.sendResponse(responseChannel, context); + } else { + throw ex; + } + } + } catch (final Exception ex) { + if (exceptionCallback != null) { + exceptionCallback.execute(ex); + } + responseChannel.terminateExchange(); + } + }); + } + + protected AsyncResponseProducer handleError(final Exception ex) { + final int status = (ex instanceof ProtocolException) ? HttpStatus.SC_BAD_REQUEST : HttpStatus.SC_INTERNAL_SERVER_ERROR; + return new BasicResponseProducer( + BasicResponseBuilder.create(status).build(), + ex.getMessage(), + ContentType.TEXT_PLAIN); + } + + @Override + public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + inputBuffer().updateCapacity(capacityChannel); + } + + @Override + public final void consume(final ByteBuffer src) throws IOException { + inputBuffer().fill(src); + } + + @Override + public final void streamEnd(final List trailers) throws HttpException, IOException { + inputBuffer().markEndStream(); + } + + @Override + public final int available() { + final AsyncResponseProducer responseProducer = responseProducerRef.get(); + if (responseProducer != null) { + return responseProducer.available(); + } else { + return outputBuffer().length(); + } + } + + @Override + public final void produce(final DataStreamChannel channel) throws IOException { + final AsyncResponseProducer responseProducer = responseProducerRef.get(); + if (responseProducer != null) { + responseProducer.produce(channel); + } else { + outputBuffer().flush(channel); + } + } + + @Override + public final void failed(final Exception cause) { + responseCommitted.set(true); + exceptionRef.compareAndSet(null, cause); + abortInput(); + abortOutput(); + } + + @Override + public void releaseResources() { + } + + class InternalInputStream extends InputStream { + + private final ContentInputBuffer buffer; + + InternalInputStream(final ContentInputBuffer buffer) { + super(); + Args.notNull(buffer, "Input buffer"); + this.buffer = buffer; + } + + @Override + public int available() throws IOException { + propagateException(); + return buffer.length(); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + propagateException(); + if (len == 0) { + return 0; + } + return buffer.read(b, off, len); + } + + @Override + public int read(final byte[] b) throws IOException { + propagateException(); + if (b == null) { + return 0; + } + return buffer.read(b, 0, b.length); + } + + @Override + public int read() throws IOException { + propagateException(); + return buffer.read(); + } + + @Override + public void close() throws IOException { + propagateException(); + // read and discard the remainder of the message + final byte[] tmp = new byte[1024]; + do { + /* empty */ + } while (read(tmp) >= 0); + super.close(); + } + + } + + class InternalOutputStream extends OutputStream { + + private final SharedOutputBuffer buffer; + + public InternalOutputStream(final SharedOutputBuffer buffer) { + Asserts.notNull(buffer, "Shared buffer"); + this.buffer = buffer; + } + + @Override + public void close() throws IOException { + propagateException(); + buffer.writeCompleted(); + } + + @Override + public void flush() throws IOException { + propagateException(); + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + propagateException(); + buffer.write(b, off, len); + } + + @Override + public void write(final byte[] b) throws IOException { + propagateException(); + if (b == null) { + return; + } + buffer.write(b, 0, b.length); + } + + @Override + public void write(final int b) throws IOException { + propagateException(); + buffer.write(b); + } + + } + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncSupport.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncSupport.java new file mode 100644 index 0000000000..501edee111 --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ClassicToAsyncSupport.java @@ -0,0 +1,53 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http.nio.support.classic; + +import java.io.IOException; + +import org.apache.hc.core5.http.HttpException; + +final class ClassicToAsyncSupport { + + final static int INITIAL_BUF_SIZE = 2048; + + static void rethrow(final Throwable ex) throws IOException { + if (ex instanceof Error) { + throw (Error) ex; + } else if (ex instanceof RuntimeException) { + throw (RuntimeException) ex; + } else if (ex instanceof IOException) { + throw new TransportException((IOException) ex); + } else if (ex instanceof HttpException) { + throw new ProtocolException((HttpException) ex); + } else { + // Unexpected exception type + throw new IllegalStateException(ex); + } + } + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ProtocolException.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ProtocolException.java new file mode 100644 index 0000000000..1c76c00915 --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ProtocolException.java @@ -0,0 +1,45 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http.nio.support.classic; + +import java.io.IOException; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.http.HttpException; + +/** + * @since 5.4 + */ +@Internal +public class ProtocolException extends IOException { + + public ProtocolException(final HttpException ex) { + super(ex); + } + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java index b0a155f564..992f6ed015 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java @@ -35,6 +35,7 @@ import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.util.Timeout; /** * @since 5.0 @@ -93,12 +94,18 @@ public void updateCapacity(final CapacityChannel capacityChannel) throws IOExcep } } - private void awaitInput() throws InterruptedIOException { + private void awaitInput(final Timeout timeout) throws InterruptedIOException { if (!buffer().hasRemaining()) { setInputMode(); while (buffer().position() == 0 && !endStream && !aborted) { try { - condition.await(); + if (timeout == null) { + condition.await(); + } else { + if (!condition.await(timeout.getDuration(), timeout.getTimeUnit())) { + throw new InterruptedIOException("Timeout blocked waiting for input (" + timeout + ")"); + } + } } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); throw new InterruptedIOException(ex.getMessage()); @@ -116,10 +123,17 @@ private void ensureNotAborted() throws InterruptedIOException { @Override public int read() throws IOException { + return read(null); + } + + /** + * @since 5.4 + */ + public int read(final Timeout timeout) throws IOException { lock.lock(); try { setOutputMode(); - awaitInput(); + awaitInput(timeout); ensureNotAborted(); if (!buffer().hasRemaining() && endStream) { return -1; @@ -137,13 +151,20 @@ public int read() throws IOException { @Override public int read(final byte[] b, final int off, final int len) throws IOException { + return read(b, off, len, null); + } + + /** + * @since 5.4 + */ + public int read(final byte[] b, final int off, final int len, final Timeout timeout) throws IOException { if (len == 0) { return 0; } lock.lock(); try { setOutputMode(); - awaitInput(); + awaitInput(timeout); ensureNotAborted(); if (!buffer().hasRemaining() && endStream) { return -1; diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java index fb8dffba00..911259b210 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java @@ -35,6 +35,7 @@ import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.util.Timeout; /** * @since 5.0 @@ -80,8 +81,10 @@ private void ensureNotAborted() throws InterruptedIOException { } } - @Override - public void write(final byte[] b, final int off, final int len) throws IOException { + /** + * @since 5.4 + */ + public void write(final byte[] b, final int off, final int len, final Timeout timeout) throws IOException { final ByteBuffer src = ByteBuffer.wrap(b, off, len); lock.lock(); try { @@ -93,13 +96,13 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti buffer().put(src); } else { if (buffer().position() > 0 || dataStreamChannel == null) { - waitFlush(); + waitFlush(timeout); } if (buffer().position() == 0 && dataStreamChannel != null) { final int bytesWritten = dataStreamChannel.write(src); if (bytesWritten == 0) { hasCapacity = false; - waitFlush(); + waitFlush(timeout); } } } @@ -110,13 +113,20 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti } @Override - public void write(final int b) throws IOException { + public void write(final byte[] b, final int off, final int len) throws IOException { + write(b, off, len, null); + } + + /** + * @since 5.4 + */ + public void write(final int b, final Timeout timeout) throws IOException { lock.lock(); try { ensureNotAborted(); setInputMode(); if (!buffer().hasRemaining()) { - waitFlush(); + waitFlush(timeout); } buffer().put((byte)b); } finally { @@ -125,7 +135,14 @@ public void write(final int b) throws IOException { } @Override - public void writeCompleted() throws IOException { + public void write(final int b) throws IOException { + write(b, null); + } + + /** + * @since 5.4 + */ + public void writeCompleted(final Timeout timeout) throws IOException { if (endStream) { return; } @@ -137,7 +154,7 @@ public void writeCompleted() throws IOException { setOutputMode(); if (buffer().hasRemaining()) { dataStreamChannel.requestOutput(); - waitEndStream(); + waitEndStream(timeout); } else { propagateEndStream(); } @@ -148,30 +165,42 @@ public void writeCompleted() throws IOException { } } - private void waitFlush() throws InterruptedIOException { + @Override + public void writeCompleted() throws IOException { + writeCompleted(null); + } + + private void waitFlush(final Timeout timeout) throws InterruptedIOException { if (dataStreamChannel != null) { dataStreamChannel.requestOutput(); } setOutputMode(); while (buffer().hasRemaining() || !hasCapacity) { ensureNotAborted(); - waitForSignal(); + waitForSignal(timeout); } setInputMode(); } - private void waitEndStream() throws InterruptedIOException { + private void waitEndStream(final Timeout timeout) throws InterruptedIOException { if (dataStreamChannel != null) { dataStreamChannel.requestOutput(); } while (!endStreamPropagated.get() && !aborted) { - waitForSignal(); + waitForSignal(timeout); } } - private void waitForSignal() throws InterruptedIOException { + private void waitForSignal(final Timeout timeout) throws InterruptedIOException { try { - condition.await(); + if (timeout == null) { + condition.await(); + } else { + if (!condition.await(timeout.getDuration(), timeout.getTimeUnit())) { + aborted = true; + throw new InterruptedIOException("Timeout blocked waiting for output (" + timeout + ")"); + } + } } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); throw new InterruptedIOException(ex.getMessage()); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/TransportException.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/TransportException.java new file mode 100644 index 0000000000..4aad21a5fd --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/TransportException.java @@ -0,0 +1,44 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http.nio.support.classic; + +import java.io.IOException; + +import org.apache.hc.core5.annotation.Internal; + +/** + * @since 5.4 + */ +@Internal +public class TransportException extends IOException { + + public TransportException(final IOException ex) { + super(ex); + } + +}