netty connector/container modifications (#4387)

* netty connector/container modifications

Signed-off-by: Maxim Nesen <maxim.nesen@oracle.com>
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
index 5531498..9ab9745 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -20,17 +20,14 @@
 import java.io.InputStream;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingDeque;
 
 import javax.ws.rs.core.Response;
 
 import org.glassfish.jersey.client.ClientRequest;
 import org.glassfish.jersey.client.ClientResponse;
-import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
 import org.glassfish.jersey.netty.connector.internal.NettyInputStream;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.HttpContent;
@@ -39,8 +36,6 @@
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpUtil;
 import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 
 /**
  * Jersey implementation of Netty channel handler.
@@ -49,19 +44,38 @@
  */
 class JerseyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
 
-    private final NettyConnector connector;
-    private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
-
-    private final AsyncConnectorCallback asyncConnectorCallback;
     private final ClientRequest jerseyRequest;
-    private final CompletableFuture future;
+    private final CompletableFuture<ClientResponse> responseAvailable;
+    private final CompletableFuture<?> responseDone;
 
-    JerseyClientHandler(NettyConnector nettyConnector, ClientRequest request,
-                        AsyncConnectorCallback callback, CompletableFuture future) {
-        this.connector = nettyConnector;
-        this.asyncConnectorCallback = callback;
+    private NettyInputStream nis;
+    private ClientResponse jerseyResponse;
+
+    JerseyClientHandler(ClientRequest request,
+                        CompletableFuture<ClientResponse> responseAvailable,
+                        CompletableFuture<?> responseDone) {
         this.jerseyRequest = request;
-        this.future = future;
+        this.responseAvailable = responseAvailable;
+        this.responseDone = responseDone;
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) {
+       notifyResponse();
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+       // assert: no-op, if channel is closed after LastHttpContent has been consumed
+       responseDone.completeExceptionally(new IOException("Stream closed"));
+    }
+
+    protected void notifyResponse() {
+       if (jerseyResponse != null) {
+          ClientResponse cr = jerseyResponse;
+          jerseyResponse = null;
+          responseAvailable.complete(cr);
+       }
     }
 
     @Override
@@ -69,7 +83,7 @@
         if (msg instanceof HttpResponse) {
             final HttpResponse response = (HttpResponse) msg;
 
-            final ClientResponse jerseyResponse = new ClientResponse(new Response.StatusType() {
+            jerseyResponse = new ClientResponse(new Response.StatusType() {
                 @Override
                 public int getStatusCode() {
                     return response.status().code();
@@ -89,19 +103,15 @@
             for (Map.Entry<String, String> entry : response.headers().entries()) {
                 jerseyResponse.getHeaders().add(entry.getKey(), entry.getValue());
             }
-            isList.clear(); // clearing the content - possible leftover from previous request processing.
+
             // request entity handling.
             if ((response.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(response) > 0)
                     || HttpUtil.isTransferEncodingChunked(response)) {
 
-                ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
-                    @Override
-                    public void operationComplete(Future<? super Void> future) throws Exception {
-                        isList.add(Unpooled.EMPTY_BUFFER);
-                    }
-                });
+                nis = new NettyInputStream();
+                responseDone.whenComplete((_r, th) -> nis.complete(th));
 
-                jerseyResponse.setEntityStream(new NettyInputStream(isList));
+                jerseyResponse.setEntityStream(nis);
             } else {
                 jerseyResponse.setEntityStream(new InputStream() {
                     @Override
@@ -110,44 +120,29 @@
                     }
                 });
             }
-
-            if (asyncConnectorCallback != null) {
-                connector.executorService.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        asyncConnectorCallback.response(jerseyResponse);
-                        future.complete(jerseyResponse);
-                    }
-                });
-            }
-
         }
         if (msg instanceof HttpContent) {
+
             HttpContent httpContent = (HttpContent) msg;
 
             ByteBuf content = httpContent.content();
+
             if (content.isReadable()) {
                 content.retain();
-                isList.add(content);
+                nis.publish(content);
             }
 
             if (msg instanceof LastHttpContent) {
-                isList.add(Unpooled.EMPTY_BUFFER);
+                responseDone.complete(null);
+                notifyResponse();
             }
         }
     }
 
+
+
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) {
-        if (asyncConnectorCallback != null) {
-            connector.executorService.execute(new Runnable() {
-                @Override
-                public void run() {
-                    asyncConnectorCallback.failure(cause);
-                }
-            });
-        }
-        future.completeExceptionally(cause);
-        ctx.close();
+        responseDone.completeExceptionally(cause);
     }
 }
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
index 2b36006..373d3c8 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
@@ -20,6 +20,8 @@
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -28,7 +30,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.ws.rs.ProcessingException;
 import javax.ws.rs.client.Client;
