Offer the Client (partial) response in ProcessingException. (#4460)

Offer the user (partial) response in ProcessingException.
In the HttpUrlConnector, when there is a response,
throw ResponseProcessingException instead of a simple
ProcessingException.

Signed-off-by: Jan Supol <jan.supol@oracle.com>
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/ClientRuntime.java b/core-client/src/main/java/org/glassfish/jersey/client/ClientRuntime.java
index 9d5ef02..e19d787 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/ClientRuntime.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/ClientRuntime.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -34,6 +34,7 @@
 
 import javax.inject.Provider;
 
+import org.glassfish.jersey.client.internal.ClientResponseProcessingException;
 import org.glassfish.jersey.client.internal.LocalizationMessages;
 import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
 import org.glassfish.jersey.client.spi.Connector;
@@ -299,6 +300,9 @@
             }
 
             response = Stages.process(response, responseProcessingRoot);
+        } catch (final ClientResponseProcessingException crpe) {
+            processingException = crpe;
+            response = crpe.getClientResponse();
         } catch (final ProcessingException pe) {
             processingException = pe;
         } catch (final Throwable t) {
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
index 22ee42c..ceb9009 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.function.BiFunction;
 import java.util.logging.Logger;
 
 import javax.ws.rs.BadRequestException;
@@ -57,6 +58,7 @@
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 
+import org.glassfish.jersey.client.internal.ClientResponseProcessingException;
 import org.glassfish.jersey.client.internal.LocalizationMessages;
 import org.glassfish.jersey.internal.MapPropertiesDelegate;
 import org.glassfish.jersey.internal.inject.Providers;
@@ -612,9 +614,10 @@
     public Response invoke() throws ProcessingException, WebApplicationException {
         final ClientRuntime runtime = request().getClientRuntime();
         final RequestScope requestScope = runtime.getRequestScope();
-        return requestScope.runInScope(
-                (Producer<Response>) () -> new InboundJaxrsResponse(runtime.invoke(requestForCall(requestContext)),
-                                                                    requestScope));
+
+        return runInScope(((Producer<Response>) () ->
+                        new InboundJaxrsResponse(runtime.invoke(requestForCall(requestContext)), requestScope)),
+                        requestScope);
     }
 
     @Override
@@ -624,17 +627,9 @@
         }
         final ClientRuntime runtime = request().getClientRuntime();
         final RequestScope requestScope = runtime.getRequestScope();
-        //noinspection Duplicates
-        return requestScope.runInScope(() -> {
-            try {
-                return translate(runtime.invoke(requestForCall(requestContext)), requestScope, responseType);
-            } catch (final ProcessingException ex) {
-                if (ex.getCause() instanceof WebApplicationException) {
-                    throw (WebApplicationException) ex.getCause();
-                }
-                throw ex;
-            }
-        });
+
+        return runInScope(() ->
+                translate(runtime.invoke(requestForCall(requestContext)), requestScope, responseType), requestScope);
     }
 
     @Override
@@ -644,41 +639,37 @@
         }
         final ClientRuntime runtime = request().getClientRuntime();
         final RequestScope requestScope = runtime.getRequestScope();
