Merge pull request #5559 from jansupol/m242c

Merge 2.x into 3.0
diff --git a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java
index 264c453..0e36dee 100644
--- a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java
+++ b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2010, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -31,9 +31,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
@@ -521,7 +523,7 @@
 
             try {
                 final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
-                responseContext.setEntityStream(getInputStream(response, closingMechanism));
+                responseContext.setEntityStream(getInputStream(response, closingMechanism, () -> clientRequest.isCancelled()));
             } catch (final IOException e) {
                 LOGGER.log(Level.SEVERE, null, e);
             }
@@ -730,13 +732,14 @@
     }
 
     private static InputStream getInputStream(final CloseableHttpResponse response,
-                                              final ConnectionClosingMechanism closingMechanism) throws IOException {
+                                              final ConnectionClosingMechanism closingMechanism,
+                                              final Supplier<Boolean> isCancelled) throws IOException {
         final InputStream inputStream;
 
         if (response.getEntity() == null) {
             inputStream = new ByteArrayInputStream(new byte[0]);
         } else {
-            final InputStream i = response.getEntity().getContent();
+            final InputStream i = new CancellableInputStream(response.getEntity().getContent(), isCancelled);
             if (i.markSupported()) {
                 inputStream = i;
             } else {
@@ -885,4 +888,86 @@
             }
         }
     }
+
+    /**
+     * Response entity stream that stops serving data once the owning request has been cancelled.
+     * <p>
+     * Every reading operation first consults the {@code isCancelled} supplier and, when it reports {@code true},
+     * fails with an {@link IOException} caused by a {@link CancellationException} instead of touching the wrapped
+     * stream. {@link #close()}, {@link #mark(int)} and {@link #markSupported()} are delegated unconditionally.
+     */
+    private static class CancellableInputStream extends InputStream {
+        private final InputStream in;
+        private final Supplier<Boolean> isCancelled;
+
+        private CancellableInputStream(InputStream in, Supplier<Boolean> isCancelled) {
+            this.in = in;
+            this.isCancelled = isCancelled;
+        }
+
+        @Override
+        public int read(byte[] b) throws IOException {
+            checkAborted();
+            // Delegate the bulk read: in.read() would return a single byte value as if it were the number of
+            // bytes read and would leave the caller's buffer untouched.
+            return in.read(b);
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            checkAborted();
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public int read() throws IOException {
+            checkAborted();
+            return in.read();
+        }
+
+        @Override
+        public boolean markSupported() {
+            return in.markSupported();
+        }
+
+        @Override
+        public long skip(long n) throws IOException {
+            checkAborted();
+            return in.skip(n);
+        }
+
+        @Override
+        public int available() throws IOException {
+            checkAborted();
+            return in.available();
+        }
+
+        // Not guarded by checkAborted(): the wrapped stream can still be closed after a cancellation.
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
+
+        @Override
+        public synchronized void mark(int readlimit) {
+            in.mark(readlimit);
+        }
+
+        @Override
+        public synchronized void reset() throws IOException {
+            checkAborted();
+            in.reset();
+        }
+
+        /**
+         * Fails when the request has been cancelled.
+         *
+         * @throws IOException wrapping a {@link CancellationException} if {@code isCancelled} reports {@code true}.
+         */
+        private void checkAborted() throws IOException {
+            if (isCancelled.get()) {
+                throw new IOException(new CancellationException());
+            }
+        }
+    }
 }
diff --git a/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java b/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java
index f7c30a9..8d93b91 100644
--- a/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java
+++ b/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2022, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -32,8 +32,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
+import java.util.function.Supplier;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
@@ -531,7 +533,7 @@
 
             try {
                 final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
-                responseContext.setEntityStream(getInputStream(response, closingMechanism));
+                responseContext.setEntityStream(getInputStream(response, closingMechanism, () -> clientRequest.isCancelled()));
             } catch (final IOException e) {
                 LOGGER.log(Level.SEVERE, null, e);
             }