@@ -76,6 +77,7 @@
     final ExecutorService executorService;
     final EventLoopGroup group;
     final Client client;
+    final HashMap<String, ArrayList<Channel>> connections = new HashMap<>();
 
     NettyConnector(Client client) {
 
@@ -83,72 +85,72 @@
 
         if (threadPoolSize != null && threadPoolSize instanceof Integer && (Integer) threadPoolSize > 0) {
             executorService = Executors.newFixedThreadPool((Integer) threadPoolSize);
+            this.group = new NioEventLoopGroup((Integer) threadPoolSize);
         } else {
             executorService = Executors.newCachedThreadPool();
+            this.group = new NioEventLoopGroup();
         }
 
-        this.group = new NioEventLoopGroup();
         this.client = client;
     }
 
     @Override
     public ClientResponse apply(ClientRequest jerseyRequest) {
-
-        final AtomicReference<ClientResponse> syncResponse = new AtomicReference<>(null);
-        final AtomicReference<Throwable> syncException = new AtomicReference<>(null);
-
         try {
-            Future<?> resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() {
-                @Override
-                public void response(ClientResponse response) {
-                    syncResponse.set(response);
-                }
-
-                @Override
-                public void failure(Throwable failure) {
-                    syncException.set(failure);
-                }
-            });
+            CompletableFuture<ClientResponse> resultFuture = execute(jerseyRequest);
 
             Integer timeout = ClientProperties.getValue(jerseyRequest.getConfiguration().getProperties(),
                                                         ClientProperties.READ_TIMEOUT, 0);
 
-            if (timeout != null && timeout > 0) {
-                resultFuture.get(timeout, TimeUnit.MILLISECONDS);
-            } else {
-                resultFuture.get();
-            }
+            return (timeout != null && timeout > 0) ? resultFuture.get(timeout, TimeUnit.MILLISECONDS)
+                                                    : resultFuture.get();
         } catch (ExecutionException ex) {
             Throwable e = ex.getCause() == null ? ex : ex.getCause();
             throw new ProcessingException(e.getMessage(), e);
         } catch (Exception ex) {
             throw new ProcessingException(ex.getMessage(), ex);
         }
-
-        Throwable throwable = syncException.get();
-        if (throwable == null) {
-            return syncResponse.get();
-        } else {
-            throw new RuntimeException(throwable);
-        }
     }
 
     @Override
     public Future<?> apply(final ClientRequest jerseyRequest, final AsyncConnectorCallback jerseyCallback) {
+        return execute(jerseyRequest).whenCompleteAsync((r, th) -> {
+                  if (th == null) jerseyCallback.response(r);
+                  else jerseyCallback.failure(th);
+               }, executorService);
+    }
 
