Change JettyConnector 'readTimeout' behavior to match socket read tim… (#5114)
* Change JettyConnector 'readTimeout' behavior to match socket read timeout definition - e.g., ApacheConnector behavior matches it.
* Read timeout: Time on waiting to receive the first data byte.
The `timeout` method timeouts the request even if data were already received, capping the query to a maximum execution time.
This behavior is problematic when streaming data over a prolonged duration; the client has already received data bytes, but data continues to flow.
I provided a jetty specific property (jersey.config.jetty.client.totalTimeout) that configures the 'totalTimeout' when required.
diff --git a/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyClientProperties.java b/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyClientProperties.java
index ef2c8be..aa7a77b 100644
--- a/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyClientProperties.java
+++ b/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyClientProperties.java
@@ -86,6 +86,21 @@
"jersey.config.jetty.client.syncListenerResponseMaxSize";
/**
+ * Total timeout interval, in milliseconds.
+ * <p>
+ * The value MUST be an instance convertible to {@link java.lang.Integer}. A
+ * value of zero (0) is equivalent to an interval of infinity.
+ * </p>
+ * <p>
+ * The default value is infinity (0).
+ * </p>
+ * <p>
+ * The name of the configuration property is <tt>{@value}</tt>.
+ * </p>
+ */
+ public static final String TOTAL_TIMEOUT = "jersey.config.jetty.client.totalTimeout";
+
+ /**
* Get the value of the specified property.
*
* If the property is not set or the real value type is not compatible with the specified value type, returns {@code null}.
diff --git a/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyConnector.java b/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyConnector.java
index 89a0c05..111022d 100644
--- a/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyConnector.java
+++ b/connectors/jetty-connector/src/main/java/org/glassfish/jersey/jetty/connector/JettyConnector.java
@@ -314,8 +314,14 @@
request.followRedirects(clientRequest.resolveProperty(ClientProperties.FOLLOW_REDIRECTS, true));
final Object readTimeout = clientRequest.resolveProperty(ClientProperties.READ_TIMEOUT, -1);
if (readTimeout != null && readTimeout instanceof Integer && (Integer) readTimeout > 0) {
- request.timeout((Integer) readTimeout, TimeUnit.MILLISECONDS);
+ request.idleTimeout((Integer) readTimeout, TimeUnit.MILLISECONDS);
}
+
+ final Object totalTimeout = clientRequest.resolveProperty(JettyClientProperties.TOTAL_TIMEOUT, -1);
+ if (totalTimeout != null && totalTimeout instanceof Integer && (Integer) totalTimeout > 0) {
+ request.timeout((Integer) totalTimeout, TimeUnit.MILLISECONDS);
+ }
+
return request;
}
diff --git a/connectors/jetty-connector/src/test/java/org/glassfish/jersey/jetty/connector/TimeoutTest.java b/connectors/jetty-connector/src/test/java/org/glassfish/jersey/jetty/connector/TimeoutTest.java
index 21469f4..48c5bed 100644
--- a/connectors/jetty-connector/src/test/java/org/glassfish/jersey/jetty/connector/TimeoutTest.java
+++ b/connectors/jetty-connector/src/test/java/org/glassfish/jersey/jetty/connector/TimeoutTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013, 2020 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2022 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -16,30 +16,37 @@
package org.glassfish.jersey.jetty.connector;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.ProcessingException;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import org.glassfish.jersey.CommonProperties;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.logging.LoggingFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
-
import org.junit.Test;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
/**
* @author Martin Matula
@@ -65,6 +72,48 @@
}
return "GET";
}
+
+ /**
+ * Long-running streaming request
+ *
+ * @param count number of packets send
+ * @param pauseMillis pause between each packets
+ */
+ @GET
+ @Path("stream")
+ public Response streamsWithDelay(@QueryParam("start") @DefaultValue("0") int startMillis, @QueryParam("count") int count,
+ @QueryParam("pauseMillis") int pauseMillis) {
+ StreamingOutput streamingOutput = streamSlowly(startMillis, count, pauseMillis);
+
+ return Response.ok(streamingOutput)
+ .build();
+ }
+ }
+
+ private static StreamingOutput streamSlowly(int startMillis, int count, int pauseMillis) {
+
+ return output -> {
+ try {
+ TimeUnit.MILLISECONDS.sleep(startMillis);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ output.write("begin\n".getBytes(StandardCharsets.UTF_8));
+ output.flush();
+ for (int i = 0; i < count; i++) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(pauseMillis);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ output.write(("message " + i + "\n").getBytes(StandardCharsets.UTF_8));
+ output.flush();
+ }
+ output.write("end".getBytes(StandardCharsets.UTF_8));
+ };
}
@Override
@@ -121,4 +170,74 @@
c.close();
}
}
+
+ /**
+ * Test accessing an operation that is streaming slowly
+ *
+ * @throws ProcessingException in case of a test error.
+ */
+ @Test
+ public void testSlowlyStreamedContentDoesNotReadTimeout() throws Exception {
+
+ int count = 5;
+ int pauseMillis = 50;
+
+ final Response response = target("test")
+ .property(ClientProperties.READ_TIMEOUT, 100L)
+ .property(CommonProperties.OUTBOUND_CONTENT_LENGTH_BUFFER_SERVER, "-1")
+ .path("stream")
+ .queryParam("count", count)
+ .queryParam("pauseMillis", pauseMillis)
+ .request().get();
+
+ assertTrue(response.readEntity(String.class).contains("end"));
+ }
+
+ @Test
+ public void testSlowlyStreamedContentDoesTotalTimeout() throws Exception {
+
+ int count = 5;
+ int pauseMillis = 50;
+
+ try {
+ target("test")
+ .property(JettyClientProperties.TOTAL_TIMEOUT, 100L)
+ .property(CommonProperties.OUTBOUND_CONTENT_LENGTH_BUFFER_SERVER, "-1")
+ .path("stream")
+ .queryParam("count", count)
+ .queryParam("pauseMillis", pauseMillis)
+ .request().get();
+
+ fail("This operation should trigger total timeout");
+ } catch (ProcessingException e) {
+ assertEquals(TimeoutException.class, e.getCause().getClass());
+ }
+ }
+
+ /**
+ * Test accessing an operation that is streaming slowly
+ *
+ * @throws ProcessingException in case of a test error.
+ */
+ @Test
+ public void testSlowToStartStreamedContentDoesReadTimeout() throws Exception {
+
+ int start = 150;
+ int count = 5;
+ int pauseMillis = 50;
+
+ try {
+ target("test")
+ .property(ClientProperties.READ_TIMEOUT, 100L)
+ .property(CommonProperties.OUTBOUND_CONTENT_LENGTH_BUFFER_SERVER, "-1")
+ .path("stream")
+ .queryParam("start", start)
+ .queryParam("count", count)
+ .queryParam("pauseMillis", pauseMillis)
+ .request().get();
+ fail("This operation should trigger idle timeout");
+ } catch (ProcessingException e) {
+ assertEquals(TimeoutException.class, e.getCause().getClass());
+ }
+ }
}