netty connector/container modifications (#4387)
* netty connector/container modifications
Signed-off-by: Maxim Nesen <maxim.nesen@oracle.com>
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
index 5531498..9ab9745 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -20,17 +20,14 @@
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingDeque;
import javax.ws.rs.core.Response;
import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.ClientResponse;
-import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.netty.connector.internal.NettyInputStream;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
@@ -39,8 +36,6 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
/**
* Jersey implementation of Netty channel handler.
@@ -49,19 +44,38 @@
*/
class JerseyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
- private final NettyConnector connector;
- private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
-
- private final AsyncConnectorCallback asyncConnectorCallback;
private final ClientRequest jerseyRequest;
- private final CompletableFuture future;
+ private final CompletableFuture<ClientResponse> responseAvailable;
+ private final CompletableFuture<?> responseDone;
- JerseyClientHandler(NettyConnector nettyConnector, ClientRequest request,
- AsyncConnectorCallback callback, CompletableFuture future) {
- this.connector = nettyConnector;
- this.asyncConnectorCallback = callback;
+ private NettyInputStream nis;
+ private ClientResponse jerseyResponse;
+
+ JerseyClientHandler(ClientRequest request,
+ CompletableFuture<ClientResponse> responseAvailable,
+ CompletableFuture<?> responseDone) {
this.jerseyRequest = request;
- this.future = future;
+ this.responseAvailable = responseAvailable;
+ this.responseDone = responseDone;
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) {
+ notifyResponse();
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ // assert: no-op, if channel is closed after LastHttpContent has been consumed
+ responseDone.completeExceptionally(new IOException("Stream closed"));
+ }
+
+ protected void notifyResponse() {
+ if (jerseyResponse != null) {
+ ClientResponse cr = jerseyResponse;
+ jerseyResponse = null;
+ responseAvailable.complete(cr);
+ }
}
@Override
@@ -69,7 +83,7 @@
if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
- final ClientResponse jerseyResponse = new ClientResponse(new Response.StatusType() {
+ jerseyResponse = new ClientResponse(new Response.StatusType() {
@Override
public int getStatusCode() {
return response.status().code();
@@ -89,19 +103,15 @@
for (Map.Entry<String, String> entry : response.headers().entries()) {
jerseyResponse.getHeaders().add(entry.getKey(), entry.getValue());
}
- isList.clear(); // clearing the content - possible leftover from previous request processing.
+
// request entity handling.
if ((response.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(response) > 0)
|| HttpUtil.isTransferEncodingChunked(response)) {
- ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
- @Override
- public void operationComplete(Future<? super Void> future) throws Exception {
- isList.add(Unpooled.EMPTY_BUFFER);
- }
- });
+ nis = new NettyInputStream();
+ responseDone.whenComplete((_r, th) -> nis.complete(th));
- jerseyResponse.setEntityStream(new NettyInputStream(isList));
+ jerseyResponse.setEntityStream(nis);
} else {
jerseyResponse.setEntityStream(new InputStream() {
@Override
@@ -110,44 +120,29 @@
}
});
}
-
- if (asyncConnectorCallback != null) {
- connector.executorService.execute(new Runnable() {
- @Override
- public void run() {
- asyncConnectorCallback.response(jerseyResponse);
- future.complete(jerseyResponse);
- }
- });
- }
-
}
if (msg instanceof HttpContent) {
+
HttpContent httpContent = (HttpContent) msg;
ByteBuf content = httpContent.content();
+
if (content.isReadable()) {
content.retain();
- isList.add(content);
+ nis.publish(content);
}
if (msg instanceof LastHttpContent) {
- isList.add(Unpooled.EMPTY_BUFFER);
+ responseDone.complete(null);
+ notifyResponse();
}
}
}
+
+
@Override
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) {
- if (asyncConnectorCallback != null) {
- connector.executorService.execute(new Runnable() {
- @Override
- public void run() {
- asyncConnectorCallback.failure(cause);
- }
- });
- }
- future.completeExceptionally(cause);
- ctx.close();
+ responseDone.completeExceptionally(cause);
}
}
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
index 2b36006..373d3c8 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
@@ -20,6 +20,8 @@
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -28,7 +30,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
@@ -76,6 +77,7 @@
final ExecutorService executorService;
final EventLoopGroup group;
final Client client;
+ final HashMap<String, ArrayList<Channel>> connections = new HashMap<>();
NettyConnector(Client client) {
@@ -83,72 +85,72 @@
if (threadPoolSize != null && threadPoolSize instanceof Integer && (Integer) threadPoolSize > 0) {
executorService = Executors.newFixedThreadPool((Integer) threadPoolSize);
+ this.group = new NioEventLoopGroup((Integer) threadPoolSize);
} else {
executorService = Executors.newCachedThreadPool();
+ this.group = new NioEventLoopGroup();
}
- this.group = new NioEventLoopGroup();
this.client = client;
}
@Override
public ClientResponse apply(ClientRequest jerseyRequest) {
-
- final AtomicReference<ClientResponse> syncResponse = new AtomicReference<>(null);
- final AtomicReference<Throwable> syncException = new AtomicReference<>(null);
-
try {
- Future<?> resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() {
- @Override
- public void response(ClientResponse response) {
- syncResponse.set(response);
- }
-
- @Override
- public void failure(Throwable failure) {
- syncException.set(failure);
- }
- });
+ CompletableFuture<ClientResponse> resultFuture = execute(jerseyRequest);
Integer timeout = ClientProperties.getValue(jerseyRequest.getConfiguration().getProperties(),
ClientProperties.READ_TIMEOUT, 0);
- if (timeout != null && timeout > 0) {
- resultFuture.get(timeout, TimeUnit.MILLISECONDS);
- } else {
- resultFuture.get();
- }
+ return (timeout != null && timeout > 0) ? resultFuture.get(timeout, TimeUnit.MILLISECONDS)
+ : resultFuture.get();
} catch (ExecutionException ex) {
Throwable e = ex.getCause() == null ? ex : ex.getCause();
throw new ProcessingException(e.getMessage(), e);
} catch (Exception ex) {
throw new ProcessingException(ex.getMessage(), ex);
}
-
- Throwable throwable = syncException.get();
- if (throwable == null) {
- return syncResponse.get();
- } else {
- throw new RuntimeException(throwable);
- }
}
@Override
public Future<?> apply(final ClientRequest jerseyRequest, final AsyncConnectorCallback jerseyCallback) {
+ return execute(jerseyRequest).whenCompleteAsync((r, th) -> {
+ if (th == null) jerseyCallback.response(r);
+ else jerseyCallback.failure(th);
+ }, executorService);
+ }
- final CompletableFuture<Object> settableFuture = new CompletableFuture<>();
+ protected CompletableFuture<ClientResponse> execute(final ClientRequest jerseyRequest) {
+ final CompletableFuture<ClientResponse> responseAvailable = new CompletableFuture<>();
+ final CompletableFuture<?> responseDone = new CompletableFuture<>();
final URI requestUri = jerseyRequest.getUri();
String host = requestUri.getHost();
int port = requestUri.getPort() != -1 ? requestUri.getPort() : "https".equals(requestUri.getScheme()) ? 443 : 80;
try {
- Bootstrap b = new Bootstrap();
- b.group(group)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
+ String key = requestUri.getScheme() + "://" + host + ":" + port;
+ ArrayList<Channel> conns;
+ synchronized (connections) {
+ conns = connections.get(key);
+ if (conns == null) {
+ conns = new ArrayList<>(0);
+ connections.put(key, conns);
+ }
+ }
+
+ Channel chan;
+ synchronized (conns) {
+ chan = conns.size() == 0 ? null : conns.remove(conns.size() - 1);
+ }
+
+ if (chan == null) {
+ Bootstrap b = new Bootstrap();
+ b.group(group)
+ .channel(NioSocketChannel.class)
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// Enable HTTPS if necessary.
@@ -177,32 +179,47 @@
p.addLast(new HttpClientCodec());
p.addLast(new ChunkedWriteHandler());
p.addLast(new HttpContentDecompressor());
- p.addLast(new JerseyClientHandler(NettyConnector.this, jerseyRequest, jerseyCallback, settableFuture));
- }
- });
+ }
+ });
- // connect timeout
- Integer connectTimeout = ClientProperties.getValue(jerseyRequest.getConfiguration().getProperties(),
- ClientProperties.CONNECT_TIMEOUT, 0);
- if (connectTimeout > 0) {
- b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
+ // connect timeout
+ Integer connectTimeout = ClientProperties.getValue(jerseyRequest.getConfiguration().getProperties(),
+ ClientProperties.CONNECT_TIMEOUT, 0);
+ if (connectTimeout > 0) {
+ b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
+ }
+
+ // Make the connection attempt.
+ chan = b.connect(host, port).sync().channel();
}
- // Make the connection attempt.
- final Channel ch = b.connect(host, port).sync().channel();
+ // assert: clientHandler will always notify responseDone: either normally, or exceptionally
+ // assert: clientHandler may notify responseAvailable, if sufficient parts of response are detected to construct
+ // a valid ClientResponse
+ // assert: responseAvailable completion may be racing against responseDone completion
+ // assert: it is ok to abort the entire response, if responseDone is completed exceptionally - in particular, nothing
+ // will leak
+ final Channel ch = chan;
+ JerseyClientHandler clientHandler = new JerseyClientHandler(jerseyRequest, responseAvailable, responseDone);
+ ch.pipeline().addLast(clientHandler);
- // guard against prematurely closed channel
- final GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> closeListener =
- new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
- @Override
- public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
- if (!settableFuture.isDone()) {
- settableFuture.completeExceptionally(new IOException("Channel closed."));
- }
- }
- };
+ responseDone.whenComplete((_r, th) -> {
+ ch.pipeline().remove(clientHandler);
- ch.closeFuture().addListener(closeListener);
+ if (th == null) {
+ synchronized (connections) {
+ ArrayList<Channel> conns1 = connections.get(key);
+ synchronized (conns1) {
+ conns1.add(ch);
+ }
+ }
+ } else {
+ ch.close();
+ // if responseAvailable has been completed, no-op: jersey will encounter IOException while reading response body
+ // if responseAvailable has not been completed, abort
+ responseAvailable.completeExceptionally(th);
+ }
+ });
HttpRequest nettyRequest;
String pathWithQuery = buildPathWithQueryParameters(requestUri);
@@ -226,14 +243,23 @@
nettyRequest.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
if (jerseyRequest.hasEntity()) {
+ // guard against prematurely closed channel
+ final GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> closeListener =
+ new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
+ @Override
+ public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
+ if (!responseDone.isDone()) {
+ responseDone.completeExceptionally(new IOException("Channel closed."));
+ }
+ }
+ };
+ ch.closeFuture().addListener(closeListener);
if (jerseyRequest.getLengthLong() == -1) {
HttpUtil.setTransferEncodingChunked(nettyRequest, true);
} else {
nettyRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong());
}
- }
- if (jerseyRequest.hasEntity()) {
// Send the HTTP request.
ch.writeAndFlush(nettyRequest);
@@ -260,27 +286,22 @@
try {
jerseyRequest.writeEntity();
} catch (IOException e) {
- jerseyCallback.failure(e);
- settableFuture.completeExceptionally(e);
+ responseDone.completeExceptionally(e);
}
}
});
ch.flush();
} else {
- // close listener is not needed any more.
- ch.closeFuture().removeListener(closeListener);
-
// Send the HTTP request.
ch.writeAndFlush(nettyRequest);
}
} catch (InterruptedException e) {
- settableFuture.completeExceptionally(e);
- return settableFuture;
+ responseDone.completeExceptionally(e);
}
- return settableFuture;
+ return responseAvailable;
}
private String buildPathWithQueryParameters(URI requestUri) {
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
index 741121f..3da7019 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -16,107 +16,150 @@
package org.glassfish.jersey.netty.connector.internal;
-import java.io.IOException;
import java.io.InputStream;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
/**
* Input stream which servers as Request entity input.
* <p>
- * Consumes a list of pending {@link ByteBuf}s and processes them on request by Jersey
+ * Converts Netty NIO buffers to an input streams and stores them in the queue,
+ * waiting for Jersey to process it.
+ *
+ * @author Pavel Bucek
*/
public class NettyInputStream extends InputStream {
- private final LinkedBlockingDeque<ByteBuf> isList;
+ private volatile boolean end = false;
+ private Throwable cause;
- public NettyInputStream(LinkedBlockingDeque<ByteBuf> isList) {
- this.isList = isList;
+ private final ArrayDeque<ByteBuf> isList;
+ private ByteBuf current;
+ private ByteBuffer buffer;
+
+ private byte[] ONE_BYTE;
+ private boolean reading;
+
+ public NettyInputStream() {
+ this.isList = new ArrayDeque<>();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
-
- ByteBuf take;
- try {
- take = isList.take();
- boolean isReadable = take.isReadable();
- int read = -1;
- if (checkEndOfInputOrError(take)) {
- take.release();
+ if (current == null) {
+ buffer = awaitNext();
+ if (buffer == null) {
+ // assert: end is true
+ if (cause == null) {
return -1;
- }
+ }
- if (isReadable) {
- int readableBytes = take.readableBytes();
- read = Math.min(readableBytes, len);
- take.readBytes(b, off, read);
- if (read < len) {
- take.release();
- } else {
- isList.addFirst(take);
- }
- } else {
- read = 0;
- take.release(); //We don't need `0`
- }
+ throw new IOException(cause);
+ }
+ }
- return read;
- } catch (InterruptedException e) {
- throw new IOException("Interrupted.", e);
- }
+ int rem = buffer.remaining();
+ if (rem < len) {
+ len = rem;
+ }
+ buffer.get(b, off, len);
+ if (rem == len) {
+ releaseByteBuf();
+ }
+
+ return len;
}
@Override
public int read() throws IOException {
+ if (ONE_BYTE == null) {
+ ONE_BYTE = new byte[1];
+ }
+ int r = read(ONE_BYTE, 0, 1);
+ if (r < 0) {
+ return r;
+ }
- ByteBuf take;
- try {
- take = isList.take();
- boolean isReadable = take.isReadable();
- if (checkEndOfInputOrError(take)) {
- take.release();
- return -1;
- }
-
- if (isReadable) {
- return take.readInt();
- } else {
- take.release(); //We don't need `0`
- }
-
- return 0;
- } catch (InterruptedException e) {
- throw new IOException("Interrupted.", e);
- }
+ return ONE_BYTE[0] & 0xff;
}
@Override
- public void close() throws IOException {
- if (isList != null) {
- while (!isList.isEmpty()) {
- try {
- isList.take().release();
- } catch (InterruptedException e) {
- throw new IOException("Interrupted. Potential ByteBuf Leak.", e);
- }
- }
+ public void close() {
+
+ releaseByteBuf();
+
+ cleanup(true);
+ }
+
+ private void releaseByteBuf() {
+ if (current != null) {
+ current.release();
}
- super.close();
+
+ current = null;
+ buffer = null;
+ }
+
+ protected synchronized ByteBuffer awaitNext() {
+ while (isList.isEmpty()) {
+ if (end) {
+ return null;
+ }
+
+ try {
+ reading = true;
+ wait();
+ reading = false;
+ } catch (InterruptedException ie) {
+ // waiting uninterruptibly
+ }
+ }
+
+ current = isList.poll();
+ return current.nioBuffer().asReadOnlyBuffer();
+ }
+
+ public void complete(Throwable cause) {
+ this.cause = cause;
+ cleanup(cause != null);
+ }
+
+ protected synchronized void cleanup(boolean drain) {
+ if (drain) {
+ while (!isList.isEmpty()) {
+ isList.poll().release();
+ }
+ }
+
+ end = true;
+
+ if (reading) {
+ notifyAll();
+ }
}
@Override
public int available() throws IOException {
- ByteBuf peek = isList.peek();
- if (peek != null && peek.isReadable()) {
- return peek.readableBytes();
- }
- return 0;
+ return buffer == null ? 0 : buffer.remaining();
}
- private boolean checkEndOfInputOrError(ByteBuf take) throws IOException {
- return take == Unpooled.EMPTY_BUFFER;
+ public synchronized void publish(ByteBuf content) {
+ if (end || content.nioBuffer().remaining() == 0) {
+ content.release();
+ return;
+ }
+
+ isList.add(content);
+ if (reading) {
+ notifyAll();
+ }
+ }
+
+ public void clear() {
+ end = false;
+ isList.clear();
}
}
diff --git a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java
index 6716c9b..20caa53 100644
--- a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java
+++ b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -77,6 +77,7 @@
public void testConnection() {
Response response = target().path(ROOT_PATH).request("text/plain").get();
assertEquals(200, response.getStatus());
+ response.close();
}
@Test
diff --git a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java
index 10cfa14..f5438e5 100644
--- a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java
+++ b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2018 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -64,5 +64,6 @@
assertEquals(200, response.getStatus());
assertTrue(response.hasEntity());
+ response.close();
}
}
diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
index e64476d..6eeac17 100644
--- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
+++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -25,11 +25,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
import javax.ws.rs.core.SecurityContext;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@@ -55,7 +52,7 @@
class JerseyHttp2ServerHandler extends ChannelDuplexHandler {
private final URI baseUri;
- private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
+ private final NettyInputStream nettyInputStream = new NettyInputStream();
private final NettyHttpContainer container;
private final ResourceConfig resourceConfig;
@@ -92,9 +89,9 @@
* Process incoming data.
*/
private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception {
- isList.add(data.content());
+ nettyInputStream.publish(data.content());
if (data.isEndStream()) {
- isList.add(Unpooled.EMPTY_BUFFER);
+ nettyInputStream.complete(null);
}
}
@@ -163,11 +160,11 @@
ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
- isList.add(Unpooled.EMPTY_BUFFER);
+ nettyInputStream.complete(future.cause());
}
});
- requestContext.setEntityStream(new NettyInputStream(isList));
+ requestContext.setEntityStream(nettyInputStream);
} else {
requestContext.setEntityStream(new InputStream() {
@Override
diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
index 712cb1f..0f2a7ae 100644
--- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
+++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -20,13 +20,11 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.MediaType;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -53,7 +51,7 @@
class JerseyServerHandler extends ChannelInboundHandlerAdapter {
private final URI baseUri;
- private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
+ private final NettyInputStream nettyInputStream = new NettyInputStream();
private final NettyHttpContainer container;
private final ResourceConfig resourceConfig;
@@ -82,7 +80,7 @@
ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
}
- isList.clear(); // clearing the content - possible leftover from previous request processing.
+ nettyInputStream.clear(); // clearing the content - possible leftover from previous request processing.
final ContainerRequest requestContext = createContainerRequest(ctx, req);
requestContext.setWriter(new NettyResponseWriter(ctx, req, container));
@@ -105,7 +103,7 @@
//Otherwise, it's safe to discard during next processing
if ((!isJson && contentLength != -1) || HttpUtil.isTransferEncodingChunked(req)
|| (isJson && contentLength >= 2)) {
- requestContext.setEntityStream(new NettyInputStream(isList));
+ requestContext.setEntityStream(nettyInputStream);
}
}
@@ -128,11 +126,11 @@
ByteBuf content = httpContent.content();
if (content.isReadable()) {
- isList.add(content);
+ nettyInputStream.publish(content);
}
if (msg instanceof LastHttpContent) {
- isList.add(Unpooled.EMPTY_BUFFER);
+ nettyInputStream.complete(null);
}
}
}
diff --git a/pom.xml b/pom.xml
index a157ec4..3cad108 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2141,7 +2141,7 @@
<mockito.version>1.10.19</mockito.version>
<moxy.version>2.7.4</moxy.version>
<mustache.version>0.8.17</mustache.version>
- <netty.version>4.1.31.Final</netty.version>
+ <netty.version>4.1.43.Final</netty.version>
<nexus-staging.mvn.plugin.version>1.6.7</nexus-staging.mvn.plugin.version>
<opentracing.version>0.30.0</opentracing.version>
<osgi.version>6.0.0</osgi.version>