-        final CompletableFuture<Object> settableFuture = new CompletableFuture<>();
+    protected CompletableFuture<ClientResponse> execute(final ClientRequest jerseyRequest) {
+        final CompletableFuture<ClientResponse> responseAvailable = new CompletableFuture<>();
+        final CompletableFuture<?> responseDone = new CompletableFuture<>();
 
         final URI requestUri = jerseyRequest.getUri();
         String host = requestUri.getHost();
         int port = requestUri.getPort() != -1 ? requestUri.getPort() : "https".equals(requestUri.getScheme()) ? 443 : 80;
 
         try {
-            Bootstrap b = new Bootstrap();
-            b.group(group)
-             .channel(NioSocketChannel.class)
-             .handler(new ChannelInitializer<SocketChannel>() {
-                 @Override
-                 protected void initChannel(SocketChannel ch) throws Exception {
+            String key = requestUri.getScheme() + "://" + host + ":" + port;
+            ArrayList<Channel> conns;
+            synchronized (connections) {
+               conns = connections.get(key);
+               if (conns == null) {
+                  conns = new ArrayList<>(0);
+                  connections.put(key, conns);
+               }
+            }
+
+            Channel chan;
+            synchronized (conns) {
+               chan = conns.size() == 0 ? null : conns.remove(conns.size() - 1);
+            }
+
+            if (chan == null) {
+               Bootstrap b = new Bootstrap();
+               b.group(group)
+                .channel(NioSocketChannel.class)
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(SocketChannel ch) throws Exception {
                      ChannelPipeline p = ch.pipeline();
 
                      // Enable HTTPS if necessary.
@@ -177,32 +179,47 @@
                      p.addLast(new HttpClientCodec());
                      p.addLast(new ChunkedWriteHandler());
                      p.addLast(new HttpContentDecompressor());
-                     p.addLast(new JerseyClientHandler(NettyConnector.this, jerseyRequest, jerseyCallback, settableFuture));
-                 }
-             });
+                    }
+                });
 
-            // connect timeout
-            Integer connectTimeout = ClientProperties.getValue(jerseyRequest.getConfiguration().getProperties(),
-                                                               ClientProperties.CONNECT_TIMEOUT, 0);
-            if (connectTimeout > 0) {
-                b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
+               // connect timeout
+               Integer connectTimeout = ClientProperties.getValue(jerseyRequest.getConfiguration().getProperties(),
+                                                                  ClientProperties.CONNECT_TIMEOUT, 0);
+               if (connectTimeout > 0) {
+                   b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
+               }
+
+               // Make the connection attempt.
+               chan = b.connect(host, port).sync().channel();
             }
 
-            // Make the connection attempt.
-            final Channel ch = b.connect(host, port).sync().channel();
+            // assert: clientHandler will always notify responseDone: either normally, or exceptionally
+            // assert: clientHandler may notify responseAvailable, if sufficient parts of response are detected to construct
+            //         a valid ClientResponse
+            // assert: responseAvailable completion may be racing against responseDone completion
+            // assert: it is ok to abort the entire response, if responseDone is completed exceptionally - in particular, nothing
+            //         will leak
+            final Channel ch = chan;
+            JerseyClientHandler clientHandler = new JerseyClientHandler(jerseyRequest, responseAvailable, responseDone);
+            ch.pipeline().addLast(clientHandler);
 
-            // guard against prematurely closed channel
-            final GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> closeListener =
-                    new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
-                        @Override
-                        public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
-                            if (!settableFuture.isDone()) {
-                                settableFuture.completeExceptionally(new IOException("Channel closed."));
-                            }
-                        }
-                    };
+            responseDone.whenComplete((_r, th) -> {
+               ch.pipeline().remove(clientHandler);
 
-            ch.closeFuture().addListener(closeListener);
+               if (th == null) {
+                  synchronized (connections) {
+                     ArrayList<Channel> conns1 = connections.get(key);
+                     synchronized (conns1) {
+                        conns1.add(ch);
+                     }
+                  }
+               } else {
+                  ch.close();
+                  // if responseAvailable has been completed, no-op: jersey will encounter IOException while reading response body
+                  // if responseAvailable has not been completed, abort
+                  responseAvailable.completeExceptionally(th);
+               }
+            });
 
             HttpRequest nettyRequest;
             String pathWithQuery = buildPathWithQueryParameters(requestUri);
@@ -226,14 +243,23 @@
             nettyRequest.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
 
             if (jerseyRequest.hasEntity()) {
+                // guard against prematurely closed channel
+                final GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> closeListener =
+                    new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
+                        @Override
+                        public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
+                            if (!responseDone.isDone()) {
+                                responseDone.completeExceptionally(new IOException("Channel closed."));
+                            }
+                        }
+                    };
+                ch.closeFuture().addListener(closeListener);
                 if (jerseyRequest.getLengthLong() == -1) {
                     HttpUtil.setTransferEncodingChunked(nettyRequest, true);
                 } else {
                     nettyRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong());
                 }
-            }
 
-            if (jerseyRequest.hasEntity()) {
                 // Send the HTTP request.
                 ch.writeAndFlush(nettyRequest);
 
@@ -260,27 +286,22 @@
                         try {
                             jerseyRequest.writeEntity();
                         } catch (IOException e) {
-                            jerseyCallback.failure(e);
-                            settableFuture.completeExceptionally(e);
+                            responseDone.completeExceptionally(e);
                         }
                     }
                 });
 
                 ch.flush();
             } else {
-                // close listener is not needed any more.
-                ch.closeFuture().removeListener(closeListener);
-
                 // Send the HTTP request.
                 ch.writeAndFlush(nettyRequest);
             }
 
         } catch (InterruptedException e) {
-            settableFuture.completeExceptionally(e);
-            return settableFuture;
+            responseDone.completeExceptionally(e);
         }
 