-        //noinspection Duplicates
-        return requestScope.runInScope(() -> {
-            try {
-                return translate(runtime.invoke(requestForCall(requestContext)), requestScope, responseType);
-            } catch (final ProcessingException ex) {
-                if (ex.getCause() instanceof WebApplicationException) {
-                    throw (WebApplicationException) ex.getCause();
-                }
-                throw ex;
+
+        return runInScope(() ->
+                translate(runtime.invoke(requestForCall(requestContext)), requestScope, responseType), requestScope);
+    }
+
+    private <T> T runInScope(Producer<T> producer, RequestScope scope) throws ProcessingException, WebApplicationException {
+        return scope.runInScope(() -> call(producer, scope));
+    }
+
+    private <T> T call(Producer<T> producer, RequestScope scope)
+            throws ProcessingException, WebApplicationException {
+        try {
+            return producer.call();
+        } catch (final ClientResponseProcessingException crpe) {
+            throw new ResponseProcessingException(
+                    translate(crpe.getClientResponse(), scope, Response.class), crpe.getCause()
+            );
+        } catch (final ProcessingException ex) {
+            if (WebApplicationException.class.isInstance(ex.getCause())) {
+                throw (WebApplicationException) ex.getCause();
             }
-        });
+            throw ex;
+        }
     }
 
     @Override
     public Future<Response> submit() {
         final CompletableFuture<Response> responseFuture = new CompletableFuture<>();
         final ClientRuntime runtime = request().getClientRuntime();
-        runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext), new ResponseCallback() {
-
-            @Override
-            public void completed(final ClientResponse response, final RequestScope scope) {
-                if (!responseFuture.isCancelled()) {
-                    responseFuture.complete(new InboundJaxrsResponse(response, scope));
-                } else {
-                    response.close();
-                }
-            }
-
-            @Override
-            public void failed(final ProcessingException error) {
-                if (!responseFuture.isCancelled()) {
-                    responseFuture.completeExceptionally(error);
-                }
-            }
-        }));
+        runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext),
+                new InvocationResponseCallback<>(responseFuture, (request, scope) -> translate(request, scope, Response.class))));
 
         return responseFuture;
     }
@@ -689,35 +680,10 @@
             throw new IllegalArgumentException(LocalizationMessages.RESPONSE_TYPE_IS_NULL());
         }
         final CompletableFuture<T> responseFuture = new CompletableFuture<>();
-        //noinspection Duplicates
         final ClientRuntime runtime = request().getClientRuntime();
-        runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext), new ResponseCallback() {
 
-            @Override
-            public void completed(final ClientResponse response, final RequestScope scope) {
-                if (responseFuture.isCancelled()) {
-                    response.close();
-                    return;
-                }
-                try {
-                    responseFuture.complete(translate(response, scope, responseType));
-                } catch (final ProcessingException ex) {
-                    failed(ex);
-                }
-            }
-
-            @Override
-            public void failed(final ProcessingException error) {
-                if (responseFuture.isCancelled()) {
-                    return;
-                }
-                if (error.getCause() instanceof WebApplicationException) {
-                    responseFuture.completeExceptionally(error.getCause());
-                } else {
-                    responseFuture.completeExceptionally(error);
-                }
-            }
-        }));
+        runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext),
+                new InvocationResponseCallback<T>(responseFuture, (request, scope) -> translate(request, scope, responseType))));
 
         return responseFuture;
     }
@@ -753,36 +719,10 @@
             throw new IllegalArgumentException(LocalizationMessages.RESPONSE_TYPE_IS_NULL());
         }
         final CompletableFuture<T> responseFuture = new CompletableFuture<>();
-        //noinspection Duplicates
         final ClientRuntime runtime = request().getClientRuntime();
-        runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext), new ResponseCallback() {
 
-            @Override
-            public void completed(final ClientResponse response, final RequestScope scope) {
-                if (responseFuture.isCancelled()) {
-                    response.close();
-                    return;
-                }
-
-                try {
-                    responseFuture.complete(translate(response, scope, responseType));
-                } catch (final ProcessingException ex) {
-                    failed(ex);
-                }
-            }
-
-            @Override
-            public void failed(final ProcessingException error) {
-                if (responseFuture.isCancelled()) {
-                    return;
-                }
-                if (error.getCause() instanceof WebApplicationException) {
-                    responseFuture.completeExceptionally(error.getCause());
-                } else {
-                    responseFuture.completeExceptionally(error);
-                }
-            }
-        }));
+        runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext),
+                new InvocationResponseCallback<T>(responseFuture, (request, scope) -> translate(request, scope, responseType))));
 
         return responseFuture;
     }
