Add ability to configure the queue capacity for ChunkedOutput (#5621)

diff --git a/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java b/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java
index 79f98f9..100a14b 100644
--- a/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java
+++ b/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -27,12 +27,11 @@
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.inject.Provider;
 import javax.ws.rs.container.ConnectionCallback;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.ext.WriterInterceptor;
 
-import javax.inject.Provider;
-
 import org.glassfish.jersey.process.internal.RequestContext;
 import org.glassfish.jersey.process.internal.RequestScope;
 import org.glassfish.jersey.server.internal.LocalizationMessages;
@@ -51,7 +50,7 @@
 public class ChunkedOutput<T> extends GenericType<T> implements Closeable {
     private static final byte[] ZERO_LENGTH_DELIMITER = new byte[0];
 
-    private final BlockingDeque<T> queue = new LinkedBlockingDeque<>();
+    private final BlockingDeque<T> queue;
     private final byte[] chunkDelimiter;
     private final AtomicBoolean resumed = new AtomicBoolean(false);
     private final Object lock = new Object();
@@ -70,12 +69,59 @@
     private volatile ContainerResponse responseContext;
     private volatile ConnectionCallback connectionCallback;
 
-
     /**
      * Create new {@code ChunkedOutput}.
      */
     protected ChunkedOutput() {
         this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
+        queue = new LinkedBlockingDeque<>();
+    }
+
+    /**
+     * Create new {@code ChunkedOutput} based on builder.
+     *
+     * @param builder the builder to use
+     */
+    protected ChunkedOutput(Builder<T> builder) {
+        super();
+        if (builder.queueCapacity > 0) {
+            queue = new LinkedBlockingDeque<>(builder.queueCapacity);
+        } else {
+            queue = new LinkedBlockingDeque<>();
+        }
+        if (builder.chunkDelimiter != null) {
+            this.chunkDelimiter = new byte[builder.chunkDelimiter.length];
+            System.arraycopy(builder.chunkDelimiter, 0, this.chunkDelimiter, 0, builder.chunkDelimiter.length);
+        } else {
+            this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
+        }
+        if (builder.asyncContextProvider != null) {
+            this.asyncContext = builder.asyncContextProvider.get();
+        }
+    }
+
+    /**
+     * Create new {@code ChunkedOutput} based on builder.
+     *
+     * @param builder the builder to use
+     */
+    private ChunkedOutput(TypedBuilder<T> builder) {
+        super(builder.chunkType);
+
+        if (builder.queueCapacity > 0) {
+            queue = new LinkedBlockingDeque<>(builder.queueCapacity);
+        } else {
+            queue = new LinkedBlockingDeque<>();
+        }
+        if (builder.chunkDelimiter != null) {
+            this.chunkDelimiter = new byte[builder.chunkDelimiter.length];
+            System.arraycopy(builder.chunkDelimiter, 0, this.chunkDelimiter, 0, builder.chunkDelimiter.length);
+        } else {
+            this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
+        }
+        if (builder.asyncContextProvider != null) {
+            this.asyncContext = builder.asyncContextProvider.get();
+        }
     }
 
     /**
@@ -86,6 +132,7 @@
     public ChunkedOutput(final Type chunkType) {
         super(chunkType);
         this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
+        queue = new LinkedBlockingDeque<>();
     }
 
     /**
@@ -101,6 +148,7 @@
         } else {
             this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
         }
+        queue = new LinkedBlockingDeque<>();
     }
 
     /**
@@ -118,6 +166,7 @@
         }
 
         this.asyncContext = asyncContextProvider == null ? null : asyncContextProvider.get();
+        queue = new LinkedBlockingDeque<>();
     }
 
     /**
@@ -135,6 +184,7 @@
         } else {
             this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
         }
+        queue = new LinkedBlockingDeque<>();
     }
 
     /**
@@ -149,6 +199,7 @@
         } else {
             this.chunkDelimiter = chunkDelimiter.getBytes();
         }
+        queue = new LinkedBlockingDeque<>();
     }
 
     /**
@@ -165,6 +216,26 @@
         } else {
             this.chunkDelimiter = chunkDelimiter.getBytes();
         }
+        queue = new LinkedBlockingDeque<>();
+    }
+
+    /**
+     * Returns a builder to create a ChunkedOutput with custom configuration.
+     *
+     * @return builder
+     */
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Returns a builder to create a ChunkedOutput with custom configuration.
+     *
+     * @param chunkType      chunk type. Must not be {code null}.
+     * @return builder
+     */
+    public static <T> TypedBuilder<T> builder(Type chunkType) {
+        return new TypedBuilder<>(chunkType);
     }
 
     /**
@@ -179,7 +250,12 @@
         }
 
         if (chunk != null) {
-            queue.add(chunk);
+            try {
+                queue.put(chunk);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException(e);
+            }
         }
 
         flushQueue();
@@ -265,9 +341,9 @@
                             }
                             throw mpe;
                         } finally {
-                           synchronized (lock) {
-                               touchingEntityStream = false;
-                           }
+                            synchronized (lock) {
+                                touchingEntityStream = false;
+                            }
                         }
 
                         t = queue.poll();
@@ -341,7 +417,6 @@
 
     /**
      * Get state information.
-     *
      * Please note that {@code ChunkedOutput} can be closed by the client side - client can close connection
      * from its side.
      *
@@ -353,10 +428,12 @@
 
     /**
      * Executed only in case of close being triggered by client.
+     *
      * @param e Exception causing the close
      */
-    protected void onClose(Exception e){
-
+    protected void onClose(Exception e) {
+        // drain queue when an exception occurs to prevent deadlocks
+        queue.clear();
     }
 
     @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@@ -399,4 +476,78 @@
         this.connectionCallback = connectionCallbackRunner;
         flushQueue();
     }
+
+    /**
+     * Builder that allows to create a new ChunkedOutput based on the given configuration options.
+     *
+     * @param <Y>
+     */
+    public static class Builder<Y> {
+        byte[] chunkDelimiter;
+        int queueCapacity = -1;
+        Provider<AsyncContext> asyncContextProvider;
+
+        private Builder() {
+            // hide constructor
+        }
+
+        /**
+         * Set the chunk delimiter, in bytes.
+         * @param chunkDelimiter the chunk delimiter in bytes
+         * @return builder
+         */
+        public Builder<Y> chunkDelimiter(byte[] chunkDelimiter) {
+            this.chunkDelimiter = chunkDelimiter;
+            return this;
+        }
+
+        /**
+         * Set the queue capacity. If greater than 0, the queue is bounded and will block when full.
+         * @param queueCapacity the queue capacity
+         * @return builder
+         */
+        public Builder<Y> queueCapacity(int queueCapacity) {
+            this.queueCapacity = queueCapacity;
+            return this;
+        }
+
+        /**
+         * Set the async context provider.
+         * @param asyncContextProvider the async context provider
+         * @return builder
+         */
+        public Builder<Y> asyncContextProvider(Provider<AsyncContext> asyncContextProvider) {
+            this.asyncContextProvider = asyncContextProvider;
+            return this;
+        }
+
+        /**
+         * Build the ChunkedOutput based on the given configuration.
+         * @return the ChunkedOutput
+         */
+        public ChunkedOutput<Y> build() {
+            return new ChunkedOutput<>(this);
+        }
+    }
+
+    /**
+     * Builder that allows to create a new ChunkedOutput based on the given configuration options.
+     *
+     * @param <Y>
+     */
+    public static class TypedBuilder<Y> extends Builder<Y> {
+        private Type chunkType;
+
+        private TypedBuilder(Type chunkType) {
+            this.chunkType = chunkType;
+        }
+
+        /**
+         * Build the ChunkedOutput based on the given configuration.
+         * @return the ChunkedOutput
+         */
+        public ChunkedOutput<Y> build() {
+            return new ChunkedOutput<>(this);
+        }
+    }
 }
diff --git a/docs/src/main/docbook/async.xml b/docs/src/main/docbook/async.xml
index f496bab..f39b76b 100644
--- a/docs/src/main/docbook/async.xml
+++ b/docs/src/main/docbook/async.xml
@@ -1,7 +1,7 @@
 <?xml version="1.0"?>
 <!--
 
-    Copyright (c) 2012, 2021 Oracle and/or its affiliates. All rights reserved.
+    Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
 
     This program and the accompanying materials are made available under the
     terms of the Eclipse Public License v. 2.0, which is available at
@@ -270,6 +270,9 @@
     public ChunkedOutput<String> getChunkedResponse() {
         final ChunkedOutput<String> output = new ChunkedOutput<String>(String.class);
 
+        // Or use the builder pattern instead, which also allows to configure the queue capacity
+        // final ChunkedOutput<String> output = ChunkedOutput.<String>builder(String.class).queueCapacity(10).build();
+
         new Thread() {
             public void run() {
                 try {
diff --git a/tests/e2e-server/src/test/java/org/glassfish/jersey/tests/e2e/server/ChunkedInputOutputTest.java b/tests/e2e-server/src/test/java/org/glassfish/jersey/tests/e2e/server/ChunkedInputOutputTest.java
index 69e75a6..8a5a3e5 100644
--- a/tests/e2e-server/src/test/java/org/glassfish/jersey/tests/e2e/server/ChunkedInputOutputTest.java
+++ b/tests/e2e-server/src/test/java/org/glassfish/jersey/tests/e2e/server/ChunkedInputOutputTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2022 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -61,13 +61,33 @@
     @Path("/test")
     public static class TestResource {
         /**
+         * Get chunk stream with a queue capacity of 2.
+         *
+         * @return chunk stream.
+         */
+        @GET
+        @Path("/testWithBuilder")
+        public ChunkedOutput<String> getWithBuilder() {
+            return getOutput(ChunkedOutput.<String>builder(String.class).queueCapacity(2)
+                             .chunkDelimiter("\r\n".getBytes()).build());
+        }
+
+        /**
          * Get chunk stream.
          *
          * @return chunk stream.
          */
         @GET
         public ChunkedOutput<String> get() {
-            final ChunkedOutput<String> output = new ChunkedOutput<>(String.class, "\r\n");
+            return getOutput(new ChunkedOutput<>(String.class, "\r\n"));
+        }
+
+        /**
+         * Get chunk stream.
+         *
+         * @return chunk stream.
+         */
+        private ChunkedOutput<String> getOutput(ChunkedOutput<String> output) {
 
             new Thread() {
                 @Override
@@ -183,6 +203,19 @@
     }
 
     /**
+     * Test retrieving chunked response stream as a single response string, when a builder with capacity is used.
+     *
+     * @throws Exception in case of a failure during the test execution.
+     */
+    @Test
+    public void testChunkedOutputToSingleStringWithBuilder() throws Exception {
+        final String response = target().path("test/testWithBuilder").request().get(String.class);
+
+        assertEquals("test\r\ntest\r\ntest\r\n", response,
+                "Unexpected value of chunked response unmarshalled as a single string.");
+    }
+
+    /**
      * Test retrieving chunked response stream sequentially as individual chunks using chunked input.
      *
      * @throws Exception in case of a failure during the test execution.