Change JettyConnector 'readTimeout' behavior to match socket read tim… (#5114)

* Change JettyConnector 'readTimeout' behavior to match socket read timeout definition - e.g., ApacheConnector behavior matches it.

* Read timeout: Time on waiting to receive the first data byte.

The `timeout` method timeouts the request even if data were already received, capping the query to a maximum execution time.
This behavior is problematic when streaming data over a prolonged duration; the client has already received data bytes, but data continues to flow.

I provided a jetty specific property (jersey.config.jetty.client.totalTimeout) that configures the 'totalTimeout' when required.
diff --git a/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyClientProperties.java b/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyClientProperties.java
index ef2c8be..aa7a77b 100644
--- a/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyClientProperties.java
+++ b/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyClientProperties.java
@@ -86,6 +86,21 @@
         "jersey.config.jetty.client.syncListenerResponseMaxSize";
 
     /**
+     * Total timeout interval, in milliseconds.
+     * <p>
+     * The value MUST be an instance convertible to {@link java.lang.Integer}. A
+     * value of zero (0) is equivalent to an interval of infinity.
+     * </p>
+     * <p>
+     * The default value is infinity (0).
+     * </p>
+     * <p>
+     * The name of the configuration property is <tt>{@value}</tt>.
+     * </p>
+     */
+    public static final String TOTAL_TIMEOUT = "jersey.config.jetty.client.totalTimeout";
+
+    /**
      * Get the value of the specified property.
      *
      * If the property is not set or the real value type is not compatible with the specified value type, returns {@code null}.
diff --git a/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyConnector.java b/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyConnector.java
index 89a0c05..111022d 100644
--- a/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyConnector.java
+++ b/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyConnector.java
@@ -314,8 +314,14 @@
         request.followRedirects(clientRequest.resolveProperty(ClientProperties.FOLLOW_REDIRECTS, true));
         final Object readTimeout = clientRequest.resolveProperty(ClientProperties.READ_TIMEOUT, -1);
         if (readTimeout != null && readTimeout instanceof Integer && (Integer) readTimeout > 0) {
-            request.timeout((Integer) readTimeout, TimeUnit.MILLISECONDS);
+            request.idleTimeout((Integer) readTimeout, TimeUnit.MILLISECONDS);
         }
+
+        final Object totalTimeout = clientRequest.resolveProperty(JettyClientProperties.TOTAL_TIMEOUT, -1);
+        if (totalTimeout != null && totalTimeout instanceof Integer && (Integer) totalTimeout > 0) {
+            request.timeout((Integer) totalTimeout, TimeUnit.MILLISECONDS);
+        }
+
         return request;
     }
 
diff --git a/connectors/jetty-connector/src/test/java/org/glassfish/jersey/jetty/connector/TimeoutTest.java b/connectors/jetty-connector/src/test/java/org/glassfish/jersey/jetty/connector/TimeoutTest.java
index 21469f4..48c5bed 100644
--- a/connectors/jetty-connector/src/test/java/org/glassfish/jersey/jetty/connector/TimeoutTest.java
+++ b/connectors/jetty-connector/src/test/java/org/glassfish/jersey/jetty/connector/TimeoutTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, 2020 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2022 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -16,30 +16,37 @@
 
 package org.glassfish.jersey.jetty.connector;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Logger;
 
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.ProcessingException;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
 
+import org.glassfish.jersey.CommonProperties;
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.client.ClientProperties;
 import org.glassfish.jersey.logging.LoggingFeature;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.test.JerseyTest;
-
 import org.junit.Test;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 
 /**
  * @author Martin Matula
@@ -65,6 +72,48 @@
             }
             return "GET";
         }
+
+        /**
+         * Long-running streaming request
+         *
+         * @param count number of packets send
+         * @param pauseMillis pause between each packets
+         */
+        @GET
+        @Path("stream")
+        public Response streamsWithDelay(@QueryParam("start") @DefaultValue("0") int startMillis, @QueryParam("count") int count,
+                @QueryParam("pauseMillis") int pauseMillis) {
+            StreamingOutput streamingOutput = streamSlowly(startMillis, count, pauseMillis);
+
+            return Response.ok(streamingOutput)
+                    .build();
+        }
+    }
+
+    private static StreamingOutput streamSlowly(int startMillis, int count, int pauseMillis) {
+
+        return output -> {
+            try {
+                TimeUnit.MILLISECONDS.sleep(startMillis);
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            output.write("begin\n".getBytes(StandardCharsets.UTF_8));
+            output.flush();
+            for (int i = 0; i < count; i++) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(pauseMillis);
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+
+                output.write(("message " + i + "\n").getBytes(StandardCharsets.UTF_8));
+                output.flush();
+            }
+            output.write("end".getBytes(StandardCharsets.UTF_8));
+        };
     }
 
     @Override
@@ -121,4 +170,74 @@
             c.close();
         }
     }
+
+    /**
+     * Test accessing an operation that is streaming slowly
+     *
+     * @throws ProcessingException in case of a test error.
+     */
+    @Test
+    public void testSlowlyStreamedContentDoesNotReadTimeout() throws Exception {
+
+        int count = 5;
+        int pauseMillis = 50;
+
+        final Response response = target("test")
+                .property(ClientProperties.READ_TIMEOUT, 100L)
+                .property(CommonProperties.OUTBOUND_CONTENT_LENGTH_BUFFER_SERVER, "-1")
+                .path("stream")
+                .queryParam("count", count)
+                .queryParam("pauseMillis", pauseMillis)
+                .request().get();
+
+        assertTrue(response.readEntity(String.class).contains("end"));
+    }
+
+    @Test
+    public void testSlowlyStreamedContentDoesTotalTimeout() throws Exception {
+
+        int count = 5;
+        int pauseMillis = 50;
+
+        try {
+        target("test")
+                .property(JettyClientProperties.TOTAL_TIMEOUT, 100L)
+                .property(CommonProperties.OUTBOUND_CONTENT_LENGTH_BUFFER_SERVER, "-1")
+                .path("stream")
+                .queryParam("count", count)
+                .queryParam("pauseMillis", pauseMillis)
+                .request().get();
+
+            fail("This operation should trigger total timeout");
+        } catch (ProcessingException e) {
+            assertEquals(TimeoutException.class, e.getCause().getClass());
+        }
+    }
+
+    /**
+     * Test accessing an operation that is streaming slowly
+     *
+     * @throws ProcessingException in case of a test error.
+     */
+    @Test
+    public void testSlowToStartStreamedContentDoesReadTimeout() throws Exception {
+
+        int start = 150;
+        int count = 5;
+        int pauseMillis = 50;
+
+        try {
+            target("test")
+                    .property(ClientProperties.READ_TIMEOUT, 100L)
+                    .property(CommonProperties.OUTBOUND_CONTENT_LENGTH_BUFFER_SERVER, "-1")
+                    .path("stream")
+                    .queryParam("start", start)
+                    .queryParam("count", count)
+                    .queryParam("pauseMillis", pauseMillis)
+                    .request().get();
+            fail("This operation should trigger idle timeout");
+        } catch (ProcessingException e) {
+            assertEquals(TimeoutException.class, e.getCause().getClass());
+        }
+    }
 }