Support NettyConnector & RequestEntityProcessing.BUFFERED

Signed-off-by: jansupol <jan.supol@oracle.com>
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
index 9406b51..a2c675a 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
@@ -84,7 +84,7 @@
 import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
 import org.glassfish.jersey.client.spi.Connector;
 import org.glassfish.jersey.message.internal.OutboundMessageContext;
-import org.glassfish.jersey.netty.connector.internal.JerseyChunkedInput;
+import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter;
 
 /**
  * Netty connector implementation.
@@ -391,27 +391,34 @@
                         }
                     };
                 ch.closeFuture().addListener(closeListener);
-                if (jerseyRequest.getLengthLong() == -1) {
-                    HttpUtil.setTransferEncodingChunked(nettyRequest, true);
-                } else {
-                    nettyRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong());
+
+                final NettyEntityWriter entityWriter = NettyEntityWriter.getInstance(jerseyRequest, ch);
+                switch (entityWriter.getType()) {
+                    case CHUNKED:
+                        HttpUtil.setTransferEncodingChunked(nettyRequest, true);
+                        break;
+                    case PRESET:
+                        nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong());
+                        break;
+//                  case DELAYED:
+//                      // Set later after the entity is "written"
+//                      break;
                 }
 
                 // Send the HTTP request.
-                ch.writeAndFlush(nettyRequest);
+                entityWriter.writeAndFlush(nettyRequest);
 
-                final JerseyChunkedInput jerseyChunkedInput = new JerseyChunkedInput(ch);
                 jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
                     @Override
                     public OutputStream getOutputStream(int contentLength) throws IOException {
-                        return jerseyChunkedInput;
+                        return entityWriter.getOutputStream();
                     }
                 });
 
                 if (HttpUtil.isTransferEncodingChunked(nettyRequest)) {
-                    ch.write(new HttpChunkedInput(jerseyChunkedInput));
+                    entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput()));
                 } else {
-                    ch.write(jerseyChunkedInput);
+                    entityWriter.write(entityWriter.getChunkedInput());
                 }
 
                 executorService.execute(new Runnable() {
@@ -422,19 +429,28 @@
 
                         try {
                             jerseyRequest.writeEntity();
+
+                            if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
+                                replaceHeaders(jerseyRequest, nettyRequest.headers()); // WriterInterceptor changes
+                                nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, entityWriter.getLength());
+                                entityWriter.flush();
+                            }
+
                         } catch (IOException e) {
                             responseDone.completeExceptionally(e);
                         }
                     }
                 });
 
-                ch.flush();
+                if (entityWriter.getType() != NettyEntityWriter.Type.DELAYED) {
+                    entityWriter.flush();
+                }
             } else {
                 // Send the HTTP request.
                 ch.writeAndFlush(nettyRequest);
             }
 
-        } catch (InterruptedException e) {
+        } catch (IOException | InterruptedException e) {
             responseDone.completeExceptionally(e);
         }
     }
@@ -508,4 +524,11 @@
         }
         return headers;
     }
+
+    private static HttpHeaders replaceHeaders(ClientRequest jerseyRequest, HttpHeaders headers) {
+        for (final Map.Entry<String, List<String>> e : jerseyRequest.getStringHeaders().entrySet()) {
+            headers.set(e.getKey(), e.getValue());
+        }
+        return headers;
+    }
 }
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java
new file mode 100644
index 0000000..a9e7040
--- /dev/null
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.netty.connector.internal;
+
+import io.netty.channel.Channel;
+import io.netty.handler.stream.ChunkedInput;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+import org.glassfish.jersey.client.RequestEntityProcessing;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The Entity Writer is used to write entity in Netty. One implementation is delayed,
+ * so that the complete message length can be set to Content-Length header.
+ */
+public interface NettyEntityWriter {
+
+    /**
+     * Type of the entity writer. {@code CHUNKED} is used for chunked data. {@code PRESET} is for buffered data, but the
+     * content length was pre-set by the customer. {@code DELAYED} is for buffered data where the content-length is unknown.
+     * The headers must not be written before the entity is provided by MessageBodyWriter to know the exact length.
+     */
+    enum Type {
+        CHUNKED,
+        PRESET,
+        DELAYED
+    }
+
+    /**
+     * Writes the Object to the channel
+     * @param object object to be written
+     */
+    void write(Object object);
+
+    /**
+     * Writes the Object to the channel and flush.
+     * @param object object to be written
+     */
+    void writeAndFlush(Object object);
+
+    /**
+     * Flushes the writen objects. Can throw IOException.
+     * @throws IOException
+     */
+    void flush() throws IOException;
+
+    /**
+     * Get the netty Chunked Input to be written.
+     * @return The Chunked input instance
+     */
+    ChunkedInput getChunkedInput();
+
+    /**
+     * Get the {@link OutputStream} used to write an entity
+     * @return the OutputStream to write an entity
+     */
+    OutputStream getOutputStream();
+
+    /**
+     * Get the length of the entity written to the {@link OutputStream}
+     * @return
+     */
+    long getLength();
+
+    /**
+     * Return Type of
+     * @return
+     */
+    Type getType();
+
+    static NettyEntityWriter getInstance(ClientRequest clientRequest, Channel channel) {
+        final long lengthLong = clientRequest.getLengthLong();
+        final RequestEntityProcessing entityProcessing = clientRequest.resolveProperty(
+                ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class);
+
+        if ((entityProcessing == null && lengthLong == -1) || entityProcessing == RequestEntityProcessing.CHUNKED) {
+            return new DirectEntityWriter(channel, Type.CHUNKED);
+        } else if (lengthLong != -1) {
+            return new DirectEntityWriter(channel, Type.PRESET);
+        } else {
+            return new DelayedEntityWriter(channel, Type.DELAYED);
+        }
+    }
+
+    class DirectEntityWriter implements NettyEntityWriter {
+        private final Channel channel;
+        private final JerseyChunkedInput stream;
+        private final Type type;
+
+        public DirectEntityWriter(Channel channel, Type type) {
+            this.channel = channel;
+            stream = new JerseyChunkedInput(channel);
+            this.type = type;
+        }
+
+        @Override
+        public void write(Object object) {
+            channel.write(object);
+        }
+
+        @Override
+        public void writeAndFlush(Object object) {
+            channel.writeAndFlush(object);
+        }
+
+        @Override
+        public void flush() {
+            channel.flush();
+        }
+
+        @Override
+        public ChunkedInput getChunkedInput() {
+            return stream;
+        }
+
+        @Override
+        public OutputStream getOutputStream() {
+            return stream;
+        }
+
+        @Override
+        public long getLength() {
+            return stream.progress();
+        }
+
+        @Override
+        public Type getType() {
+            return type;
+        }
+    }
+
+    class DelayedEntityWriter implements NettyEntityWriter {
+        private final List<Runnable> delayedOps;
+        private final DirectEntityWriter writer;
+        private final DelayedOutputStream outputStream;
+
+        private boolean flushed = false;
+        private boolean closed = false;
+
+        public DelayedEntityWriter(Channel channel, Type type) {
+            this.writer = new DirectEntityWriter(channel, type);
+            this.delayedOps = new LinkedList<>();
+            this.outputStream = new DelayedOutputStream();
+        }
+
+
+        @Override
+        public void write(Object object) {
+            if (!flushed) {
+                delayedOps.add(() -> writer.write(object));
+            } else {
+                writer.write(object);
+            }
+        }
+
+        @Override
+        public void writeAndFlush(Object object) {
+            if (!flushed) {
+                delayedOps.add(() -> writer.writeAndFlush(object));
+            } else {
+                writer.writeAndFlush(object);
+            }
+        }
+
+        @Override
+        public void flush() throws IOException {
+            _flush();
+            if (!closed) {
+                closed = true;
+                writer.getOutputStream().close(); // Jersey automatically closes DelayedOutputStream not this one!
+            }
+            writer.flush();
+        }
+
+        private void _flush() throws IOException {
+            if (!flushed) {
+                flushed = true;
+                for (Runnable runnable : delayedOps) {
+                    runnable.run();
+                }
+
+                if (outputStream.b != null) {
+                    writer.getOutputStream().write(outputStream.b, outputStream.off, outputStream.len);
+                }
+            }
+        }
+
+        @Override
+        public ChunkedInput getChunkedInput() {
+            return writer.getChunkedInput();
+        }
+
+        @Override
+        public OutputStream getOutputStream() {
+            return outputStream;
+        }
+
+
+        @Override
+        public long getLength() {
+            return outputStream.len - outputStream.off;
+        }
+
+        @Override
+        public Type getType() {
+            return writer.getType();
+        }
+
+        private class DelayedOutputStream extends OutputStream {
+            private byte[] b;
+            private int off;
+            private int len;
+
+            @Override
+            public void write(int b) throws IOException {
+                write(new byte[]{(byte) (b & 0xFF)}, 0, 1);
+            }
+
+            @Override
+            public void write(byte[] b) throws IOException {
+                write(b, 0, b.length);
+            }
+
+            @Override
+            public void write(byte[] b, int off, int len) throws IOException {
+                if (!flushed && this.b == null) {
+                    this.b = b;
+                    this.off = off;
+                    this.len = len;
+                } else {
+                    DelayedEntityWriter.this._flush();
+                    writer.getOutputStream().write(b, off, len);
+                }
+            }
+        }
+    }
+}
diff --git a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/BufferedTest.java b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/BufferedTest.java
new file mode 100644
index 0000000..fdaf5be
--- /dev/null
+++ b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/BufferedTest.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.netty.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.RequestEntityProcessing;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.client.ClientRequestContext;
+import javax.ws.rs.client.ClientRequestFilter;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.WriterInterceptor;
+import javax.ws.rs.ext.WriterInterceptorContext;
+import java.io.IOException;
+
+public class BufferedTest extends JerseyTest {
+
+    private static String HEADER_1 = "First";
+    private static String HEADER_2 = "Second";
+    private static String HEADER_3 = "Third";
+    private static String ENTITY = "entity";
+
+
+    @Path("/buffered")
+    public static class BufferedTestResource {
+        @POST
+        public String post(@Context HttpHeaders headers, String entity) {
+            System.out.println("Remote");
+            String ret = headers.getHeaderString(HEADER_1)
+                    + headers.getHeaderString(HEADER_2)
+                    + headers.getHeaderString(HEADER_3)
+                    + entity;
+            System.out.println(ret);
+            return ret;
+        }
+    }
+
+    @Override
+    protected Application configure() {
+        return new ResourceConfig(BufferedTestResource.class);
+    }
+
+    @Override
+    protected void configureClient(ClientConfig config) {
+        config.connectorProvider(new NettyConnectorProvider())
+                .property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.BUFFERED);
+    }
+
+    @Test
+    public void test() {
+        try (Response r = target("buffered")
+                .register(new ClientRequestFilter() {
+                    @Override
+                    public void filter(ClientRequestContext requestContext) throws IOException {
+                        requestContext.setEntity(ENTITY);
+                        requestContext.getHeaders().add(HEADER_2, HEADER_2);
+                    }
+                })
+                .register(new WriterInterceptor() {
+                    @Override
+                    public void aroundWriteTo(WriterInterceptorContext context) throws IOException, WebApplicationException {
+                        context.getHeaders().add(HEADER_3, HEADER_3);
+                        context.proceed();
+                    }
+                })
+                .request()
+                .header(HEADER_1, HEADER_1)
+                .post(Entity.entity("ENTITY", MediaType.TEXT_PLAIN_TYPE))) {
+            String response = r.readEntity(String.class);
+            Assertions.assertEquals(HEADER_1 + HEADER_2 + HEADER_3 + ENTITY, response);
+        }
+    }
+}
diff --git a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java
index 18346e8..1256b1c 100644
--- a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java
+++ b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java
@@ -38,6 +38,7 @@
 import org.glassfish.jersey.grizzly.connector.GrizzlyConnectorProvider;
 import org.glassfish.jersey.jdk.connector.JdkConnectorProvider;
 import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.netty.connector.NettyConnectorProvider;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.test.JerseyTest;
 
@@ -78,6 +79,7 @@
                 Arguments.of(new TestArguments(() -> new ApacheConnectorProvider(), RequestEntityProcessing.CHUNKED)),
                 Arguments.of(new TestArguments(() -> new Apache5ConnectorProvider(), RequestEntityProcessing.CHUNKED)),
                 Arguments.of(new TestArguments(() -> new GrizzlyConnectorProvider(), RequestEntityProcessing.CHUNKED)),
+                Arguments.of(new TestArguments(() -> new NettyConnectorProvider(), RequestEntityProcessing.CHUNKED)),
                 Arguments.of(new TestArguments(() -> new HttpUrlConnectorProvider(), RequestEntityProcessing.BUFFERED)),
                 Arguments.of(new TestArguments(() -> new JdkConnectorProvider(), RequestEntityProcessing.BUFFERED))
         );