Merge pull request #5559 from jansupol/m242c
Merge 2.x into 3.0
diff --git a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java
index 264c453..0e36dee 100644
--- a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java
+++ b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -31,9 +31,11 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -521,7 +523,7 @@
try {
final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
- responseContext.setEntityStream(getInputStream(response, closingMechanism));
+ responseContext.setEntityStream(getInputStream(response, closingMechanism, () -> clientRequest.isCancelled()));
} catch (final IOException e) {
LOGGER.log(Level.SEVERE, null, e);
}
@@ -730,13 +732,14 @@
}
private static InputStream getInputStream(final CloseableHttpResponse response,
- final ConnectionClosingMechanism closingMechanism) throws IOException {
+ final ConnectionClosingMechanism closingMechanism,
+ final Supplier<Boolean> isCancelled) throws IOException {
final InputStream inputStream;
if (response.getEntity() == null) {
inputStream = new ByteArrayInputStream(new byte[0]);
} else {
- final InputStream i = response.getEntity().getContent();
+ final InputStream i = new CancellableInputStream(response.getEntity().getContent(), isCancelled);
if (i.markSupported()) {
inputStream = i;
} else {
@@ -885,4 +888,68 @@
}
}
}
+
+    /**
+     * {@link InputStream} decorator that refuses further I/O once the owning request has been cancelled.
+     * <p>
+     * Every guarded operation first asks the {@code isCancelled} supplier and fails with an {@link IOException}
+     * caused by a {@link CancellationException} when it reports {@code true}; otherwise the call is forwarded
+     * to the wrapped stream. {@link #close()}, {@link #mark(int)} and {@link #markSupported()} are forwarded
+     * without the check.
+     */
+    private static class CancellableInputStream extends InputStream {
+        private final InputStream in;
+        private final Supplier<Boolean> isCancelled;
+
+        /**
+         * @param in          the stream every call is delegated to.
+         * @param isCancelled queried before each guarded operation; {@code true} means the request was cancelled.
+         */
+        private CancellableInputStream(InputStream in, Supplier<Boolean> isCancelled) {
+            this.in = in;
+            this.isCancelled = isCancelled;
+        }
+
+        @Override
+        public int read(byte[] b) throws IOException {
+            checkAborted();
+            // Fill the caller's buffer. Delegating to in.read() would consume a single byte, leave the
+            // buffer untouched and report the byte's value as the number of bytes read.
+            return in.read(b);
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            checkAborted();
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public int read() throws IOException {
+            checkAborted();
+            return in.read();
+        }
+
+        @Override
+        public boolean markSupported() {
+            return in.markSupported();
+        }
+
+        @Override
+        public long skip(long n) throws IOException {
+            checkAborted();
+            return in.skip(n);
+        }
+
+        @Override
+        public int available() throws IOException {
+            checkAborted();
+            return in.available();
+        }
+
+        @Override
+        public void close() throws IOException {
+            // Not guarded: closing must stay possible after a cancellation.
+            in.close();
+        }
+
+        @Override
+        public synchronized void mark(int readlimit) {
+            in.mark(readlimit);
+        }
+
+        @Override
+        public synchronized void reset() throws IOException {
+            checkAborted();
+            in.reset();
+        }
+
+        /**
+         * Fails the current operation when the request has been cancelled.
+         *
+         * @throws IOException wrapping a {@link CancellationException} if {@code isCancelled} reports {@code true}.
+         */
+        private void checkAborted() throws IOException {
+            if (isCancelled.get()) {
+                throw new IOException(new CancellationException());
+            }
+        }
+    }
}
diff --git a/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java b/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java
index f7c30a9..8d93b91 100644
--- a/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java
+++ b/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2022, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -32,8 +32,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
+import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -531,7 +533,7 @@
try {
final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
- responseContext.setEntityStream(getInputStream(response, closingMechanism));
+ responseContext.setEntityStream(getInputStream(response, closingMechanism, () -> clientRequest.isCancelled()));
} catch (final IOException e) {
LOGGER.log(Level.SEVERE, null, e);
}
@@ -741,13 +743,14 @@
}
private static InputStream getInputStream(final CloseableHttpResponse response,
- final ConnectionClosingMechanism closingMechanism) throws IOException {
+ final ConnectionClosingMechanism closingMechanism,
+ final Supplier<Boolean> isCancelled) throws IOException {
final InputStream inputStream;
if (response.getEntity() == null) {
inputStream = new ByteArrayInputStream(new byte[0]);
} else {
- final InputStream i = response.getEntity().getContent();
+ final InputStream i = new CancellableInputStream(response.getEntity().getContent(), isCancelled);
if (i.markSupported()) {
inputStream = i;
} else {
@@ -889,4 +892,69 @@
}
}
}
+
+    /**
+     * {@link InputStream} decorator that refuses further I/O once the owning request has been cancelled.
+     * <p>
+     * Every guarded operation first asks the {@code isCancelled} supplier and fails with an {@link IOException}
+     * caused by a {@link CancellationException} when it reports {@code true}; otherwise the call is forwarded
+     * to the wrapped stream. {@link #close()}, {@link #mark(int)} and {@link #markSupported()} are forwarded
+     * without the check.
+     */
+    private static class CancellableInputStream extends InputStream {
+        private final InputStream in;
+        private final Supplier<Boolean> isCancelled;
+
+        /**
+         * @param in          the stream every call is delegated to.
+         * @param isCancelled queried before each guarded operation; {@code true} means the request was cancelled.
+         */
+        private CancellableInputStream(InputStream in, Supplier<Boolean> isCancelled) {
+            this.in = in;
+            this.isCancelled = isCancelled;
+        }
+
+        @Override
+        public int read(byte[] b) throws IOException {
+            checkAborted();
+            // Fill the caller's buffer. Delegating to in.read() would consume a single byte, leave the
+            // buffer untouched and report the byte's value as the number of bytes read.
+            return in.read(b);
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            checkAborted();
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public int read() throws IOException {
+            checkAborted();
+            return in.read();
+        }
+
+        @Override
+        public boolean markSupported() {
+            return in.markSupported();
+        }
+
+        @Override
+        public long skip(long n) throws IOException {
+            checkAborted();
+            return in.skip(n);
+        }
+
+        @Override
+        public int available() throws IOException {
+            checkAborted();
+            return in.available();
+        }
+
+        @Override
+        public void close() throws IOException {
+            // Not guarded: closing must stay possible after a cancellation.
+            in.close();
+        }
+
+        @Override
+        public synchronized void mark(int readlimit) {
+            in.mark(readlimit);
+        }
+
+        @Override
+        public synchronized void reset() throws IOException {
+            checkAborted();
+            in.reset();
+        }
+
+        /**
+         * Fails the current operation when the request has been cancelled.
+         *
+         * @throws IOException wrapping a {@link CancellationException} if {@code isCancelled} reports {@code true}.
+         */
+        private void checkAborted() throws IOException {
+            if (isCancelled.get()) {
+                throw new IOException(new CancellationException());
+            }
+        }
+    }
+
}
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
index 4a9836d..5d1d2f2 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -25,6 +25,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
@@ -158,6 +159,10 @@
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+ if (jerseyRequest.isCancelled()) {
+ responseAvailable.completeExceptionally(new CancellationException());
+ return;
+ }
if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
jerseyResponse = new ClientResponse(new Response.StatusType() {
diff --git a/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletContainer.java b/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletContainer.java
index 13702a9..147ce8c 100644
--- a/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletContainer.java
+++ b/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletContainer.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2022 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -294,6 +294,7 @@
final URI baseUri;
final URI requestUri;
try {
+ LOGGER.debugLog("ServletContainer.service(...) started");
baseUri = absoluteUriBuilder.replacePath(encodedBasePath).build();
String queryParameters = ContainerUtils.encodeUnsafeCharacters(request.getQueryString());
if (queryParameters == null) {
@@ -531,6 +532,7 @@
final URI baseUri;
final URI requestUri;
try {
+ LOGGER.debugLog("ServletContainer.doFilter(...) started");
final UriBuilder absoluteUriBuilder = UriBuilder.fromUri(request.getRequestURL().toString());
// depending on circumstances, use the correct path to replace in the absolute request URI
diff --git a/containers/jetty-http/src/main/java11/org/glassfish/jersey/jetty/JettyHttpContainer.java b/containers/jetty-http/src/main/java11/org/glassfish/jersey/jetty/JettyHttpContainer.java
index 1205576..1fcc1b1 100644
--- a/containers/jetty-http/src/main/java11/org/glassfish/jersey/jetty/JettyHttpContainer.java
+++ b/containers/jetty-http/src/main/java11/org/glassfish/jersey/jetty/JettyHttpContainer.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -148,6 +148,7 @@
final Response response = request.getResponse();
final ResponseWriter responseWriter = new ResponseWriter(request, response, configSetStatusOverSendError);
try {
+ LOGGER.debugLog("JettyHttpContainer.handle(...) started");
final URI baseUri = getBaseUri(request);
final URI requestUri = getRequestUri(request, baseUri);
final ContainerRequest requestContext = new ContainerRequest(
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java b/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java
index 992311e..fc21c02 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -25,6 +25,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -90,6 +94,8 @@
private LazyValue<PropertiesResolver> propertiesResolver = Values.lazy(
(Value<PropertiesResolver>) () -> PropertiesResolver.create(getConfiguration(), getPropertiesDelegate())
);
+ // by default nothing to be cancelled.
+ private Future cancellable = NotCancellable.INSTANCE;
private static final Logger LOGGER = Logger.getLogger(ClientRequest.class.getName());
@@ -126,6 +132,7 @@
this.writerInterceptors = original.writerInterceptors;
this.propertiesDelegate = new MapPropertiesDelegate(original.propertiesDelegate);
this.ignoreUserAgent = original.ignoreUserAgent;
+ this.cancellable = original.cancellable;
}
@Override
@@ -584,4 +591,66 @@
public void ignoreUserAgent(final boolean ignore) {
this.ignoreUserAgent = ignore;
}
+
+ /**
+ * Sets the {@link Future} that backs cancellation of this {@link ClientRequest}, replacing the one
+ * assigned so far.
+ *
+ * @param cancellable the {@code Future} that {@link #cancel(boolean)} and {@link #isCancelled()} delegate to.
+ */
+ void setCancellable(Future cancellable) {
+ this.cancellable = cancellable;
+ }
+
+ /**
+ * Cancels this {@link ClientRequest} by cancelling the {@link Future} currently assigned to it.
+ * May result in a {@link java.util.concurrent.CancellationException} later in this request processing
+ * if this {@link ClientRequest} is backed by a {@link Future} provided to
+ * {@link JerseyInvocation.Builder#setCancellable(Future)}.
+ *
+ * @param mayInterruptIfRunning passed as is to {@link Future#cancel(boolean)}: {@code true} if the thread
+ * executing this task should be interrupted (if the thread is known to the implementation);
+ * otherwise, in-progress tasks are allowed to complete. It may have no effect.
+ */
+ public void cancel(boolean mayInterruptIfRunning) {
+ cancellable.cancel(mayInterruptIfRunning);
+ }
+
+ /**
+ * Returns {@code true} if this {@link ClientRequest} was cancelled before it completed normally,
+ * as reported by {@link Future#isCancelled()} of the {@link Future} currently assigned to this request.
+ *
+ * @return {@code true} if this {@link ClientRequest} was cancelled
+ * before it completed normally
+ */
+ public boolean isCancelled() {
+ return cancellable.isCancelled();
+ }
+
+    /**
+     * Inert {@link Future} used as the default backing of a request that has no cancellable future assigned.
+     * <p>
+     * {@link #INSTANCE} is one object shared by all such requests, therefore this class must not keep any
+     * state: a cancellation flag stored here would be observed by every request that still uses the default.
+     * In line with the {@link Future#cancel(boolean)} contract, {@code cancel} reports {@code false}
+     * (the task could not be cancelled) and {@code isCancelled} never turns {@code true}.
+     */
+    private static class NotCancellable implements Future<Object> {
+        public static final Future<Object> INSTANCE = new NotCancellable();
+
+        /**
+         * Does nothing; there is no task to cancel.
+         *
+         * @param mayInterruptIfRunning ignored.
+         * @return always {@code false}.
+         */
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        /**
+         * @return always {@code false}, this future cannot be cancelled.
+         */
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        /**
+         * @return always {@code false}.
+         */
+        @Override
+        public boolean isDone() {
+            return false;
+        }
+
+        /**
+         * @return always {@code null}; the call does not block.
+         */
+        @Override
+        public Object get() throws InterruptedException, ExecutionException {
+            return null;
+        }
+
+        /**
+         * @return always {@code null}; the call does not block and the timeout is ignored.
+         */
+        @Override
+        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return null;
+        }
+    }
}
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
index 222c2a9..c120856 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2011, 2021 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2011, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -573,6 +573,18 @@
throw new IllegalStateException(
LocalizationMessages.CLIENT_RX_PROVIDER_NOT_REGISTERED(clazz.getSimpleName()));
}
+
+ /**
+ * Sets the {@link Future} that backs the {@link ClientRequest#isCancelled()} method of the request built by
+ * this builder. It can be used, for instance, by a {@link CompletionStageRxInvoker} to pass the
+ * {@link CompletableFuture} it created to the {@link SyncInvoker} it was provided with.
+ *
+ * @param cancellable the {@link Future} whose cancellation state, set by {@link Future#cancel(boolean)}, will be
+ * reported by {@link ClientRequest#isCancelled()}.
+ * @return the updated builder.
+ */
+ public Builder setCancellable(Future cancellable) {
+ requestContext.setCancellable(cancellable);
+ return this;
+ }
}
/* package */ static class AsyncInvoker extends CompletableFutureAsyncInvoker implements jakarta.ws.rs.client.AsyncInvoker {
@@ -711,6 +723,8 @@
public Future<Response> submit() {
final CompletableFuture<Response> responseFuture = new CompletableFuture<>();
final ClientRuntime runtime = request().getClientRuntime();
+
+ requestContext.setCancellable(responseFuture);
runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext),
new InvocationResponseCallback<>(responseFuture, (request, scope) -> translate(request, scope, Response.class))));
@@ -725,6 +739,7 @@
final CompletableFuture<T> responseFuture = new CompletableFuture<>();
final ClientRuntime runtime = request().getClientRuntime();
+ requestContext.setCancellable(responseFuture);
runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext),
new InvocationResponseCallback<T>(responseFuture, (request, scope) -> translate(request, scope, responseType))));
@@ -764,6 +779,7 @@
final CompletableFuture<T> responseFuture = new CompletableFuture<>();
final ClientRuntime runtime = request().getClientRuntime();
+ requestContext.setCancellable(responseFuture);
runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext),
new InvocationResponseCallback<T>(responseFuture, (request, scope) -> translate(request, scope, responseType))));
@@ -888,6 +904,7 @@
}
};
final ClientRuntime runtime = request().getClientRuntime();
+ requestContext.setCancellable(responseFuture);
runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext), responseCallback));
} catch (final Throwable error) {
final ProcessingException ce;
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java b/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java
index f5a3409..c89cf19 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2011, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2011, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
@@ -155,7 +156,7 @@
);
}
- private static InputStream getInputStream(final HttpURLConnection uc) throws IOException {
+ private static InputStream getInputStream(final HttpURLConnection uc, final ClientRequest clientRequest) throws IOException {
return new InputStream() {
private final UnsafeValue<InputStream, IOException> in = Values.lazy(new UnsafeValue<InputStream, IOException>() {
@Override
@@ -190,6 +191,10 @@
if (closed) {
throw new IOException("Stream closed");
}
+ if (clientRequest.isCancelled()) {
+ close();
+ throw new IOException(new CancellationException());
+ }
}
@Override
@@ -311,7 +316,7 @@
if (DEFAULT_SSL_SOCKET_FACTORY.get() == suc.getSSLSocketFactory()) {
// indicates that the custom socket factory was not set
suc.setSSLSocketFactory(sslSocketFactory.get());
- }
+ }
}
}
@@ -448,7 +453,7 @@
);
try {
- InputStream inputStream = getInputStream(uc);
+ InputStream inputStream = getInputStream(uc, request);
responseContext.setEntityStream(inputStream);
} catch (IOException ioe) {
// allow at least a partial response in a ResponseProcessingException
diff --git a/pom.xml b/pom.xml
index d0641e2..4c042e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -890,6 +890,27 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>verify</id>
+ <phase>validate</phase>
+ <goals>
+ <!-- Fail the build if checkstyle rules for contributions are not met. -->
+ <goal>check</goal>
+ </goals>
+ <configuration>
+ <configLocation>etc/config/checkstyle-verify.xml</configLocation>
+ <consoleOutput>true</consoleOutput>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <excludes>**/module-info.java</excludes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<extensions>
<extension>
@@ -1188,37 +1209,6 @@
</distributionManagement>
</profile>
<profile>
- <id>jdk1.7+</id>
- <activation>
- <jdk>[1.7,)</jdk>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <executions>
- <execution>
- <id>verify</id>
- <phase>validate</phase>
- <goals>
- <!-- Fail the build if checkstyle rules for contributions are not met. -->
- <goal>check</goal>
- </goals>
- <configuration>
- <configLocation>etc/config/checkstyle-verify.xml</configLocation>
- <consoleOutput>true</consoleOutput>
- <failOnViolation>true</failOnViolation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- <excludes>**/module-info.java</excludes>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
<id>sonar</id>
<properties>
<!-- Sonar/Reporting settings (heavily inspired at http://www.aheritier.net/maven-failsafe-sonar-and-jacoco-are-in-a-boat/ -->
diff --git a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/FutureCancelTest.java b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/FutureCancelTest.java
new file mode 100644
index 0000000..72b2d41
--- /dev/null
+++ b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/FutureCancelTest.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.tests.e2e.client.connector;
+
+import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
+import org.glassfish.jersey.apache5.connector.Apache5ConnectorProvider;
+import org.glassfish.jersey.client.AbstractRxInvoker;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.HttpUrlConnectorProvider;
+import org.glassfish.jersey.client.JerseyInvocation;
+import org.glassfish.jersey.client.RequestEntityProcessing;
+import org.glassfish.jersey.client.spi.ConnectorProvider;
+import org.glassfish.jersey.netty.connector.NettyConnectorProvider;
+import org.glassfish.jersey.server.ChunkedOutput;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.client.ClientBuilder;
+import jakarta.ws.rs.client.CompletionStageRxInvoker;
+import jakarta.ws.rs.client.Entity;
+import jakarta.ws.rs.client.RxInvokerProvider;
+import jakarta.ws.rs.client.SyncInvoker;
+import jakarta.ws.rs.core.Application;
+import jakarta.ws.rs.core.GenericType;
+import jakarta.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Verifies for each tested connector that cancelling the {@link Future} registered with
+ * {@link JerseyInvocation.Builder#setCancellable(Future)} stops the client from consuming a chunked response.
+ */
+public class FutureCancelTest extends JerseyTest {
+
+    public static final long SLEEP = 100L;
+
+    /**
+     * Chunks received by the client. Written by the executor thread reading the response and polled by
+     * the test thread, hence the synchronized wrapper.
+     */
+    private static final List<String> RX_LIST = java.util.Collections.synchronizedList(new LinkedList<>());
+
+    public static List<ConnectorProvider> testData() {
+        return Arrays.asList(
+                new ApacheConnectorProvider(),
+                new Apache5ConnectorProvider(),
+                new HttpUrlConnectorProvider(),
+                new NettyConnectorProvider()
+        );
+    }
+
+    @Path("/")
+    public static class FutureCancelResource {
+        /**
+         * Streams the numbers 0..99 as separate chunks, pausing {@link #SLEEP} milliseconds after each one.
+         */
+        @GET
+        public ChunkedOutput<String> sendData() {
+            ChunkedOutput<String> chunkedOutput = new ChunkedOutput<>(String.class);
+            Thread newThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    for (int i = 0; i != 100; i++) {
+                        try {
+                            chunkedOutput.write(String.valueOf(i));
+                            Thread.sleep(SLEEP);
+                        } catch (Exception e) {
+                            // consume
+                        }
+                    }
+                }
+            });
+            newThread.start();
+
+            return chunkedOutput;
+        }
+    }
+
+    @Override
+    protected Application configure() {
+        return new ResourceConfig(FutureCancelResource.class);
+    }
+
+    @ParameterizedTest
+    @MethodSource("testData")
+    public void testFutureCancel(ConnectorProvider connectorProvider) throws InterruptedException, ExecutionException {
+        // RX_LIST is static; without a reset the chunks of the previous connector would satisfy the wait
+        // below immediately and would be counted in the final assertion.
+        RX_LIST.clear();
+
+        ClientConfig config = new ClientConfig();
+        config.connectorProvider(connectorProvider);
+
+        Future<List<String>> future = ClientBuilder.newClient(config)
+                .register(new FutureCancelRxInvokerProvider())
+                .property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED)
+                .target(target().getUri()).request().rx(FutureCancelRxInvoker.class).get().toCompletableFuture();
+
+        int expectedSize = 2;
+
+        while (RX_LIST.size() < expectedSize) {
+            Thread.sleep(SLEEP);
+        }
+        future.cancel(true);
+
+        Thread.sleep(2 * SLEEP); // wait to see no new messages arrive
+        int size = RX_LIST.size(); // some might have arrived between RX_LIST.size() and cancel()
+        while (size > expectedSize) { // be sure no more come
+            Thread.sleep(SLEEP);
+            expectedSize = size;
+            size = RX_LIST.size();
+        }
+
+        Assertions.assertTrue(size < 10, "Received " + size + " messages");
+    }
+
+    public static class FutureCancelRxInvokerProvider implements RxInvokerProvider<FutureCancelRxInvoker> {
+
+        /**
+         * Reads the entity stream until its end and records each received block in {@code RX_LIST}.
+         * An {@link IOException} thrown by the stream is rethrown wrapped in a {@link RuntimeException}.
+         */
+        private final Function<InputStream, Object> function = new Function<InputStream, Object>() {
+            @Override
+            public Object apply(InputStream inputStream) {
+                final byte[] number = new byte[8];
+                try {
+                    int len;
+                    while ((len = inputStream.read(number)) != -1) {
+                        // decode only the bytes of this read, independently of the platform charset
+                        RX_LIST.add(new String(number, 0, len, java.nio.charset.StandardCharsets.UTF_8));
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                return RX_LIST;
+            }
+        };
+
+        @Override
+        public boolean isProviderFor(Class<?> clazz) {
+            return FutureCancelRxInvoker.class.equals(clazz);
+        }
+
+        @Override
+        public FutureCancelRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+            return new FutureCancelRxInvoker(syncInvoker, executorService, function);
+        }
+    }
+
+    private static class FutureCancelRxInvoker extends AbstractRxInvoker<CompletionStage> implements CompletionStageRxInvoker {
+        private final Function<InputStream, Object> consumer;
+
+        public FutureCancelRxInvoker(SyncInvoker syncInvoker, ExecutorService executor, Function<InputStream, Object> consumer) {
+            super(syncInvoker, executor);
+            this.consumer = consumer;
+        }
+
+        @Override
+        public <R> CompletionStage method(String name, Entity<?> entity, Class<R> responseType) {
+            return this.<R>invokeCancellable();
+        }
+
+        @Override
+        public <R> CompletionStage method(String name, Entity<?> entity, GenericType<R> responseType) {
+            return this.<R>invokeCancellable();
+        }
+
+        /**
+         * Issues the GET request on the executor and hands the entity stream to the consumer.
+         * <p>
+         * The returned future is registered as the request's cancellable before the asynchronous task is
+         * submitted, so the request cannot be started without it; the outcome of the task is copied to the
+         * returned future once the task finishes.
+         *
+         * @param <R> the result type expected by the caller.
+         * @return the future to be cancelled by the test.
+         */
+        private <R> CompletableFuture<R> invokeCancellable() {
+            final CompletableFuture<R> completableFuture = new CompletableFuture<>();
+            ((JerseyInvocation.Builder) getSyncInvoker()).setCancellable(completableFuture);
+
+            CompletableFuture.supplyAsync(new Supplier<R>() {
+                @Override
+                @SuppressWarnings("unchecked")
+                public R get() {
+                    Response r = getSyncInvoker().get();
+                    InputStream is = r.readEntity(InputStream.class);
+                    Object o = consumer.apply(is);
+                    return (R) o;
+                }
+            }, getExecutorService()).whenComplete((value, error) -> {
+                // has no effect when the returned future was cancelled in the meantime
+                if (error != null) {
+                    completableFuture.completeExceptionally(error);
+                } else {
+                    completableFuture.complete(value);
+                }
+            });
+            return completableFuture;
+        }
+    }
+}