Add ability to configure the queue capacity for ChunkedOutput (#5621)
diff --git a/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java b/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java index 79f98f9..100a14b 100644 --- a/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java +++ b/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java
@@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -27,12 +27,11 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; +import javax.inject.Provider; import javax.ws.rs.container.ConnectionCallback; import javax.ws.rs.core.GenericType; import javax.ws.rs.ext.WriterInterceptor; -import javax.inject.Provider; - import org.glassfish.jersey.process.internal.RequestContext; import org.glassfish.jersey.process.internal.RequestScope; import org.glassfish.jersey.server.internal.LocalizationMessages; @@ -51,7 +50,7 @@ public class ChunkedOutput<T> extends GenericType<T> implements Closeable { private static final byte[] ZERO_LENGTH_DELIMITER = new byte[0]; - private final BlockingDeque<T> queue = new LinkedBlockingDeque<>(); + private final BlockingDeque<T> queue; private final byte[] chunkDelimiter; private final AtomicBoolean resumed = new AtomicBoolean(false); private final Object lock = new Object(); @@ -70,12 +69,59 @@ private volatile ContainerResponse responseContext; private volatile ConnectionCallback connectionCallback; - /** * Create new {@code ChunkedOutput}. */ protected ChunkedOutput() { this.chunkDelimiter = ZERO_LENGTH_DELIMITER; + queue = new LinkedBlockingDeque<>(); + } + + /** + * Create new {@code ChunkedOutput} based on builder. + * + * @param builder the builder to use + */ + protected ChunkedOutput(Builder<T> builder) { + super(); + if (builder.queueCapacity > 0) { + queue = new LinkedBlockingDeque<>(builder.queueCapacity); + } else { + queue = new LinkedBlockingDeque<>(); + } + if (builder.chunkDelimiter != null) { + this.chunkDelimiter = new byte[builder.chunkDelimiter.length]; + System.arraycopy(builder.chunkDelimiter, 0, this.chunkDelimiter, 0, builder.chunkDelimiter.length); + } else { + this.chunkDelimiter = ZERO_LENGTH_DELIMITER; + } + if (builder.asyncContextProvider != null) { + this.asyncContext = builder.asyncContextProvider.get(); + } + } + + /** + * Create new {@code ChunkedOutput} based on builder. + * + * @param builder the builder to use + */ + private ChunkedOutput(TypedBuilder<T> builder) { + super(builder.chunkType); + + if (builder.queueCapacity > 0) { + queue = new LinkedBlockingDeque<>(builder.queueCapacity); + } else { + queue = new LinkedBlockingDeque<>(); + } + if (builder.chunkDelimiter != null) { + this.chunkDelimiter = new byte[builder.chunkDelimiter.length]; + System.arraycopy(builder.chunkDelimiter, 0, this.chunkDelimiter, 0, builder.chunkDelimiter.length); + } else { + this.chunkDelimiter = ZERO_LENGTH_DELIMITER; + } + if (builder.asyncContextProvider != null) { + this.asyncContext = builder.asyncContextProvider.get(); + } } /** @@ -86,6 +132,7 @@ public ChunkedOutput(final Type chunkType) { super(chunkType); this.chunkDelimiter = ZERO_LENGTH_DELIMITER; + queue = new LinkedBlockingDeque<>(); } /** @@ -101,6 +148,7 @@ } else { this.chunkDelimiter = ZERO_LENGTH_DELIMITER; } + queue = new LinkedBlockingDeque<>(); } /** @@ -118,6 +166,7 @@ } this.asyncContext = asyncContextProvider == null ? null : asyncContextProvider.get(); + queue = new LinkedBlockingDeque<>(); } /** @@ -135,6 +184,7 @@ } else { this.chunkDelimiter = ZERO_LENGTH_DELIMITER; } + queue = new LinkedBlockingDeque<>(); } /** @@ -149,6 +199,7 @@ } else { this.chunkDelimiter = chunkDelimiter.getBytes(); } + queue = new LinkedBlockingDeque<>(); } /** @@ -165,6 +216,26 @@ } else { this.chunkDelimiter = chunkDelimiter.getBytes(); } + queue = new LinkedBlockingDeque<>(); + } + + /** + * Returns a builder to create a ChunkedOutput with custom configuration. + * + * @return builder + */ + public static <T> Builder<T> builder() { + return new Builder<>(); + } + + /** + * Returns a builder to create a ChunkedOutput with custom configuration. + * + * @param chunkType chunk type. Must not be {code null}. + * @return builder + */ + public static <T> TypedBuilder<T> builder(Type chunkType) { + return new TypedBuilder<>(chunkType); } /** @@ -179,7 +250,12 @@ } if (chunk != null) { - queue.add(chunk); + try { + queue.put(chunk); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } } flushQueue(); @@ -265,9 +341,9 @@ } throw mpe; } finally { - synchronized (lock) { - touchingEntityStream = false; - } + synchronized (lock) { + touchingEntityStream = false; + } } t = queue.poll(); @@ -341,7 +417,6 @@ /** * Get state information. - * * Please note that {@code ChunkedOutput} can be closed by the client side - client can close connection * from its side. * @@ -353,10 +428,12 @@ /** * Executed only in case of close being triggered by client. + * * @param e Exception causing the close */ - protected void onClose(Exception e){ - + protected void onClose(Exception e) { + // drain queue when an exception occurs to prevent deadlocks + queue.clear(); } @SuppressWarnings("EqualsWhichDoesntCheckParameterClass") @@ -399,4 +476,78 @@ this.connectionCallback = connectionCallbackRunner; flushQueue(); } + + /** + * Builder that allows to create a new ChunkedOutput based on the given configuration options. + * + * @param <Y> + */ + public static class Builder<Y> { + byte[] chunkDelimiter; + int queueCapacity = -1; + Provider<AsyncContext> asyncContextProvider; + + private Builder() { + // hide constructor + } + + /** + * Set the chunk delimiter, in bytes. + * @param chunkDelimiter the chunk delimiter in bytes + * @return builder + */ + public Builder<Y> chunkDelimiter(byte[] chunkDelimiter) { + this.chunkDelimiter = chunkDelimiter; + return this; + } + + /** + * Set the queue capacity. If greater than 0, the queue is bounded and will block when full. + * @param queueCapacity the queue capacity + * @return builder + */ + public Builder<Y> queueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + return this; + } + + /** + * Set the async context provider. + * @param asyncContextProvider the async context provider + * @return builder + */ + public Builder<Y> asyncContextProvider(Provider<AsyncContext> asyncContextProvider) { + this.asyncContextProvider = asyncContextProvider; + return this; + } + + /** + * Build the ChunkedOutput based on the given configuration. + * @return the ChunkedOutput + */ + public ChunkedOutput<Y> build() { + return new ChunkedOutput<>(this); + } + } + + /** + * Builder that allows to create a new ChunkedOutput based on the given configuration options. + * + * @param <Y> + */ + public static class TypedBuilder<Y> extends Builder<Y> { + private Type chunkType; + + private TypedBuilder(Type chunkType) { + this.chunkType = chunkType; + } + + /** + * Build the ChunkedOutput based on the given configuration. + * @return the ChunkedOutput + */ + public ChunkedOutput<Y> build() { + return new ChunkedOutput<>(this); + } + } }
diff --git a/docs/src/main/docbook/async.xml b/docs/src/main/docbook/async.xml index f496bab..f39b76b 100644 --- a/docs/src/main/docbook/async.xml +++ b/docs/src/main/docbook/async.xml
@@ -1,7 +1,7 @@ <?xml version="1.0"?> <!-- - Copyright (c) 2012, 2021 Oracle and/or its affiliates. All rights reserved. + Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved. This program and the accompanying materials are made available under the terms of the Eclipse Public License v. 2.0, which is available at @@ -270,6 +270,9 @@ public ChunkedOutput<String> getChunkedResponse() { final ChunkedOutput<String> output = new ChunkedOutput<String>(String.class); + // Or use the builder pattern instead, which also allows to configure the queue capacity + // final ChunkedOutput<String> output = ChunkedOutput.<String>builder(String.class).queueCapacity(10).build(); + new Thread() { public void run() { try {
diff --git a/tests/e2e-server/src/test/java/org/glassfish/jersey/tests/e2e/server/ChunkedInputOutputTest.java b/tests/e2e-server/src/test/java/org/glassfish/jersey/tests/e2e/server/ChunkedInputOutputTest.java index 69e75a6..8a5a3e5 100644 --- a/tests/e2e-server/src/test/java/org/glassfish/jersey/tests/e2e/server/ChunkedInputOutputTest.java +++ b/tests/e2e-server/src/test/java/org/glassfish/jersey/tests/e2e/server/ChunkedInputOutputTest.java
@@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2022 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -61,13 +61,33 @@ @Path("/test") public static class TestResource { /** + * Get chunk stream with a queue capacity of 2. + * + * @return chunk stream. + */ + @GET + @Path("/testWithBuilder") + public ChunkedOutput<String> getWithBuilder() { + return getOutput(ChunkedOutput.<String>builder(String.class).queueCapacity(2) + .chunkDelimiter("\r\n".getBytes()).build()); + } + + /** * Get chunk stream. * * @return chunk stream. */ @GET public ChunkedOutput<String> get() { - final ChunkedOutput<String> output = new ChunkedOutput<>(String.class, "\r\n"); + return getOutput(new ChunkedOutput<>(String.class, "\r\n")); + } + + /** + * Get chunk stream. + * + * @return chunk stream. + */ + private ChunkedOutput<String> getOutput(ChunkedOutput<String> output) { new Thread() { @Override @@ -183,6 +203,19 @@ } /** + * Test retrieving chunked response stream as a single response string, when a builder with capacity is used. + * + * @throws Exception in case of a failure during the test execution. + */ + @Test + public void testChunkedOutputToSingleStringWithBuilder() throws Exception { + final String response = target().path("test/testWithBuilder").request().get(String.class); + + assertEquals("test\r\ntest\r\ntest\r\n", response, + "Unexpected value of chunked response unmarshalled as a single string."); + } + + /** * Test retrieving chunked response stream sequentially as individual chunks using chunked input. * * @throws Exception in case of a failure during the test execution.