Support NettyConnector & RequestEntityProcessing.BUFFERED Signed-off-by: jansupol <jan.supol@oracle.com>
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java index 9406b51..a2c675a 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java
@@ -84,7 +84,7 @@ import org.glassfish.jersey.client.spi.AsyncConnectorCallback; import org.glassfish.jersey.client.spi.Connector; import org.glassfish.jersey.message.internal.OutboundMessageContext; -import org.glassfish.jersey.netty.connector.internal.JerseyChunkedInput; +import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter; /** * Netty connector implementation. @@ -391,27 +391,34 @@ } }; ch.closeFuture().addListener(closeListener); - if (jerseyRequest.getLengthLong() == -1) { - HttpUtil.setTransferEncodingChunked(nettyRequest, true); - } else { - nettyRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong()); + + final NettyEntityWriter entityWriter = NettyEntityWriter.getInstance(jerseyRequest, ch); + switch (entityWriter.getType()) { + case CHUNKED: + HttpUtil.setTransferEncodingChunked(nettyRequest, true); + break; + case PRESET: + nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong()); + break; +// case DELAYED: +// // Set later after the entity is "written" +// break; } // Send the HTTP request. - ch.writeAndFlush(nettyRequest); + entityWriter.writeAndFlush(nettyRequest); - final JerseyChunkedInput jerseyChunkedInput = new JerseyChunkedInput(ch); jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() { @Override public OutputStream getOutputStream(int contentLength) throws IOException { - return jerseyChunkedInput; + return entityWriter.getOutputStream(); } }); if (HttpUtil.isTransferEncodingChunked(nettyRequest)) { - ch.write(new HttpChunkedInput(jerseyChunkedInput)); + entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput())); } else { - ch.write(jerseyChunkedInput); + entityWriter.write(entityWriter.getChunkedInput()); } executorService.execute(new Runnable() { @@ -422,19 +429,28 @@ try { jerseyRequest.writeEntity(); + + if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) { + replaceHeaders(jerseyRequest, nettyRequest.headers()); // WriterInterceptor changes + nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, entityWriter.getLength()); + entityWriter.flush(); + } + } catch (IOException e) { responseDone.completeExceptionally(e); } } }); - ch.flush(); + if (entityWriter.getType() != NettyEntityWriter.Type.DELAYED) { + entityWriter.flush(); + } } else { // Send the HTTP request. ch.writeAndFlush(nettyRequest); } - } catch (InterruptedException e) { + } catch (IOException | InterruptedException e) { responseDone.completeExceptionally(e); } } @@ -508,4 +524,11 @@ } return headers; } + + private static HttpHeaders replaceHeaders(ClientRequest jerseyRequest, HttpHeaders headers) { + for (final Map.Entry<String, List<String>> e : jerseyRequest.getStringHeaders().entrySet()) { + headers.set(e.getKey(), e.getValue()); + } + return headers; + } }
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java new file mode 100644 index 0000000..a9e7040 --- /dev/null +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyEntityWriter.java
@@ -0,0 +1,255 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.netty.connector.internal; + +import io.netty.channel.Channel; +import io.netty.handler.stream.ChunkedInput; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.ClientRequest; +import org.glassfish.jersey.client.RequestEntityProcessing; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.LinkedList; +import java.util.List; + +/** + * The Entity Writer is used to write entity in Netty. One implementation is delayed, + * so that the complete message length can be set to Content-Length header. + */ +public interface NettyEntityWriter { + + /** + * Type of the entity writer. {@code CHUNKED} is used for chunked data. {@code PRESET} is for buffered data, but the + * content length was pre-set by the customer. {@code DELAYED} is for buffered data where the content-length is unknown. + * The headers must not be written before the entity is provided by MessageBodyWriter to know the exact length. + */ + enum Type { + CHUNKED, + PRESET, + DELAYED + } + + /** + * Writes the Object to the channel + * @param object object to be written + */ + void write(Object object); + + /** + * Writes the Object to the channel and flush. + * @param object object to be written + */ + void writeAndFlush(Object object); + + /** + * Flushes the writen objects. Can throw IOException. + * @throws IOException + */ + void flush() throws IOException; + + /** + * Get the netty Chunked Input to be written. + * @return The Chunked input instance + */ + ChunkedInput getChunkedInput(); + + /** + * Get the {@link OutputStream} used to write an entity + * @return the OutputStream to write an entity + */ + OutputStream getOutputStream(); + + /** + * Get the length of the entity written to the {@link OutputStream} + * @return + */ + long getLength(); + + /** + * Return Type of + * @return + */ + Type getType(); + + static NettyEntityWriter getInstance(ClientRequest clientRequest, Channel channel) { + final long lengthLong = clientRequest.getLengthLong(); + final RequestEntityProcessing entityProcessing = clientRequest.resolveProperty( + ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class); + + if ((entityProcessing == null && lengthLong == -1) || entityProcessing == RequestEntityProcessing.CHUNKED) { + return new DirectEntityWriter(channel, Type.CHUNKED); + } else if (lengthLong != -1) { + return new DirectEntityWriter(channel, Type.PRESET); + } else { + return new DelayedEntityWriter(channel, Type.DELAYED); + } + } + + class DirectEntityWriter implements NettyEntityWriter { + private final Channel channel; + private final JerseyChunkedInput stream; + private final Type type; + + public DirectEntityWriter(Channel channel, Type type) { + this.channel = channel; + stream = new JerseyChunkedInput(channel); + this.type = type; + } + + @Override + public void write(Object object) { + channel.write(object); + } + + @Override + public void writeAndFlush(Object object) { + channel.writeAndFlush(object); + } + + @Override + public void flush() { + channel.flush(); + } + + @Override + public ChunkedInput getChunkedInput() { + return stream; + } + + @Override + public OutputStream getOutputStream() { + return stream; + } + + @Override + public long getLength() { + return stream.progress(); + } + + @Override + public Type getType() { + return type; + } + } + + class DelayedEntityWriter implements NettyEntityWriter { + private final List<Runnable> delayedOps; + private final DirectEntityWriter writer; + private final DelayedOutputStream outputStream; + + private boolean flushed = false; + private boolean closed = false; + + public DelayedEntityWriter(Channel channel, Type type) { + this.writer = new DirectEntityWriter(channel, type); + this.delayedOps = new LinkedList<>(); + this.outputStream = new DelayedOutputStream(); + } + + + @Override + public void write(Object object) { + if (!flushed) { + delayedOps.add(() -> writer.write(object)); + } else { + writer.write(object); + } + } + + @Override + public void writeAndFlush(Object object) { + if (!flushed) { + delayedOps.add(() -> writer.writeAndFlush(object)); + } else { + writer.writeAndFlush(object); + } + } + + @Override + public void flush() throws IOException { + _flush(); + if (!closed) { + closed = true; + writer.getOutputStream().close(); // Jersey automatically closes DelayedOutputStream not this one! + } + writer.flush(); + } + + private void _flush() throws IOException { + if (!flushed) { + flushed = true; + for (Runnable runnable : delayedOps) { + runnable.run(); + } + + if (outputStream.b != null) { + writer.getOutputStream().write(outputStream.b, outputStream.off, outputStream.len); + } + } + } + + @Override + public ChunkedInput getChunkedInput() { + return writer.getChunkedInput(); + } + + @Override + public OutputStream getOutputStream() { + return outputStream; + } + + + @Override + public long getLength() { + return outputStream.len - outputStream.off; + } + + @Override + public Type getType() { + return writer.getType(); + } + + private class DelayedOutputStream extends OutputStream { + private byte[] b; + private int off; + private int len; + + @Override + public void write(int b) throws IOException { + write(new byte[]{(byte) (b & 0xFF)}, 0, 1); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (!flushed && this.b == null) { + this.b = b; + this.off = off; + this.len = len; + } else { + DelayedEntityWriter.this._flush(); + writer.getOutputStream().write(b, off, len); + } + } + } + } +}
diff --git a/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/BufferedTest.java b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/BufferedTest.java new file mode 100644 index 0000000..fdaf5be --- /dev/null +++ b/connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/BufferedTest.java
@@ -0,0 +1,99 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.netty.connector; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.RequestEntityProcessing; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.client.ClientRequestContext; +import javax.ws.rs.client.ClientRequestFilter; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.WriterInterceptor; +import javax.ws.rs.ext.WriterInterceptorContext; +import java.io.IOException; + +public class BufferedTest extends JerseyTest { + + private static String HEADER_1 = "First"; + private static String HEADER_2 = "Second"; + private static String HEADER_3 = "Third"; + private static String ENTITY = "entity"; + + + @Path("/buffered") + public static class BufferedTestResource { + @POST + public String post(@Context HttpHeaders headers, String entity) { + System.out.println("Remote"); + String ret = headers.getHeaderString(HEADER_1) + + headers.getHeaderString(HEADER_2) + + headers.getHeaderString(HEADER_3) + + entity; + System.out.println(ret); + return ret; + } + } + + @Override + protected Application configure() { + return new ResourceConfig(BufferedTestResource.class); + } + + @Override + protected void configureClient(ClientConfig config) { + config.connectorProvider(new NettyConnectorProvider()) + .property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.BUFFERED); + } + + @Test + public void test() { + try (Response r = target("buffered") + .register(new ClientRequestFilter() { + @Override + public void filter(ClientRequestContext requestContext) throws IOException { + requestContext.setEntity(ENTITY); + requestContext.getHeaders().add(HEADER_2, HEADER_2); + } + }) + .register(new WriterInterceptor() { + @Override + public void aroundWriteTo(WriterInterceptorContext context) throws IOException, WebApplicationException { + context.getHeaders().add(HEADER_3, HEADER_3); + context.proceed(); + } + }) + .request() + .header(HEADER_1, HEADER_1) + .post(Entity.entity("ENTITY", MediaType.TEXT_PLAIN_TYPE))) { + String response = r.readEntity(String.class); + Assertions.assertEquals(HEADER_1 + HEADER_2 + HEADER_3 + ENTITY, response); + } + } +}
diff --git a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java index 18346e8..1256b1c 100644 --- a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java +++ b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/BufferingTest.java
@@ -38,6 +38,7 @@ import org.glassfish.jersey.grizzly.connector.GrizzlyConnectorProvider; import org.glassfish.jersey.jdk.connector.JdkConnectorProvider; import org.glassfish.jersey.logging.LoggingFeature; +import org.glassfish.jersey.netty.connector.NettyConnectorProvider; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.test.JerseyTest; @@ -78,6 +79,7 @@ Arguments.of(new TestArguments(() -> new ApacheConnectorProvider(), RequestEntityProcessing.CHUNKED)), Arguments.of(new TestArguments(() -> new Apache5ConnectorProvider(), RequestEntityProcessing.CHUNKED)), Arguments.of(new TestArguments(() -> new GrizzlyConnectorProvider(), RequestEntityProcessing.CHUNKED)), + Arguments.of(new TestArguments(() -> new NettyConnectorProvider(), RequestEntityProcessing.CHUNKED)), Arguments.of(new TestArguments(() -> new HttpUrlConnectorProvider(), RequestEntityProcessing.BUFFERED)), Arguments.of(new TestArguments(() -> new JdkConnectorProvider(), RequestEntityProcessing.BUFFERED)) );