-        return settableFuture;
+        return responseAvailable;
     }
 
     private String buildPathWithQueryParameters(URI requestUri) {
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
index 741121f..3da7019 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -16,107 +16,150 @@
 
 package org.glassfish.jersey.netty.connector.internal;
 
-import java.io.IOException;
 import java.io.InputStream;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 /**
  * Input stream which servers as Request entity input.
  * <p>
- * Consumes a list of pending {@link ByteBuf}s and processes them on request by Jersey
+ * Converts Netty NIO buffers to an input streams and stores them in the queue,
+ * waiting for Jersey to process it.
+ *
+ * @author Pavel Bucek
  */
 public class NettyInputStream extends InputStream {
 
-    private final LinkedBlockingDeque<ByteBuf> isList;
+    private volatile boolean end = false;
+    private Throwable cause;
 
-    public NettyInputStream(LinkedBlockingDeque<ByteBuf> isList) {
-        this.isList = isList;
+    private final ArrayDeque<ByteBuf> isList;
+    private ByteBuf current;
+    private ByteBuffer buffer;
+
+    private byte[] ONE_BYTE;
+    private boolean reading;
+
+    public NettyInputStream() {
+        this.isList = new ArrayDeque<>();
     }
 
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
-
-        ByteBuf take;
-        try {
-            take = isList.take();
-            boolean isReadable = take.isReadable();
-            int read = -1;
-            if (checkEndOfInputOrError(take)) {
-                take.release();
+       if (current == null) {
+          buffer = awaitNext();
+          if (buffer == null) {
+             // assert: end is true
+             if (cause == null) {
                 return -1;
-            }
+             }
 
-            if (isReadable) {
-                int readableBytes = take.readableBytes();
-                read = Math.min(readableBytes, len);
-                take.readBytes(b, off, read);
-                if (read < len) {
-                    take.release();
-                } else {
-                    isList.addFirst(take);
-                }
-            } else {
-                read = 0;
-                take.release(); //We don't need `0`
-            }
+             throw new IOException(cause);
+          }
+       }
 
-            return read;
-        } catch (InterruptedException e) {
-            throw new IOException("Interrupted.", e);
-        }
+       int rem = buffer.remaining();
+       if (rem < len) {
+          len = rem;
+       }
+       buffer.get(b, off, len);
+       if (rem == len) {
+          releaseByteBuf();
+       }
+
+       return len;
     }
 
     @Override
     public int read() throws IOException {
+       if (ONE_BYTE == null) {
+          ONE_BYTE = new byte[1];
+       }
+       int r = read(ONE_BYTE, 0, 1);
+       if (r < 0) {
+          return r;
+       }
 
-        ByteBuf take;
-        try {
-            take = isList.take();
-            boolean isReadable = take.isReadable();
-            if (checkEndOfInputOrError(take)) {
-                take.release();
-                return -1;
-            }
-
-            if (isReadable) {
-                return take.readInt();
-            } else {
-                take.release(); //We don't need `0`
-            }
-
-            return 0;
-        } catch (InterruptedException e) {
-            throw new IOException("Interrupted.", e);
-        }
+       return ONE_BYTE[0] & 0xff;
     }
 
     @Override
-    public void close() throws IOException {
-        if (isList != null) {
-            while (!isList.isEmpty()) {
-                try {
-                    isList.take().release();
-                } catch (InterruptedException e) {
-                    throw new IOException("Interrupted. Potential ByteBuf Leak.", e);
-                }
-            }
+    public void close() {
+
+        releaseByteBuf();
+
+        cleanup(true);
+    }
+
+    private void releaseByteBuf() {
+        if (current != null) {
+            current.release();
         }
-        super.close();
+
+        current = null;
+        buffer = null;
+    }
+
+    protected synchronized ByteBuffer awaitNext() {
+       while (isList.isEmpty()) {
+          if (end) {
+             return null;
+          }
+
+          try {
+             reading = true;
+             wait();
+             reading = false;
+          } catch (InterruptedException ie) {
+             // waiting uninterruptibly
+          }
+       }
+
+       current = isList.poll();
+       return current.nioBuffer().asReadOnlyBuffer();
+    }
+
+    public void complete(Throwable cause) {
+       this.cause = cause;
+       cleanup(cause != null);
+    }
+
+    protected synchronized void cleanup(boolean drain) {
+       if (drain) {
+          while (!isList.isEmpty()) {
+             isList.poll().release();
+          }
+       }
+
+       end = true;
+
+       if (reading) {
+          notifyAll();
+       }
     }
 
     @Override
     public int available() throws IOException {
-        ByteBuf peek = isList.peek();
-        if (peek != null && peek.isReadable()) {
-            return peek.readableBytes();
-        }
-        return 0;
+        return buffer == null ? 0 : buffer.remaining();
     }
 
-    private boolean checkEndOfInputOrError(ByteBuf take) throws IOException {
-        return take == Unpooled.EMPTY_BUFFER;
+    public synchronized void publish(ByteBuf content) {
+       if (end || content.nioBuffer().remaining() == 0) {
+          content.release();
+          return;
+       }
+
+       isList.add(content);
+       if (reading) {
+          notifyAll();
+       }
+    }
+
+    public void clear() {
+        end = false;
+        isList.clear();
     }
 }
diff --git a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java
index 6716c9b..20caa53 100644
--- a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java
+++ b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HelloWorldTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -77,6 +77,7 @@
     public void testConnection() {
         Response response = target().path(ROOT_PATH).request("text/plain").get();
         assertEquals(200, response.getStatus());
+        response.close();
     }
 
     @Test
diff --git a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java
index 10cfa14..f5438e5 100644
--- a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java
+++ b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/HttpHeadersTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2018 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -64,5 +64,6 @@
 
         assertEquals(200, response.getStatus());
         assertTrue(response.hasEntity());
+        response.close();
     }
 }
diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
index e64476d..6eeac17 100644
--- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
+++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -25,11 +25,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
 
 import javax.ws.rs.core.SecurityContext;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -55,7 +52,7 @@
 class JerseyHttp2ServerHandler extends ChannelDuplexHandler {
 
     private final URI baseUri;
-    private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
+    private final NettyInputStream nettyInputStream = new NettyInputStream();
     private final NettyHttpContainer container;
     private final ResourceConfig resourceConfig;
 
@@ -92,9 +89,9 @@
      * Process incoming data.
      */
     private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception {
-        isList.add(data.content());
+        nettyInputStream.publish(data.content());
         if (data.isEndStream()) {
-            isList.add(Unpooled.EMPTY_BUFFER);
+            nettyInputStream.complete(null);
         }
     }
 
@@ -163,11 +160,11 @@
             ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
                 @Override
                 public void operationComplete(Future<? super Void> future) throws Exception {
-                    isList.add(Unpooled.EMPTY_BUFFER);
+                    nettyInputStream.complete(future.cause());
                 }
             });
 
-            requestContext.setEntityStream(new NettyInputStream(isList));
+            requestContext.setEntityStream(nettyInputStream);
         } else {
             requestContext.setEntityStream(new InputStream() {
                 @Override
diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
index 712cb1f..0f2a7ae 100644
--- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
+++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -20,13 +20,11 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
 
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.MediaType;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -53,7 +51,7 @@
 class JerseyServerHandler extends ChannelInboundHandlerAdapter {
 
     private final URI baseUri;
-    private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
+    private final NettyInputStream nettyInputStream = new NettyInputStream();
     private final NettyHttpContainer container;
     private final ResourceConfig resourceConfig;
 
@@ -82,7 +80,7 @@
                 ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
             }
 
-            isList.clear(); // clearing the content - possible leftover from previous request processing.
+            nettyInputStream.clear(); // clearing the content - possible leftover from previous request processing.
             final ContainerRequest requestContext = createContainerRequest(ctx, req);
 
             requestContext.setWriter(new NettyResponseWriter(ctx, req, container));
@@ -105,7 +103,7 @@
                 //Otherwise, it's safe to discard during next processing
                 if ((!isJson && contentLength != -1) || HttpUtil.isTransferEncodingChunked(req)
                         || (isJson && contentLength >= 2)) {
-                    requestContext.setEntityStream(new NettyInputStream(isList));
+                    requestContext.setEntityStream(nettyInputStream);
                 }
             }
 
@@ -128,11 +126,11 @@
 
           ByteBuf content = httpContent.content();
           if (content.isReadable()) {
-              isList.add(content);
+              nettyInputStream.publish(content);
           }
 
           if (msg instanceof LastHttpContent) {
-              isList.add(Unpooled.EMPTY_BUFFER);
+              nettyInputStream.complete(null);
           }
       }
     }
diff --git a/pom.xml b/pom.xml
index a157ec4..3cad108 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2141,7 +2141,7 @@
         <mockito.version>1.10.19</mockito.version>
         <moxy.version>2.7.4</moxy.version>
         <mustache.version>0.8.17</mustache.version>
-        <netty.version>4.1.31.Final</netty.version>
+        <netty.version>4.1.43.Final</netty.version>
         <nexus-staging.mvn.plugin.version>1.6.7</nexus-staging.mvn.plugin.version>
         <opentracing.version>0.30.0</opentracing.version>
         <osgi.version>6.0.0</osgi.version>