Support NettyConnector & RequestEntityProcessing.BUFFERED
Signed-off-by: jansupol <jan.supol@oracle.com>
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
index 9406b51..a2c675a 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
@@ -84,7 +84,7 @@
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.message.internal.OutboundMessageContext;
-import org.glassfish.jersey.netty.connector.internal.JerseyChunkedInput;
+import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter;
/**
* Netty connector implementation.
@@ -391,27 +391,34 @@
}
};
ch.closeFuture().addListener(closeListener);
- if (jerseyRequest.getLengthLong() == -1) {
- HttpUtil.setTransferEncodingChunked(nettyRequest, true);
- } else {
- nettyRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong());
+
+ final NettyEntityWriter entityWriter = NettyEntityWriter.getInstance(jerseyRequest, ch);
+ switch (entityWriter.getType()) {
+ case CHUNKED:
+ HttpUtil.setTransferEncodingChunked(nettyRequest, true);
+ break;
+ case PRESET:
+ nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong());
+ break;
+// case DELAYED:
+// // Set later after the entity is "written"
+// break;
}
// Send the HTTP request.
- ch.writeAndFlush(nettyRequest);
+ entityWriter.writeAndFlush(nettyRequest);
- final JerseyChunkedInput jerseyChunkedInput = new JerseyChunkedInput(ch);
jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(int contentLength) throws IOException {
- return jerseyChunkedInput;
+ return entityWriter.getOutputStream();
}
});
if (HttpUtil.isTransferEncodingChunked(nettyRequest)) {
- ch.write(new HttpChunkedInput(jerseyChunkedInput));
+ entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput()));
} else {
- ch.write(jerseyChunkedInput);
+ entityWriter.write(entityWriter.getChunkedInput());
}
executorService.execute(new Runnable() {
@@ -422,19 +429,28 @@
try {
jerseyRequest.writeEntity();
+
+ if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
+ replaceHeaders(jerseyRequest, nettyRequest.headers()); // WriterInterceptor changes
+ nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, entityWriter.getLength());
+ entityWriter.flush();
+ }
+
} catch (IOException e) {
responseDone.completeExceptionally(e);
}
}
});
- ch.flush();
+ if (entityWriter.getType() != NettyEntityWriter.Type.DELAYED) {
+ entityWriter.flush();
+ }
} else {
// Send the HTTP request.
ch.writeAndFlush(nettyRequest);
}
- } catch (InterruptedException e) {
+ } catch (IOException | InterruptedException e) {
responseDone.completeExceptionally(e);
}
}
@@ -508,4 +524,11 @@
}
return headers;
}
+
+ private static HttpHeaders replaceHeaders(ClientRequest jerseyRequest, HttpHeaders headers) {
+ for (final Map.Entry<String, List<String>> e : jerseyRequest.getStringHeaders().entrySet()) {
+ headers.set(e.getKey(), e.getValue());
+ }
+ return headers;
+ }
}
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java
new file mode 100644
index 0000000..a9e7040
--- /dev/null
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.netty.connector.internal;
+
+import io.netty.channel.Channel;
+import io.netty.handler.stream.ChunkedInput;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+import org.glassfish.jersey.client.RequestEntityProcessing;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The Entity Writer is used to write entity in Netty. One implementation is delayed,
+ * so that the complete message length can be set to Content-Length header.
+ */
+public interface NettyEntityWriter {
+
+ /**
+ * Type of the entity writer. {@code CHUNKED} is used for chunked data. {@code PRESET} is for buffered data, but the
+ * content length was pre-set by the customer. {@code DELAYED} is for buffered data where the content-length is unknown.
+ * The headers must not be written before the entity is provided by MessageBodyWriter to know the exact length.
+ */
+ enum Type {
+ CHUNKED,
+ PRESET,
+ DELAYED
+ }
+
+ /**
+ * Writes the Object to the channel
+ * @param object object to be written
+ */
+ void write(Object object);
+
+ /**
+ * Writes the Object to the channel and flush.
+ * @param object object to be written
+ */
+ void writeAndFlush(Object object);
+
+ /**
+ * Flushes the writen objects. Can throw IOException.
+ * @throws IOException
+ */
+ void flush() throws IOException;
+
+ /**
+ * Get the netty Chunked Input to be written.
+ * @return The Chunked input instance
+ */
+ ChunkedInput getChunkedInput();
+
+ /**
+ * Get the {@link OutputStream} used to write an entity
+ * @return the OutputStream to write an entity
+ */
+ OutputStream getOutputStream();
+
+ /**
+ * Get the length of the entity written to the {@link OutputStream}
+ * @return
+ */
+ long getLength();
+
+ /**
+ * Return Type of
+ * @return
+ */
+ Type getType();
+
+ static NettyEntityWriter getInstance(ClientRequest clientRequest, Channel channel) {
+ final long lengthLong = clientRequest.getLengthLong();
+ final RequestEntityProcessing entityProcessing = clientRequest.resolveProperty(
+ ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class);
+
+ if ((entityProcessing == null && lengthLong == -1) || entityProcessing == RequestEntityProcessing.CHUNKED) {
+ return new DirectEntityWriter(channel, Type.CHUNKED);
+ } else if (lengthLong != -1) {
+ return new DirectEntityWriter(channel, Type.PRESET);
+ } else {
+ return new DelayedEntityWriter(channel, Type.DELAYED);
+ }
+ }
+
+ class DirectEntityWriter implements NettyEntityWriter {
+ private final Channel channel;
+ private final JerseyChunkedInput stream;
+ private final Type type;
+
+ public DirectEntityWriter(Channel channel, Type type) {
+ this.channel = channel;
+ stream = new JerseyChunkedInput(channel);
+ this.type = type;
+ }
+
+ @Override
+ public void write(Object object) {
+ channel.write(object);
+ }
+
+ @Override
+ public void writeAndFlush(Object object) {
+ channel.writeAndFlush(object);
+ }
+
+ @Override
+ public void flush() {
+ channel.flush();
+ }
+
+ @Override
+ public ChunkedInput getChunkedInput() {
+ return stream;
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ return stream;
+ }
+
+ @Override
+ public long getLength() {
+ return stream.progress();
+ }
+
+ @Override
+ public Type getType() {
+ return type;
+ }
+ }
+
+ class DelayedEntityWriter implements NettyEntityWriter {
+ private final List<Runnable> delayedOps;
+ private final DirectEntityWriter writer;
+ private final DelayedOutputStream outputStream;
+
+ private boolean flushed = false;
+ private boolean closed = false;
+
+ public DelayedEntityWriter(Channel channel, Type type) {
+ this.writer = new DirectEntityWriter(channel, type);
+ this.delayedOps = new LinkedList<>();
+ this.outputStream = new DelayedOutputStream();
+ }
+
+
+ @Override
+ public void write(Object object) {
+ if (!flushed) {
+ delayedOps.add(() -> writer.write(object));
+ } else {
+ writer.write(object);
+ }
+ }
+
+ @Override
+ public void writeAndFlush(Object object) {
+ if (!flushed) {
+ delayedOps.add(() -> writer.writeAndFlush(object));
+ } else {
+ writer.writeAndFlush(object);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ _flush();
+ if (!closed) {
+ closed = true;
+ writer.getOutputStream().close(); // Jersey automatically closes DelayedOutputStream not this one!
+ }
+ writer.flush();
+ }
+
+ private void _flush() throws IOException {
+ if (!flushed) {
+ flushed = true;
+ for (Runnable runnable : delayedOps) {
+ runnable.run();
+ }
+
+ if (outputStream.b != null) {
+ writer.getOutputStream().write(outputStream.b, outputStream.off, outputStream.len);
+ }
+ }
+ }
+
+ @Override
+ public ChunkedInput getChunkedInput() {
+ return writer.getChunkedInput();
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ return outputStream;
+ }
+
+
+ @Override
+ public long getLength() {
+ return outputStream.len - outputStream.off;
+ }
+
+ @Override
+ public Type getType() {
+ return writer.getType();
+ }
+
+ private class DelayedOutputStream extends OutputStream {
+ private byte[] b;
+ private int off;
+ private int len;
+
+ @Override
+ public void write(int b) throws IOException {
+ write(new byte[]{(byte) (b & 0xFF)}, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (!flushed && this.b == null) {
+ this.b = b;
+ this.off = off;
+ this.len = len;
+ } else {
+ DelayedEntityWriter.this._flush();
+ writer.getOutputStream().write(b, off, len);
+ }
+ }
+ }
+ }
+}
diff --git a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/BufferedTest.java b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/BufferedTest.java
new file mode 100644
index 0000000..fdaf5be
--- /dev/null
+++ b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/BufferedTest.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.netty.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.RequestEntityProcessing;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.client.ClientRequestContext;
+import javax.ws.rs.client.ClientRequestFilter;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.WriterInterceptor;
+import javax.ws.rs.ext.WriterInterceptorContext;
+import java.io.IOException;
+
+public class BufferedTest extends JerseyTest {
+
+ private static String HEADER_1 = "First";
+ private static String HEADER_2 = "Second";
+ private static String HEADER_3 = "Third";
+ private static String ENTITY = "entity";
+
+
+ @Path("/buffered")
+ public static class BufferedTestResource {
+ @POST
+ public String post(@Context HttpHeaders headers, String entity) {
+ System.out.println("Remote");
+ String ret = headers.getHeaderString(HEADER_1)
+ + headers.getHeaderString(HEADER_2)
+ + headers.getHeaderString(HEADER_3)
+ + entity;
+ System.out.println(ret);
+ return ret;
+ }
+ }
+
+ @Override
+ protected Application configure() {
+ return new ResourceConfig(BufferedTestResource.class);
+ }
+
+ @Override
+ protected void configureClient(ClientConfig config) {
+ config.connectorProvider(new NettyConnectorProvider())
+ .property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.BUFFERED);
+ }
+
+ @Test
+ public void test() {
+ try (Response r = target("buffered")
+ .register(new ClientRequestFilter() {
+ @Override
+ public void filter(ClientRequestContext requestContext) throws IOException {
+ requestContext.setEntity(ENTITY);
+ requestContext.getHeaders().add(HEADER_2, HEADER_2);
+ }
+ })
+ .register(new WriterInterceptor() {
+ @Override
+ public void aroundWriteTo(WriterInterceptorContext context) throws IOException, WebApplicationException {
+ context.getHeaders().add(HEADER_3, HEADER_3);
+ context.proceed();
+ }
+ })
+ .request()
+ .header(HEADER_1, HEADER_1)
+ .post(Entity.entity("ENTITY", MediaType.TEXT_PLAIN_TYPE))) {
+ String response = r.readEntity(String.class);
+ Assertions.assertEquals(HEADER_1 + HEADER_2 + HEADER_3 + ENTITY, response);
+ }
+ }
+}
diff --git a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java
index 18346e8..1256b1c 100644
--- a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java
+++ b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java
@@ -38,6 +38,7 @@
import org.glassfish.jersey.grizzly.connector.GrizzlyConnectorProvider;
import org.glassfish.jersey.jdk.connector.JdkConnectorProvider;
import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.netty.connector.NettyConnectorProvider;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
@@ -78,6 +79,7 @@
Arguments.of(new TestArguments(() -> new ApacheConnectorProvider(), RequestEntityProcessing.CHUNKED)),
Arguments.of(new TestArguments(() -> new Apache5ConnectorProvider(), RequestEntityProcessing.CHUNKED)),
Arguments.of(new TestArguments(() -> new GrizzlyConnectorProvider(), RequestEntityProcessing.CHUNKED)),
+ Arguments.of(new TestArguments(() -> new NettyConnectorProvider(), RequestEntityProcessing.CHUNKED)),
Arguments.of(new TestArguments(() -> new HttpUrlConnectorProvider(), RequestEntityProcessing.BUFFERED)),
Arguments.of(new TestArguments(() -> new JdkConnectorProvider(), RequestEntityProcessing.BUFFERED))
);