@@ -741,13 +743,14 @@
     }
 
     private static InputStream getInputStream(final CloseableHttpResponse response,
-                                              final ConnectionClosingMechanism closingMechanism) throws IOException {
+                                              final ConnectionClosingMechanism closingMechanism,
+                                              final Supplier<Boolean> isCancelled) throws IOException {
         final InputStream inputStream;
 
         if (response.getEntity() == null) {
             inputStream = new ByteArrayInputStream(new byte[0]);
         } else {
-            final InputStream i = response.getEntity().getContent();
+            final InputStream i = new CancellableInputStream(response.getEntity().getContent(), isCancelled);
             if (i.markSupported()) {
                 inputStream = i;
             } else {
@@ -889,4 +892,87 @@
             }
         }
     }
+
+    /**
+     * Response entity stream that stops serving data once the owning request has been cancelled.
+     * <p>
+     * Every reading operation first consults the {@code isCancelled} supplier and, when it reports {@code true},
+     * fails with an {@link IOException} caused by a {@link CancellationException} instead of touching the wrapped
+     * stream. {@link #close()}, {@link #mark(int)} and {@link #markSupported()} are delegated unconditionally.
+     */
+    private static class CancellableInputStream extends InputStream {
+        private final InputStream in;
+        private final Supplier<Boolean> isCancelled;
+
+        private CancellableInputStream(InputStream in, Supplier<Boolean> isCancelled) {
+            this.in = in;
+            this.isCancelled = isCancelled;
+        }
+
+        @Override
+        public int read(byte[] b) throws IOException {
+            checkAborted();
+            // Delegate the bulk read: in.read() would return a single byte value as if it were the number of
+            // bytes read and would leave the caller's buffer untouched.
+            return in.read(b);
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            checkAborted();
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public int read() throws IOException {
+            checkAborted();
+            return in.read();
+        }
+
+        @Override
+        public boolean markSupported() {
+            return in.markSupported();
+        }
+
+        @Override
+        public long skip(long n) throws IOException {
+            checkAborted();
+            return in.skip(n);
+        }
+
+        @Override
+        public int available() throws IOException {
+            checkAborted();
+            return in.available();
+        }
+
+        // Not guarded by checkAborted(): the wrapped stream can still be closed after a cancellation.
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
+
+        @Override
+        public synchronized void mark(int readlimit) {
+            in.mark(readlimit);
+        }
+
+        @Override
+        public synchronized void reset() throws IOException {
+            checkAborted();
+            in.reset();
+        }
+
+        /**
+         * Fails when the request has been cancelled.
+         *
+         * @throws IOException wrapping a {@link CancellationException} if {@code isCancelled} reports {@code true}.
+         */
+        private void checkAborted() throws IOException {
+            if (isCancelled.get()) {
+                throw new IOException(new CancellationException());
+            }
+        }
+    }
+
 }
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
index 4a9836d..5d1d2f2 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -25,6 +25,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
@@ -158,6 +159,10 @@
 
     @Override
     public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+        if (jerseyRequest.isCancelled()) {
+            responseAvailable.completeExceptionally(new CancellationException());
+            return;
+        }
         if (msg instanceof HttpResponse) {
             final HttpResponse response = (HttpResponse) msg;
             jerseyResponse = new ClientResponse(new Response.StatusType() {
diff --git a/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletContainer.java b/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletContainer.java
index 13702a9..147ce8c 100644
--- a/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletContainer.java
+++ b/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletContainer.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2022 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -294,6 +294,7 @@
         final URI baseUri;
         final URI requestUri;
         try {
+            LOGGER.debugLog("ServletContainer.service(...) started");
             baseUri = absoluteUriBuilder.replacePath(encodedBasePath).build();
             String queryParameters = ContainerUtils.encodeUnsafeCharacters(request.getQueryString());
             if (queryParameters == null) {
@@ -531,6 +532,7 @@
         final URI baseUri;
         final URI requestUri;
         try {
+            LOGGER.debugLog("ServletContainer.doFilter(...) started");
             final UriBuilder absoluteUriBuilder = UriBuilder.fromUri(request.getRequestURL().toString());
 
             // depending on circumstances, use the correct path to replace in the absolute request URI
diff --git a/containers/jetty-http/src/main/java11/org/glassfish/jersey/jetty/JettyHttpContainer.java b/containers/jetty-http/src/main/java11/org/glassfish/jersey/jetty/JettyHttpContainer.java
index 1205576..1fcc1b1 100644
--- a/containers/jetty-http/src/main/java11/org/glassfish/jersey/jetty/JettyHttpContainer.java
+++ b/containers/jetty-http/src/main/java11/org/glassfish/jersey/jetty/JettyHttpContainer.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -148,6 +148,7 @@
         final Response response = request.getResponse();
         final ResponseWriter responseWriter = new ResponseWriter(request, response, configSetStatusOverSendError);
         try {
+            LOGGER.debugLog("JettyHttpContainer.handle(...) started");
             final URI baseUri = getBaseUri(request);
             final URI requestUri = getRequestUri(request, baseUri);
             final ContainerRequest requestContext = new ContainerRequest(
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java b/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java
index 992311e..fc21c02 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -25,6 +25,10 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -90,6 +94,8 @@
     private  LazyValue<PropertiesResolver> propertiesResolver = Values.lazy(
             (Value<PropertiesResolver>) () -> PropertiesResolver.create(getConfiguration(), getPropertiesDelegate())
     );
+    // by default nothing to be cancelled.
+    private Future cancellable = NotCancellable.INSTANCE;
 
     private static final Logger LOGGER = Logger.getLogger(ClientRequest.class.getName());
 
@@ -126,6 +132,7 @@
         this.writerInterceptors = original.writerInterceptors;
         this.propertiesDelegate = new MapPropertiesDelegate(original.propertiesDelegate);
         this.ignoreUserAgent = original.ignoreUserAgent;
+        this.cancellable = original.cancellable;
     }
 
     @Override
@@ -584,4 +591,78 @@
     public void ignoreUserAgent(final boolean ignore) {
         this.ignoreUserAgent = ignore;
     }
+
+    /**
+     * Sets the new {@code Future} that may cancel this {@link ClientRequest}.
+     *
+     * @param cancellable the {@code Future} whose cancellation state is reported by {@link #isCancelled()}.
+     */
+    void setCancellable(Future cancellable) {
+        this.cancellable = cancellable;
+    }
+
+    /**
+     * Cancels this {@link ClientRequest}. May result in {@link java.util.concurrent.CancellationException} later in this
+     * request processing if this {@link ClientRequest} is backed by a {@link Future} provided to
+     * {@link JerseyInvocation.Builder#setCancellable(Future)}.
+     * When no such {@link Future} has been set, only this request instance is marked as cancelled.
+     * @param mayInterruptIfRunning may have no effect or {@code true} if the thread executing this task should be interrupted
+     *                              (if the thread is known to the implementation);
+     *                              otherwise, in-progress tasks are allowed to complete
+     */
+    public void cancel(boolean mayInterruptIfRunning) {
+        if (cancellable == NotCancellable.INSTANCE) {
+            // The default placeholder is shared by every request that has no backing Future. Record the
+            // cancellation in a private instance so that unrelated requests are not reported as cancelled.
+            cancellable = new NotCancellable();
+        }
+        cancellable.cancel(mayInterruptIfRunning);
+    }
+
+    /**
+     * Returns {@code true} if this {@link ClientRequest} was cancelled
+     * before it completed normally.
+     *
+     * @return {@code true} if this {@link ClientRequest} was cancelled
+     * before it completed normally
+     */
+    public boolean isCancelled() {
+        return cancellable.isCancelled();
+    }
+
+    /**
+     * Placeholder {@link Future} used while no real {@code Future} backs the request. It never produces a value
+     * and only remembers whether {@link #cancel(boolean)} has been called.
+     */
+    private static class NotCancellable implements Future {
+        public static final Future INSTANCE = new NotCancellable();
+        // volatile so that a cancellation made on one thread is visible to another thread calling isCancelled()
+        private volatile boolean isCancelled = false;
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            isCancelled = true;
+            return isCancelled;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return isCancelled;
+        }
+
+        @Override
+        public boolean isDone() {
+            return false;
+        }
+
+        @Override
+        public Object get() throws InterruptedException, ExecutionException {
+            return null;
+        }
+
+        @Override
+        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return null;
+        }
+    }
 }
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
index 222c2a9..c120856 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2011, 2021 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2011, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -573,6 +573,18 @@
             throw new IllegalStateException(
                     LocalizationMessages.CLIENT_RX_PROVIDER_NOT_REGISTERED(clazz.getSimpleName()));
         }
+
+        /**
+         * Sets the {@link Future} that backs {@link ClientRequest#isCancelled()} of the request being built. Can be used,
+         * for instance, by {@link CompletionStageRxInvoker} to hand its {@link CompletableFuture} to the {@link SyncInvoker}.
+         * @param cancellable the {@link Future} whose cancellation state, as set by {@link Future#cancel(boolean)}, is
+         * reported by {@link ClientRequest#isCancelled()}.
+         * @return the updated builder.
+         */
+        public Builder setCancellable(Future cancellable) {
+            requestContext.setCancellable(cancellable);
+            return this;
+        }
     }
 
     /* package */ static class AsyncInvoker extends CompletableFutureAsyncInvoker implements jakarta.ws.rs.client.AsyncInvoker {
@@ -711,6 +723,8 @@
     public Future<Response> submit() {
         final CompletableFuture<Response> responseFuture = new CompletableFuture<>();
         final ClientRuntime runtime = request().getClientRuntime();
+
+        requestContext.setCancellable(responseFuture);
         runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext),
                 new InvocationResponseCallback<>(responseFuture, (request, scope) -> translate(request, scope, Response.class))));
 
@@ -725,6 +739,7 @@
         final CompletableFuture<T> responseFuture = new CompletableFuture<>();
         final ClientRuntime runtime = request().getClientRuntime();
 
+        requestContext.setCancellable(responseFuture);
         runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext),
                 new InvocationResponseCallback<T>(responseFuture, (request, scope) -> translate(request, scope, responseType))));
 