@@ -883,14 +823,24 @@
 
                 @Override
                 public void failed(final ProcessingException error) {
+                    Exception called = null;
                     try {
                         if (error.getCause() instanceof WebApplicationException) {
                             responseFuture.completeExceptionally(error.getCause());
                         } else if (!responseFuture.isCancelled()) {
-                            responseFuture.completeExceptionally(error);
+                            try {
+                                call(() -> { throw error; }, null);
+                            } catch (Exception ex) {
+                                called = ex;
+                                responseFuture.completeExceptionally(ex);
+                            }
                         }
                     } finally {
-                        callback.failed(error.getCause() instanceof CancellationException ? error.getCause() : error);
+                        callback.failed(
+                                error.getCause() instanceof CancellationException
+                                        ? error.getCause()
+                                        : called != null ? called : error
+                        );
                     }
                 }
             };
@@ -899,7 +849,10 @@
         } catch (final Throwable error) {
             final ProcessingException ce;
             //noinspection ChainOfInstanceofChecks
-            if (error instanceof ProcessingException) {
+            if (error instanceof ClientResponseProcessingException) {
+                ce = new ProcessingException(error.getCause());
+                responseFuture.completeExceptionally(ce);
+            } else if (error instanceof ProcessingException) {
                 ce = (ProcessingException) error;
                 responseFuture.completeExceptionally(ce);
             } else if (error instanceof WebApplicationException) {
@@ -1006,4 +959,45 @@
     public String toString() {
         return "JerseyInvocation [" + request().getMethod() + ' ' + request().getUri() + "]";
     }
+
+    private class InvocationResponseCallback<R> implements ResponseCallback {
+        private final CompletableFuture<R> responseFuture;
+        private final BiFunction<ClientResponse, RequestScope, R> producer;
+
+        private InvocationResponseCallback(CompletableFuture<R> responseFuture,
+                                           BiFunction<ClientResponse, RequestScope, R> producer) {
+            this.responseFuture = responseFuture;
+            this.producer = producer;
+        }
+
+        @Override
+        public void completed(final ClientResponse response, final RequestScope scope) {
+            if (responseFuture.isCancelled()) {
+                response.close();
+                return;
+            }
+
+
+            try {
+                responseFuture.complete(producer.apply(response, scope));
+            } catch (final ProcessingException ex) {
+                failed(ex);
+            }
+        }
+
+        @Override
+        public void failed(final ProcessingException error) {
+            if (responseFuture.isCancelled()) {
+                return;
+            }
+
+            try {
+                call(() -> {
+                    throw error;
+                }, null);
+            } catch (Exception exception) {
+                responseFuture.completeExceptionally(exception);
+            }
+        }
+    }
 }
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/internal/ClientResponseProcessingException.java b/core-client/src/main/java/org/glassfish/jersey/client/internal/ClientResponseProcessingException.java
new file mode 100644
index 0000000..ef38b63
--- /dev/null
+++ b/core-client/src/main/java/org/glassfish/jersey/client/internal/ClientResponseProcessingException.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.client.internal;
+
+import org.glassfish.jersey.client.ClientResponse;
+
+import javax.ws.rs.ProcessingException;
+
+/**
+ * This is a representation of a @{link ProcessingException} containing a @{link ClientResponse} instance.
+ * This exception is meant to be converted to a {@code ResponseProcessingException} at a point where
+ * {@link ClientResponse} is converted to a {@code Response} before it is delivered to a user.
+ * @since 2.31
+ */
+public class ClientResponseProcessingException extends ProcessingException {
+    private static final long serialVersionUID = 3389677946623416847L;
+    private final ClientResponse clientResponse;
+
+    /**
+     * An instance of {@code ClientResponseProcessingException} containing {@link ClientResponse} and cause {@link Throwable}.
+     * @param clientResponse a {@link ClientResponse} to be converted to {@code Response}.
+     * @param cause a cause of the exception.
+     */
+    public ClientResponseProcessingException(ClientResponse clientResponse, Throwable cause) {
+        super(cause);
+        this.clientResponse = clientResponse;
+    }
+
+    /**
+     * Return a {@link ClientResponse} to be converted to {@code Response} to be put to a {@code ResponseProcessingException}.
+     * @return a {@link ClientResponse} to be converted to {@code Response}.
+     */
+    public ClientResponse getClientResponse() {
+        return clientResponse;
+    }
+}
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java b/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java
index b84c5b3..a4255a7 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2011, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2011, 2020 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -328,51 +328,61 @@
         secureConnection(request.getClient(), uc);
 
         final Object entity = request.getEntity();
-        if (entity != null) {
-            RequestEntityProcessing entityProcessing = request.resolveProperty(
-                    ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class);
+        Exception storedException = null;
+        try {
+            if (entity != null) {
+                RequestEntityProcessing entityProcessing = request.resolveProperty(
+                        ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class);
 
-            if (entityProcessing == null || entityProcessing != RequestEntityProcessing.BUFFERED) {
-                final long length = request.getLengthLong();
-                if (fixLengthStreaming && length > 0) {
-                    // uc.setFixedLengthStreamingMode(long) was introduced in JDK 1.7 and Jersey client supports 1.6+
-                    if ("1.6".equals(Runtime.class.getPackage().getSpecificationVersion())) {
-                        uc.setFixedLengthStreamingMode(request.getLength());
-                    } else {
+                if (entityProcessing == null || entityProcessing != RequestEntityProcessing.BUFFERED) {
+                    final long length = request.getLengthLong();
+                    if (fixLengthStreaming && length > 0) {
                         uc.setFixedLengthStreamingMode(length);
+                    } else if (entityProcessing == RequestEntityProcessing.CHUNKED) {
+                        uc.setChunkedStreamingMode(chunkSize);
                     }
-                } else if (entityProcessing == RequestEntityProcessing.CHUNKED) {
-                    uc.setChunkedStreamingMode(chunkSize);
                 }
-            }
-            uc.setDoOutput(true);
+                uc.setDoOutput(true);
 
-            if ("GET".equalsIgnoreCase(httpMethod)) {
-                final Logger logger = Logger.getLogger(HttpUrlConnector.class.getName());
-                if (logger.isLoggable(Level.INFO)) {
-                    logger.log(Level.INFO, LocalizationMessages.HTTPURLCONNECTION_REPLACES_GET_WITH_ENTITY());
+                if ("GET".equalsIgnoreCase(httpMethod)) {
+                    final Logger logger = Logger.getLogger(HttpUrlConnector.class.getName());
+                    if (logger.isLoggable(Level.INFO)) {
+                        logger.log(Level.INFO, LocalizationMessages.HTTPURLCONNECTION_REPLACES_GET_WITH_ENTITY());
+                    }
                 }
-            }
 
-            request.setStreamProvider(contentLength -> {
+                request.setStreamProvider(contentLength -> {
+                    setOutboundHeaders(request.getStringHeaders(), uc);
+                    return uc.getOutputStream();
+                });
+                request.writeEntity();
+
+            } else {
                 setOutboundHeaders(request.getStringHeaders(), uc);
-                return uc.getOutputStream();
-            });
-            request.writeEntity();
-
-        } else {
-            setOutboundHeaders(request.getStringHeaders(), uc);
+            }
+        } catch (IOException ioe) {
+            if (uc.getResponseCode() == -1) {
+                throw ioe;
+            } else {
+                storedException = ioe;
+            }
         }
 
         final int code = uc.getResponseCode();
         final String reasonPhrase = uc.getResponseMessage();
         final Response.StatusType status =
                 reasonPhrase == null ? Statuses.from(code) : Statuses.from(code, reasonPhrase);
-        final URI resolvedRequestUri;
+
+        URI resolvedRequestUri = null;
         try {
             resolvedRequestUri = uc.getURL().toURI();
         } catch (URISyntaxException e) {
-            throw new ProcessingException(e);
+            // if there is already an exception stored, the stored exception is what matters most
+            if (storedException == null) {
+                storedException = e;
+            } else {
+                storedException.addSuppressed(e);
+            }
         }
 
         ClientResponse responseContext = new ClientResponse(status, request, resolvedRequestUri);
@@ -384,7 +394,22 @@
                   .collect(Collectors.toMap(Map.Entry::getKey,
                                             Map.Entry::getValue))
         );
-        responseContext.setEntityStream(getInputStream(uc));
+
+        try {
+            InputStream inputStream = getInputStream(uc);
+            responseContext.setEntityStream(inputStream);
+        } catch (IOException ioe) {
+            // allow at least a partial response in a ResponseProcessingException
+            if (storedException == null) {
+                storedException = ioe;
+            } else {
+                storedException.addSuppressed(ioe);
+            }
+        }
+
+        if (storedException != null) {
+            throw new ClientResponseProcessingException(responseContext, storedException);
+        }
 
         return responseContext;
     }
diff --git a/tests/e2e/src/test/java/org/glassfish/jersey/tests/e2e/MessageBodyExceptionWrappingTest.java b/tests/e2e/src/test/java/org/glassfish/jersey/tests/e2e/MessageBodyExceptionWrappingTest.java
index 2b5d2e6..c2446f4 100644
--- a/tests/e2e/src/test/java/org/glassfish/jersey/tests/e2e/MessageBodyExceptionWrappingTest.java
+++ b/tests/e2e/src/test/java/org/glassfish/jersey/tests/e2e/MessageBodyExceptionWrappingTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2018 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -72,9 +72,15 @@
         try {
             Response response = resource.request().post(Entity.entity(source, MediaType.TEXT_XML_TYPE));
             fail("Exception expected, instead response with " + response.getStatus() + " status has been returned.");
-        } catch (ProcessingException e) {
-            assertEquals(WebApplicationException.class, e.getCause().getClass());
-            assertEquals(555, ((WebApplicationException) e.getCause()).getResponse().getStatus());
+        } catch (WebApplicationException e) {
+            assertEquals(555, e.getResponse().getStatus());
+        }
+
+        try {
+            Response response = resource.request().post(Entity.entity(source, MediaType.TEXT_XML_TYPE), Response.class);
+            fail("Exception expected, instead response with " + response.getStatus() + " status has been returned.");
+        } catch (WebApplicationException e) {
+            assertEquals(555, e.getResponse().getStatus());
         }
     }
 
diff --git a/tests/integration/jersey-4003/pom.xml b/tests/integration/jersey-4003/pom.xml
new file mode 100644
index 0000000..ae005fc
--- /dev/null
+++ b/tests/integration/jersey-4003/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+
+    This program and the accompanying materials are made available under the
+    terms of the Eclipse Public License v. 2.0, which is available at
+    http://www.eclipse.org/legal/epl-2.0.
+
+    This Source Code may also be made available under the following Secondary
+    Licenses when the conditions for such availability set forth in the
+    Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+    version 2 with the GNU Classpath Exception, which is available at
+    https://www.gnu.org/software/classpath/license.html.
+
+    SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>project</artifactId>
+        <groupId>org.glassfish.jersey.tests.integration</groupId>
+        <version>2.31-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>jersey-4003</artifactId>
+    <name>jersey-tests-integration-jersey-4003</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+            <artifactId>jersey-test-framework-provider-bundle</artifactId>
+            <type>pom</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/tests/integration/jersey-4003/src/test/java/org/glassfish/jersey/tests/integration/jersey4003/LostResponseTest.java b/tests/integration/jersey-4003/src/test/java/org/glassfish/jersey/tests/integration/jersey4003/LostResponseTest.java
new file mode 100644
index 0000000..2ba8042
--- /dev/null
+++ b/tests/integration/jersey-4003/src/test/java/org/glassfish/jersey/tests/integration/jersey4003/LostResponseTest.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.tests.integration.jersey4003;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.HttpUrlConnectorProvider;
+import org.glassfish.jersey.client.JerseyClientBuilder;
+import org.glassfish.jersey.client.JerseyCompletionStageRxInvoker;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.Mockito;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.client.ResponseProcessingException;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class LostResponseTest {
+
+    private static final String DUMMY_URL = "http://foo";
+    private static final int RESPONSE_CODE = 503;
+
+    private Client client;
+    private Entity<?> bodyEntity;
+
+    @Before
+    public void setup() throws IOException {
+        HttpUrlConnectorProvider.ConnectionFactory connectionFactory =
+                Mockito.mock(HttpUrlConnectorProvider.ConnectionFactory.class);
+        HttpURLConnection connection = Mockito.mock(HttpURLConnection.class);
+        Mockito.when(connectionFactory.getConnection(Mockito.any(URL.class))).thenReturn(connection);
+
+        OutputStream outputStream = Mockito.mock(OutputStream.class);
+        Mockito.when(connection.getOutputStream()).thenReturn(outputStream);
+
+        Mockito.when(connection.getURL()).thenReturn(new URL(DUMMY_URL));
+        Mockito.when(connection.getResponseCode()).thenReturn(RESPONSE_CODE);
+
+        // When the below line is commented, the test succeeds.
+        Mockito.doThrow(new IOException("Injected Write Failure"))
+                .when(outputStream)
+                .write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt());
+
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.connectorProvider(
+                new HttpUrlConnectorProvider().connectionFactory(connectionFactory));
+        client = JerseyClientBuilder.newBuilder().withConfig(clientConfig).build();
+
+        ByteArrayInputStream bodyStream = new ByteArrayInputStream(new byte[100]);
+        bodyEntity = Entity.entity(bodyStream, MediaType.APPLICATION_OCTET_STREAM_TYPE);
+    }
+
+    @Test
+    public void putEntityFailure() {
+        try {
+            client.target(DUMMY_URL).request().put(bodyEntity);
+            Assert.fail("Expected ResponseProcessing exception has not been thrown");
+        } catch (ResponseProcessingException rpe) {
+            try (Response response = rpe.getResponse()) {
+                Assert.assertEquals(RESPONSE_CODE, response.getStatus());
+            }
+        }
+    }
+
+    @Test
+    public void putEntityAndClassTypeFailure() {
+        try {
+            client.target(DUMMY_URL).request().put(bodyEntity, String.class);
+            Assert.fail("Expected ResponseProcessing exception has not been thrown");
+        } catch (ResponseProcessingException rpe) {
+            try (Response response = rpe.getResponse()) {
+                Assert.assertEquals(RESPONSE_CODE, response.getStatus());
+            }
+        }
+    }
+
+    @Test
+    public void putEntityAndGenericTypeTypeFailure() {
+        try {
+            client.target(DUMMY_URL).request().put(bodyEntity, new GenericType<String>(){});
+            Assert.fail("Expected ResponseProcessing exception has not been thrown");
+        } catch (ResponseProcessingException rpe) {
+            try (Response response = rpe.getResponse()) {
+                Assert.assertEquals(RESPONSE_CODE, response.getStatus());
+            }
+        }
+    }
+
+    @Test
+    public void asyncPutEntityFailure() throws InterruptedException {
+        try {
+            Future<Response> future = client.target(DUMMY_URL).request().async().put(bodyEntity);
+            future.get();
+            Assert.fail("Expected ResponseProcessing exception has not been thrown");
+        } catch (ExecutionException ee) {
+            try {
+                throw (RuntimeException) ee.getCause();
+            } catch (ResponseProcessingException rpe) {
+                try (Response response = rpe.getResponse()) {
+                    Assert.assertEquals(RESPONSE_CODE, response.getStatus());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void asyncPutEntityAndClassFailure() throws InterruptedException {
+        try {
+            Future<String> future = client.target(DUMMY_URL).request().async().put(bodyEntity, String.class);
+            future.get();
+            Assert.fail("Expected ResponseProcessing exception has not been thrown");
+        } catch (ExecutionException ee) {
+            try {
+                throw (RuntimeException) ee.getCause();
+            } catch (ResponseProcessingException rpe) {
+                try (Response response = rpe.getResponse()) {
+                    Assert.assertEquals(RESPONSE_CODE, response.getStatus());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void asyncPutEntityAndGenericTypeTypeFailure() throws InterruptedException {
+        try {
+            Future<String> future = client.target(DUMMY_URL).request().async().put(bodyEntity, new GenericType<String>(){});
+            future.get();
+            Assert.fail("Expected ResponseProcessing exception has not been thrown");
+        } catch (ExecutionException ee) {
+            try {
+                throw (RuntimeException) ee.getCause();
+            } catch (ResponseProcessingException rpe) {
+                try (Response response = rpe.getResponse()) {
+                    Assert.assertEquals(RESPONSE_CODE, response.getStatus());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void asyncPutEntityWithCallbackFailure() throws InterruptedException {
+        AtomicReference<Throwable> callbackThrowable = new AtomicReference<>();
+        CountDownLatch failedLatch = new CountDownLatch(1);
+        try {
+            Future<Response> future =
+                    client.target(DUMMY_URL).request().async().put(bodyEntity, new InvocationCallback<Response>() {
+                @Override
+                public void completed(Response response) {
+                    Assert.fail("Expected ResponseProcessing exception has not been thrown");
+                }
+
+                @Override
+                public void failed(Throwable throwable) {
+                    callbackThrowable.set(throwable);
+                    failedLatch.countDown();
+                }
+            });
+            future.get();
+            Assert.fail("Expected ResponseProcessing exception has not been thrown");
+        } catch (ExecutionException ee) {
+            try {
+                throw (RuntimeException) ee.getCause();
+            } catch (ResponseProcessingException rpe) {
+                try (Response response = rpe.getResponse()) {
+                    Assert.assertEquals(RESPONSE_CODE, response.getStatus());
+                }
+            }
+            failedLatch.await(5000, TimeUnit.MILLISECONDS);
+            Throwable ct = callbackThrowable.get();
+            Assert.assertTrue("Callback has not been hit", ct != null);
+            Assert.assertTrue("The exception is " + ct.getClass().getName(),
+                    ResponseProcessingException.class.isInstance(ct));
+        }
+    }
+
+    @Test
+    public void rxPutEntityFailure() throws InterruptedException {
+        try {
+            CompletionStage<Response> future = client.target(DUMMY_URL).request().rx().put(bodyEntity);
+            future.toCompletableFuture().get();
+            Assert.fail("Expected ResponseProcessing exception has not been thrown");
+        } catch (ExecutionException ee) {
+            try {
+                throw (RuntimeException) ee.getCause();
+            } catch (ResponseProcessingException rpe) {
+                try (Response response = rpe.getResponse()) {
+                    Assert.assertEquals(RESPONSE_CODE, response.getStatus());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void rxPutEntityWithCallbackFailure() throws InterruptedException {
+        AtomicReference<Throwable> callbackThrowable = new AtomicReference<>();
+        CountDownLatch failedLatch = new CountDownLatch(1);
+        try {
+            Future<Response> future =
+                    client.target(DUMMY_URL).request().rx(JerseyCompletionStageRxInvoker.class)
+                            .put(bodyEntity, new InvocationCallback<Response>() {
+                        @Override
+                        public void completed(Response response) {
+                            Assert.fail("Expected ResponseProcessing exception has not been thrown");
+                        }
+
+                        @Override
+                        public void failed(Throwable throwable) {
+                            callbackThrowable.set(throwable);
+                            failedLatch.countDown();
+                        }
+                    });
+            future.get();
+            Assert.fail("Expected ResponseProcessing exception has not been thrown");
+        } catch (ExecutionException ee) {
+            try {
+                throw (RuntimeException) ee.getCause();
+            } catch (ResponseProcessingException rpe) {
+                try (Response response = rpe.getResponse()) {
+                    Assert.assertEquals(RESPONSE_CODE, response.getStatus());
+                }
+            }
+            failedLatch.await(5000, TimeUnit.MILLISECONDS);
+            Throwable ct = callbackThrowable.get();
+            Assert.assertTrue("Callback has not been hit", ct != null);
+            Assert.assertTrue("The exception is " + ct.getClass().getName(),
+                    ResponseProcessingException.class.isInstance(ct));
+        }
+    }
+}
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 0fbda7e..76a043b 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -82,6 +82,7 @@
         <module>jersey-780</module>
         <module>jersey-3670</module>
         <module>jersey-3992</module>
+        <module>jersey-4003</module>
         <module>jersey-4099</module>
         <module>jersey-4321</module>
         <module>jetty-response-close</module>