@@ -764,6 +779,7 @@
         final CompletableFuture<T> responseFuture = new CompletableFuture<>();
         final ClientRuntime runtime = request().getClientRuntime();
 
+        requestContext.setCancellable(responseFuture);
         runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext),
                 new InvocationResponseCallback<T>(responseFuture, (request, scope) -> translate(request, scope, responseType))));
 
@@ -888,6 +904,7 @@
                 }
             };
             final ClientRuntime runtime = request().getClientRuntime();
+            requestContext.setCancellable(responseFuture);
             runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext), responseCallback));
         } catch (final Throwable error) {
             final ProcessingException ce;
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java b/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java
index f5a3409..c89cf19 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2011, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2011, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -37,6 +37,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.function.Supplier;
@@ -155,7 +156,7 @@
         );
     }
 
-    private static InputStream getInputStream(final HttpURLConnection uc) throws IOException {
+    private static InputStream getInputStream(final HttpURLConnection uc, final ClientRequest clientRequest) throws IOException {
         return new InputStream() {
             private final UnsafeValue<InputStream, IOException> in = Values.lazy(new UnsafeValue<InputStream, IOException>() {
                 @Override
@@ -190,6 +191,10 @@
                 if (closed) {
                     throw new IOException("Stream closed");
                 }
+                if (clientRequest.isCancelled()) {
+                    close();
+                    throw new IOException(new CancellationException());
+                }
             }
 
             @Override
@@ -311,7 +316,7 @@
             if (DEFAULT_SSL_SOCKET_FACTORY.get() == suc.getSSLSocketFactory()) {
                 // indicates that the custom socket factory was not set
                 suc.setSSLSocketFactory(sslSocketFactory.get());
-                }
+            }
         }
     }
 
@@ -448,7 +453,7 @@
         );
 
         try {
-            InputStream inputStream = getInputStream(uc);
+            InputStream inputStream = getInputStream(uc, request);
             responseContext.setEntityStream(inputStream);
         } catch (IOException ioe) {
             // allow at least a partial response in a ResponseProcessingException
diff --git a/pom.xml b/pom.xml
index d0641e2..4c042e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -890,6 +890,27 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <!-- Fail the build if checkstyle rules for contributions are not met. -->
+                            <goal>check</goal>
+                        </goals>
+                        <configuration>
+                            <configLocation>etc/config/checkstyle-verify.xml</configLocation>
+                            <consoleOutput>true</consoleOutput>
+                            <failOnViolation>true</failOnViolation>
+                            <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                            <excludes>**/module-info.java</excludes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
         <extensions>
             <extension>
@@ -1188,37 +1209,6 @@
             </distributionManagement>
         </profile>
         <profile>
-            <id>jdk1.7+</id>
-            <activation>
-                <jdk>[1.7,)</jdk>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.apache.maven.plugins</groupId>
-                        <artifactId>maven-checkstyle-plugin</artifactId>
-                        <executions>
-                            <execution>
-                                <id>verify</id>
-                                <phase>validate</phase>
-                                <goals>
-                                    <!-- Fail the build if checkstyle rules for contributions are not met. -->
-                                    <goal>check</goal>
-                                </goals>
-                                <configuration>
-                                    <configLocation>etc/config/checkstyle-verify.xml</configLocation>
-                                    <consoleOutput>true</consoleOutput>
-                                    <failOnViolation>true</failOnViolation>
-                                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
-                                    <excludes>**/module-info.java</excludes>
-                                </configuration>
-                            </execution>
-                        </executions>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-        <profile>
             <id>sonar</id>
             <properties>
                 <!-- Sonar/Reporting settings (heavily inspired at http://www.aheritier.net/maven-failsafe-sonar-and-jacoco-are-in-a-boat/ -->
diff --git a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/FutureCancelTest.java b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/FutureCancelTest.java
new file mode 100644
index 0000000..72b2d41
--- /dev/null
+++ b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/FutureCancelTest.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.tests.e2e.client.connector;
+
+import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
+import org.glassfish.jersey.apache5.connector.Apache5ConnectorProvider;
+import org.glassfish.jersey.client.AbstractRxInvoker;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.HttpUrlConnectorProvider;
+import org.glassfish.jersey.client.JerseyInvocation;
+import org.glassfish.jersey.client.RequestEntityProcessing;
+import org.glassfish.jersey.client.spi.ConnectorProvider;
+import org.glassfish.jersey.netty.connector.NettyConnectorProvider;
+import org.glassfish.jersey.server.ChunkedOutput;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.client.Client;
+import jakarta.ws.rs.client.ClientBuilder;
+import jakarta.ws.rs.client.CompletionStageRxInvoker;
+import jakarta.ws.rs.client.Entity;
+import jakarta.ws.rs.client.RxInvokerProvider;
+import jakarta.ws.rs.client.SyncInvoker;
+import jakarta.ws.rs.core.Application;
+import jakarta.ws.rs.core.GenericType;
+import jakarta.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Verifies that cancelling the {@link Future} registered through
+ * {@link JerseyInvocation.Builder#setCancellable(Future)} stops the client from receiving further chunks of a
+ * chunked response, for every connector returned by {@link #testData()}.
+ */
+public class FutureCancelTest extends JerseyTest {
+
+    public static final long SLEEP = 100L;
+
+    // Chunks received by the client; written by the executor thread and polled by the test thread, hence synchronized.
+    private static final List<String> RX_LIST = Collections.synchronizedList(new LinkedList<>());
+
+    public static List<ConnectorProvider> testData() {
+        return Arrays.asList(
+                new ApacheConnectorProvider(),
+                new Apache5ConnectorProvider(),
+                new HttpUrlConnectorProvider(),
+                new NettyConnectorProvider()
+        );
+    }
+
+    @Path("/")
+    public static class FutureCancelResource {
+        /**
+         * Streams the numbers 0..99 as chunks from a background thread, pausing {@code SLEEP} ms after each one.
+         */
+        @GET
+        public ChunkedOutput<String> sendData() {
+            ChunkedOutput<String> chunkedOutput = new ChunkedOutput<>(String.class);
+            Thread newThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        for (int i = 0; i != 100; i++) {
+                            chunkedOutput.write(String.valueOf(i));
+                            Thread.sleep(SLEEP);
+                        }
+                    } catch (IOException clientGone) {
+                        // writing failed, e.g. because the client cancelled and went away; stop producing chunks
+                    } catch (InterruptedException interrupted) {
+                        Thread.currentThread().interrupt();
+                    } finally {
+                        try {
+                            chunkedOutput.close();
+                        } catch (IOException ignored) {
+                            // the output is being discarded anyway; nothing left to clean up
+                        }
+                    }
+                }
+            });
+            newThread.start();
+
+            return chunkedOutput;
+        }
+    }
+
+    @Override
+    protected Application configure() {
+        return new ResourceConfig(FutureCancelResource.class);
+    }
+
+    @ParameterizedTest
+    @MethodSource("testData")
+    public void testFutureCancel(ConnectorProvider connectorProvider) throws InterruptedException, ExecutionException {
+        // RX_LIST is static and therefore shared by all parameterized runs; start every connector from scratch.
+        RX_LIST.clear();
+
+        ClientConfig config = new ClientConfig();
+        config.connectorProvider(connectorProvider);
+
+        Client client = ClientBuilder.newClient(config);
+        try {
+            Future<List<String>> future = client
+                    .register(new FutureCancelRxInvokerProvider())
+                    .property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED)
+                    .target(target().getUri()).request().rx(FutureCancelRxInvoker.class).get().toCompletableFuture();
+
+            int expectedSize = 2;
+
+            while (RX_LIST.size() < expectedSize) {
+                Thread.sleep(SLEEP);
+            }
+            future.cancel(true);
+
+            Thread.sleep(2 * SLEEP); // wait to see no new messages arrive
+            int size = RX_LIST.size(); // some might have arrived between RX_LIST.size() and cancel()
+            while (size > expectedSize) { // be sure no more come
+                Thread.sleep(SLEEP);
+                expectedSize = size;
+                size = RX_LIST.size();
+            }
+
+            Assertions.assertTrue(size < 10, "Received " + size + " messages");
+        } finally {
+            client.close();
+        }
+    }
+
+    public static class FutureCancelRxInvokerProvider implements RxInvokerProvider<FutureCancelRxInvoker> {
+
+        // Reads the entity stream until EOF and records the result of every read in RX_LIST.
+        Function<InputStream, Object> function = new Function<InputStream, Object>() {
+            @Override
+            public Object apply(InputStream inputStream) {
+                byte[] number = new byte[8];
+                int len;
+                try {
+                    while ((len = inputStream.read(number)) != -1) {
+                        RX_LIST.add(new String(number, 0, len, StandardCharsets.UTF_8));
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                return RX_LIST;
+            }
+        };
+
+        @Override
+        public boolean isProviderFor(Class<?> clazz) {
+            return FutureCancelRxInvoker.class.equals(clazz);
+        }
+
+        @Override
+        public FutureCancelRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+            return new FutureCancelRxInvoker(syncInvoker, executorService, function);
+        }
+    }
+
+    private static class FutureCancelRxInvoker extends AbstractRxInvoker<CompletionStage> implements CompletionStageRxInvoker {
+        private final Function<InputStream, Object> consumer;
+
+        public FutureCancelRxInvoker(SyncInvoker syncInvoker, ExecutorService executor, Function<InputStream, Object> consumer) {
+            super(syncInvoker, executor);
+            this.consumer = consumer;
+        }
+
+        @Override
+        public <R> CompletionStage method(String name, Entity<?> entity, Class<R> responseType) {
+            return this.<R>startCancellableGet();
+        }
+
+        @Override
+        public <R> CompletionStage method(String name, Entity<?> entity, GenericType<R> responseType) {
+            return this.<R>startCancellableGet();
+        }
+
+        /**
+         * Issues a GET through the sync invoker on the executor and feeds the entity stream to {@code consumer}.
+         * Shared by both {@code method} overloads so that each of them registers the returned future as the
+         * cancellable of the request; the method name, entity and response type are ignored, as before.
+         */
+        @SuppressWarnings("unchecked")
+        private <R> CompletableFuture<R> startCancellableGet() {
+            final CompletableFuture<R> completableFuture = new CompletableFuture<>();
+            // Register the future before the task is submitted: once the sync invoker has started the request,
+            // a later registration on the builder may no longer be seen by that request.
+            ((JerseyInvocation.Builder) getSyncInvoker()).setCancellable(completableFuture);
+
+            final Supplier<R> task = new Supplier<R>() {
+                @Override
+                public R get() {
+                    Response r = getSyncInvoker().get();
+                    InputStream is = r.readEntity(InputStream.class);
+                    return (R) consumer.apply(is);
+                }
+            };
+            getExecutorService().execute(() -> {
+                try {
+                    completableFuture.complete(task.get());
+                } catch (Throwable t) {
+                    completableFuture.completeExceptionally(t);
+                }
+            });
+            return completableFuture;
+        }
+    }
+}