Connector to Helidon 2 Web Client

Signed-off-by: Jan Supol <jan.supol@oracle.com>
diff --git a/connectors/helidon-connector/pom.xml b/connectors/helidon-connector/pom.xml
new file mode 100644
index 0000000..10d1af6
--- /dev/null
+++ b/connectors/helidon-connector/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+
+    This program and the accompanying materials are made available under the
+    terms of the Eclipse Public License v. 2.0, which is available at
+    http://www.eclipse.org/legal/epl-2.0.
+
+    This Source Code may also be made available under the following Secondary
+    Licenses when the conditions for such availability set forth in the
+    Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+    version 2 with the GNU Classpath Exception, which is available at
+    https://www.gnu.org/software/classpath/license.html.
+
+    SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>project</artifactId>
+        <groupId>org.glassfish.jersey.connectors</groupId>
+        <version>2.31-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>jersey-helidon-connector</artifactId>
+    <packaging>jar</packaging>
+    <name>jersey-connectors-helidon</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.helidon.webclient</groupId>
+            <artifactId>helidon-webclient</artifactId>
+            <!-- Dear user, please use more stable version -->
+            <version>2.0.0-M3</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+            <artifactId>jersey-test-framework-provider-grizzly2</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.media</groupId>
+            <artifactId>jersey-media-sse</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.sun.istack</groupId>
+                <artifactId>istack-commons-maven-plugin</artifactId>
+                <inherited>true</inherited>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <inherited>true</inherited>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <inherited>false</inherited>
+                <configuration>
+                    <source>11</source>
+                    <target>11</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnector.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnector.java
new file mode 100644
index 0000000..6e8e968
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnector.java
@@ -0,0 +1,249 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import io.helidon.common.Version;
+import io.helidon.webclient.WebClient;
+import io.helidon.webclient.WebClientRequestBuilder;
+import io.helidon.webclient.WebClientResponse;
+import org.glassfish.jersey.client.ClientAsyncExecutorLiteral;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+import org.glassfish.jersey.client.ClientResponse;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
+import org.glassfish.jersey.client.spi.Connector;
+import org.glassfish.jersey.internal.util.PropertiesHelper;
+import org.glassfish.jersey.spi.ExecutorServiceProvider;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.Configuration;
+import javax.ws.rs.core.Response;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.logging.Logger;
+
+/**
+ * A {@link Connector} that utilizes the Helidon HTTP Client to send and receive
+ * HTTP request and responses.
+ */
+class HelidonConnector implements Connector {
+
+    private static final String helidonVersion = "Helidon/" + Version.VERSION + " (java " + AccessController
+            .doPrivileged(PropertiesHelper.getSystemProperty("java.runtime.version")) + ")";
+    static final Logger LOGGER = Logger.getLogger(HelidonConnector.class.getName());
+
+    private final WebClient webClient;
+
+    private final ExecutorServiceKeeper executorServiceKeeper;
+    private final HelidonEntity.HelidonEntityType entityType;
+
+    private static final InputStream NO_CONTENT_INPUT_STREAM = new InputStream() {
+        @Override
+        public int read() throws IOException {
+            return -1;
+        }
+    };
+    // internal implementation entity type, can be removed in the future
+    // settable for testing purposes
+    private static final String INTERNAL_ENTITY_TYPE = "jersey.config.helidon.client.entity.type";
+
+    HelidonConnector(final Client client, final Configuration config) {
+        executorServiceKeeper = new ExecutorServiceKeeper(client);
+        entityType = getEntityType(config);
+
+        final WebClient.Builder webClientBuilder = WebClient.builder();
+
+        webClientBuilder.addReader(HelidonStructures.createInputStreamBodyReader());
+        HelidonEntity.helidonWriter(entityType).ifPresent(webClientBuilder::addWriter);
+
+        HelidonStructures.createProxy(config).ifPresent(webClientBuilder::proxy);
+
+        HelidonStructures.helidonConfig(config).ifPresent(webClientBuilder::config);
+
+        webClientBuilder.connectTimeout(ClientProperties.getValue(config.getProperties(),
+                ClientProperties.CONNECT_TIMEOUT, 10000), ChronoUnit.MILLIS);
+
+        HelidonStructures.createSSL(client.getSslContext()).ifPresent(webClientBuilder::ssl);
+
+        webClient = webClientBuilder.build();
+    }
+
+    @Override
+    public ClientResponse apply(ClientRequest request) {
+        try {
+            return applyInternal(request).toCompletableFuture().get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new ProcessingException(e);
+        }
+    }
+
+    @Override
+    public Future<?> apply(ClientRequest request, AsyncConnectorCallback callback) {
+        final BiConsumer<? super ClientResponse, ? super Throwable> action = (r, th) -> {
+            if (th == null) callback.response(r);
+            else callback.failure(th);
+        };
+        return applyInternal(request)
+                .whenCompleteAsync(action, executorServiceKeeper.getExecutorService(request))
+                .toCompletableFuture();
+    }
+
+    @Override
+    public String getName() {
+        return helidonVersion;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    private CompletionStage<ClientResponse> applyInternal(ClientRequest request) {
+        final WebClientRequestBuilder webClientRequestBuilder = webClient.method(request.getMethod());
+        webClientRequestBuilder.uri(request.getUri());
+
+        webClientRequestBuilder.headers(HelidonStructures.createHeaders(request.getRequestHeaders()));
+
+        for (String propertyName : request.getConfiguration().getPropertyNames()) {
+            Object property = request.getConfiguration().getProperty(propertyName);
+            if (!propertyName.startsWith("jersey") && String.class.isInstance(property)) {
+                webClientRequestBuilder.property(propertyName, (String) property);
+            }
+        }
+
+        for (String propertyName : request.getPropertyNames()) {
+            Object property = request.resolveProperty(propertyName, null);
+            if (!propertyName.startsWith("jersey") && String.class.isInstance(property)) {
+                webClientRequestBuilder.property(propertyName, (String) property);
+            }
+        }
+
+        // 2.0.0-M3
+        // HelidonStructures.createProxy(request).ifPresent(webClientRequestBuilder::proxy);
+
+        webClientRequestBuilder.followRedirects(request.resolveProperty(ClientProperties.FOLLOW_REDIRECTS, true));
+        webClientRequestBuilder.readTimeout(request.resolveProperty(ClientProperties.READ_TIMEOUT, 10000), ChronoUnit.MILLIS);
+
+        CompletionStage<WebClientResponse> responseStage = null;
+
+        if (request.hasEntity()) {
+            responseStage = HelidonEntity.submit(
+                    entityType, request, webClientRequestBuilder, executorServiceKeeper.getExecutorService(request)
+            );
+        } else {
+            responseStage = webClientRequestBuilder.submit();
+        }
+
+        return responseStage.thenCompose((a) -> convertResponse(request, a));
+    }
+
+    private CompletionStage<ClientResponse> convertResponse(final ClientRequest requestContext,
+                                                            final WebClientResponse webClientResponse) {
+
+        final ClientResponse responseContext = new ClientResponse(new Response.StatusType() {
+            @Override
+            public int getStatusCode() {
+                return webClientResponse.status().code();
+            }
+
+            @Override
+            public Response.Status.Family getFamily() {
+                return Response.Status.Family.familyOf(getStatusCode());
+            }
+
+            @Override
+            public String getReasonPhrase() {
+                return webClientResponse.status().reasonPhrase();
+            }
+        }, requestContext);
+
+        for (Map.Entry<String, List<String>> entry : webClientResponse.headers().toMap().entrySet()) {
+            for (String value : entry.getValue()) {
+                responseContext.getHeaders().add(entry.getKey(), value);
+            }
+        }
+
+        responseContext.setResolvedRequestUri(webClientResponse.lastEndpointURI());
+
+        final CompletionStage<InputStream> stream = HelidonStructures.hasEntity(webClientResponse)
+                ? webClientResponse.content().as(InputStream.class)
+                : CompletableFuture.supplyAsync(() -> NO_CONTENT_INPUT_STREAM);
+
+        return stream.thenApply((a) -> {
+            responseContext.setEntityStream(new FilterInputStream(a) {
+                private final AtomicBoolean closed = new AtomicBoolean(false);
+
+                @Override
+                public void close() throws IOException {
+                    // Avoid idempotent close in the underlying input stream
+                    if (!closed.compareAndSet(false, true)) {
+                        super.close();
+                    }
+                }
+            });
+            return responseContext;
+        });
+    }
+
+    private static HelidonEntity.HelidonEntityType getEntityType(final Configuration config) {
+        final String helidonType = ClientProperties.getValue(config.getProperties(),
+                INTERNAL_ENTITY_TYPE, HelidonEntity.HelidonEntityType.READABLE_BYTE_CHANNEL.name());
+        final HelidonEntity.HelidonEntityType entityType = HelidonEntity.HelidonEntityType.valueOf(helidonType);
+
+//        if (entityType != HelidonEntity.HelidonEntityType.READABLE_BYTE_CHANNEL) {
+//            // log warning for internal feature - no localization.properties
+//            LOGGER.warning(INTERNAL_ENTITY_TYPE + " is " + entityType.name());
+//        }
+
+        return entityType;
+    }
+
+    private static class ExecutorServiceKeeper {
+        private Optional<ExecutorService> executorService;
+
+        private ExecutorServiceKeeper(Client client) {
+            final ClientConfig config = ((JerseyClient) client).getConfiguration();
+            executorService = Optional.ofNullable(config.getExecutorService());
+        }
+
+        private ExecutorService getExecutorService(ClientRequest request) {
+            if (!executorService.isPresent()) {
+                // cache for multiple requests
+                executorService = Optional.ofNullable(request.getInjectionManager()
+                        .getInstance(ExecutorServiceProvider.class, ClientAsyncExecutorLiteral.INSTANCE).getExecutorService());
+            }
+
+            return executorService.get();
+        }
+    }
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnectorProvider.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnectorProvider.java
new file mode 100644
index 0000000..36092f4
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnectorProvider.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.Beta;
+import org.glassfish.jersey.client.spi.Connector;
+import org.glassfish.jersey.client.spi.ConnectorProvider;
+import org.glassfish.jersey.internal.util.JdkVersion;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.Configuration;
+import java.io.OutputStream;
+
+/**
+ * Provider for Helidon WebClient {@link Connector} that utilizes the Helidon HTTP Client to send and receive
+ * HTTP request and responses. JDK 8 is not supported by the Helidon Connector.
+ * <p/>
+ * The following properties are only supported at construction of this class:
+ * <ul>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#CONNECT_TIMEOUT}</li>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#FOLLOW_REDIRECTS}</li>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#PROXY_URI}</li>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#PROXY_USERNAME}</li>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#PROXY_PASSWORD}</li>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#READ_TIMEOUT}</li>
+ * <li>{@link HelidonProperties#CONFIG}</li>
+ * </ul>
+ * <p>
+ * If a {@link org.glassfish.jersey.client.ClientResponse} is obtained and an
+ * entity is not read from the response then
+ * {@link org.glassfish.jersey.client.ClientResponse#close()} MUST be called
+ * after processing the response to release connection-based resources.
+ * </p>
+ * <p>
+ * Client operations are thread safe, the HTTP connection may
+ * be shared between different threads.
+ * </p>
+ * <p>
+ * If a response entity is obtained that is an instance of {@link java.io.Closeable}
+ * then the instance MUST be closed after processing the entity to release
+ * connection-based resources.
+ * </p>
+ * <p>
+ * This connector uses {@link org.glassfish.jersey.client.ClientProperties#OUTBOUND_CONTENT_LENGTH_BUFFER} to buffer the entity
+ * written for instance by {@link javax.ws.rs.core.StreamingOutput}. Should the buffer be small and
+ * {@link javax.ws.rs.core.StreamingOutput#write(OutputStream)} be called many times, the performance can drop. The Content-Length
+ * or the Content_Encoding header is set by the underlaying Helidon WebClient regardless of the
+ * {@link org.glassfish.jersey.client.ClientProperties#OUTBOUND_CONTENT_LENGTH_BUFFER} size, however.
+ * </p>
+ *
+ * @since 2.31
+ */
+@Beta
+public class HelidonConnectorProvider implements ConnectorProvider {
+    @Override
+    public Connector getConnector(Client client, Configuration runtimeConfig) {
+        if (JdkVersion.getJdkVersion().getMajor() < 11) {
+            throw new ProcessingException(LocalizationMessages.NOT_SUPPORTED());
+        }
+        return new HelidonConnector(client, runtimeConfig);
+    }
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonEntity.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonEntity.java
new file mode 100644
index 0000000..386cb62
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonEntity.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import io.helidon.common.GenericType;
+import io.helidon.common.http.DataChunk;
+import io.helidon.common.http.MediaType;
+import io.helidon.common.reactive.Multi;
+import io.helidon.common.reactive.OutputStreamPublisher;
+import io.helidon.common.reactive.Single;
+import io.helidon.media.common.ByteChannelBodyWriter;
+import io.helidon.media.common.ContentWriters;
+import io.helidon.media.common.MessageBodyContext;
+import io.helidon.media.common.MessageBodyWriter;
+import io.helidon.media.common.MessageBodyWriterContext;
+import io.helidon.webclient.WebClientRequestBuilder;
+import io.helidon.webclient.WebClientResponse;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+
+import javax.ws.rs.ProcessingException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Flow;
+import java.util.function.Function;
+
+/**
+ * A utility class that converts outbound client entity to a class understandable by Helidon.
+ * Based on the {@link HelidonEntityType} an entity writer is provided to be registered by Helidon client
+ * and an Entity is provided to be submitted by the Helidon Client.
+ */
+class HelidonEntity {
+    /**
+     * HelidonEnity type chosen by HelidonEntityType
+     */
+    enum HelidonEntityType {
+        /**
+         * Simplest structure. Loads all data to the memory.
+         */
+        BYTE_ARRAY_OUTPUT_STREAM,
+        /**
+         * Readable ByteChannel that is capable of sending data in chunks.
+         * Capable of caching of bytes before the data are consumed by Helidon.
+         */
+        READABLE_BYTE_CHANNEL,
+        /**
+         * Helidon most native entity. Could be slower than {@link #READABLE_BYTE_CHANNEL}.
+         */
+        OUTPUT_STREAM_PUBLISHER
+    }
+
+    /**
+     * Get optional entity writer to be registered by the Helidon Client. For some default providers,
+     * nothing is needed to be registered.
+     * @param type the type of the entity class that works best for the Http Client request use case.
+     * @return possible writer to be registerd by the Helidon Client.
+     */
+    static Optional<MessageBodyWriter<?>> helidonWriter(HelidonEntityType type) {
+        switch (type) {
+            case BYTE_ARRAY_OUTPUT_STREAM:
+                return Optional.of(new OutputStreamBodyWriter());
+            case OUTPUT_STREAM_PUBLISHER:
+                //Helidon default
+                return Optional.empty();
+            case READABLE_BYTE_CHANNEL:
+                return Optional.of(ByteChannelBodyWriter.create());
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Convert Jersey {@code OutputStream} to an entity based on the client request use case and submits to the provided
+     * {@code WebClientRequestBuilder}.
+     * @param type the type of the Helidon entity.
+     * @param requestContext Jersey {@link ClientRequest} providing the entity {@code OutputStream}.
+     * @param requestBuilder Helidon {@code WebClientRequestBuilder} which is used to submit the entity
+     * @param executorService {@link ExecutorService} that fills the entity instance for Helidon with data from Jersey
+     *                      {@code OutputStream}.
+     * @return Helidon Client response completion stage.
+     */
+    static CompletionStage<WebClientResponse> submit(HelidonEntityType type,
+                                                     ClientRequest requestContext,
+                                                     WebClientRequestBuilder requestBuilder,
+                                                     ExecutorService executorService) {
+        CompletionStage<WebClientResponse> stage = null;
+        if (type != null) {
+            final int bufferSize = requestContext.resolveProperty(
+                    ClientProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 8192);
+            switch (type) {
+                case BYTE_ARRAY_OUTPUT_STREAM:
+                    final ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
+                    requestContext.setStreamProvider(contentLength -> baos);
+                    ((ProcessingRunnable) () -> requestContext.writeEntity()).run();
+                    stage = requestBuilder.submit(baos);
+                    break;
+                case READABLE_BYTE_CHANNEL:
+                    final OutputStreamChannel channel = new OutputStreamChannel(bufferSize);
+                    requestContext.setStreamProvider(contentLength -> channel);
+                    executorService.execute((ProcessingRunnable) () -> requestContext.writeEntity());
+                    stage = requestBuilder.submit(channel);
+                    break;
+                case OUTPUT_STREAM_PUBLISHER:
+                    final OutputStreamPublisher publisher = new OutputStreamPublisher();
+                    requestContext.setStreamProvider(contentLength -> publisher);
+                    executorService.execute((ProcessingRunnable) () -> {
+                        requestContext.writeEntity();
+                        publisher.close();
+                    });
+                    stage = requestBuilder.submit(Multi.from(publisher).map(DataChunk::create));
+                    break;
+            }
+        }
+        return stage;
+    }
+
+    @FunctionalInterface
+    private interface ProcessingRunnable extends Runnable {
+        void runOrThrow() throws IOException;
+
+        @Override
+        default void run() {
+            try {
+                runOrThrow();
+            } catch (IOException e) {
+                throw new ProcessingException(LocalizationMessages.ERROR_WRITING_ENTITY(e.getMessage()), e);
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static class OutputStreamBodyWriter implements MessageBodyWriter {
+        private OutputStreamBodyWriter() {
+        }
+
+        @Override
+        public Flow.Publisher<DataChunk> write(Single content, GenericType type, MessageBodyWriterContext context) {
+            context.contentType(MediaType.APPLICATION_OCTET_STREAM);
+            return content.flatMap(new ByteArrayOutputStreamToChunks());
+        }
+
+        @Override
+        public boolean accept(GenericType type, MessageBodyContext context) {
+            return ByteArrayOutputStream.class.isAssignableFrom(type.rawType());
+        }
+
+        private static class ByteArrayOutputStreamToChunks implements Function<ByteArrayOutputStream, Flow.Publisher<DataChunk>> {
+            @Override
+            public Flow.Publisher<DataChunk> apply(ByteArrayOutputStream byteArrayOutputStream) {
+                return ContentWriters.writeBytes(byteArrayOutputStream.toByteArray(), false);
+            }
+        }
+    }
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonProperties.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonProperties.java
new file mode 100644
index 0000000..62d0dbd
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonProperties.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.internal.util.PropertiesClass;
+
+import io.helidon.config.Config;
+import io.helidon.webclient.WebClient;
+
+/**
+ * Configuration options specific to the Client API that utilizes {@link HelidonConnector}
+ */
+@PropertiesClass
+public final class HelidonProperties {
+
+    /**
+     * A Helidon {@link Config} instance that is passed to {@link WebClient.Builder#config(Config)} if available
+     */
+    public static final String CONFIG = "jersey.config.helidon.client.config";
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonStructures.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonStructures.java
new file mode 100644
index 0000000..fbaffec
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonStructures.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import io.helidon.common.http.Headers;
+import io.helidon.common.http.Http;
+import io.helidon.common.http.ReadOnlyParameters;
+import io.helidon.config.Config;
+import io.helidon.config.ConfigSources;
+import io.helidon.media.common.InputStreamBodyReader;
+import io.helidon.media.common.MessageBodyReader;
+import io.helidon.webclient.Proxy;
+import io.helidon.webclient.Ssl;
+import io.helidon.webclient.WebClientResponse;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.core.Configuration;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Helidon specific classes and implementations.
+ */
+class HelidonStructures {
+
+    static Headers createHeaders(Map<String, List<String>> data) {
+        return new ReadOnlyHeaders(data);
+    }
+
+    static MessageBodyReader<InputStream> createInputStreamBodyReader() {
+        return InputStreamBodyReader.create();
+    }
+
+    static Optional<Config> helidonConfig(Configuration configuration) {
+        final Object helidonConfig = configuration.getProperty(HelidonProperties.CONFIG);
+        if (helidonConfig != null) {
+            if (!Config.class.isInstance(helidonConfig)) {
+                HelidonConnector.LOGGER.warning(LocalizationMessages.NOT_HELIDON_CONFIG(helidonConfig.getClass().getName()));
+                return Optional.empty();
+            } else {
+                return Optional.of((Config) helidonConfig);
+            }
+        }
+        return Optional.empty();
+    }
+
+    static Optional<Proxy> createProxy(Configuration config) {
+        return ProxyBuilder.createProxy(config);
+    }
+
+    static Optional<Proxy> createProxy(ClientRequest request) {
+        return ProxyBuilder.createProxy(request);
+    }
+
+    static Optional<Ssl> createSSL(SSLContext context) {
+        return context == null ? Optional.empty() : Optional.of(Ssl.builder().sslContext(context).build());
+    }
+
+    static boolean hasEntity(WebClientResponse webClientResponse) {
+        final ReadOnlyParameters headers = webClientResponse.content().readerContext().headers();
+        final Optional<String> contentLenth = headers.first(Http.Header.CONTENT_LENGTH);
+        final Optional<String> encoding = headers.first(Http.Header.TRANSFER_ENCODING);
+
+        return ((contentLenth.isPresent() && !contentLenth.get().equals("0"))
+                || (encoding.isPresent() && encoding.get().equals(HttpHeaderValues.CHUNKED.toString())));
+    }
+
+    private static class ReadOnlyHeaders extends ReadOnlyParameters implements Headers {
+        public ReadOnlyHeaders(Map<String, List<String>> data) {
+            super(data);
+        }
+    }
+
+    private static class ProxyBuilder {
+        private static Optional<Proxy> createProxy(Configuration config) {
+            final Object proxyUri = config.getProperty(ClientProperties.PROXY_URI);
+            final String userName
+                    = ClientProperties.getValue(config.getProperties(), ClientProperties.PROXY_USERNAME, String.class);
+            final String password
+                    = ClientProperties.getValue(config.getProperties(), ClientProperties.PROXY_PASSWORD, String.class);
+            return createProxy(proxyUri, userName, password);
+        }
+
+        private static Optional<Proxy> createProxy(ClientRequest clientRequest) {
+            final Object proxyUri = clientRequest.resolveProperty(ClientProperties.PROXY_URI, Object.class);
+            final String userName = clientRequest.resolveProperty(ClientProperties.PROXY_USERNAME, String.class);
+            final String password = clientRequest.resolveProperty(ClientProperties.PROXY_PASSWORD, String.class);
+            return createProxy(proxyUri, userName, password);
+        }
+
+        private static Optional<Proxy> createProxy(Object proxyUri, String userName, String password) {
+            if (proxyUri != null) {
+                final URI u = getProxyUri(proxyUri);
+                final Proxy.Builder builder = Proxy.builder();
+                Map<String, String> proxyMap;
+                if (u.getScheme().toUpperCase(Locale.ROOT).equals("DIRECT")) {
+                    proxyMap = Map.of("type", "NONE");
+                    //builder.type(Proxy.ProxyType.NONE);
+                } else {
+                    builder.host(u.getHost()).port(u.getPort());
+                    switch (u.getScheme().toUpperCase(Locale.ROOT)) {
+                        case "HTTP":
+                            proxyMap = Map.of("type", "HTTP");
+                            //builder.type(Proxy.ProxyType.HTTP);
+                            break;
+                        case "SOCKS":
+                            proxyMap = Map.of("type", "SOCKS_4");
+                            //builder.type(Proxy.ProxyType.SOCKS_4);
+                            break;
+                        case "SOCKS5":
+                            proxyMap = Map.of("type", "SOCKS_5");
+                            //builder.type(Proxy.ProxyType.SOCKS_5);
+                            break;
+                        default:
+                            HelidonConnector.LOGGER.warning(LocalizationMessages.UNSUPPORTED_PROXY_SCHEMA(u.getScheme()));
+                            return Optional.empty();
+                    }
+                    builder.config(Config.create(ConfigSources.create(proxyMap)));
+                }
+                if (userName != null) {
+                    builder.username(userName);
+
+                    if (password != null) {
+                        builder.password(password.toCharArray());
+                    }
+                }
+                return Optional.of(builder.build());
+            } else {
+                return Optional.empty();
+            }
+        }
+
+        private static URI getProxyUri(final Object proxy) {
+            if (proxy instanceof URI) {
+                return (URI) proxy;
+            } else if (proxy instanceof String) {
+                return URI.create((String) proxy);
+            } else {
+                throw new ProcessingException(LocalizationMessages.WRONG_PROXY_URI_TYPE(ClientProperties.PROXY_URI));
+            }
+        }
+    }
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/OutputStreamChannel.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/OutputStreamChannel.java
new file mode 100644
index 0000000..152ac7f
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/OutputStreamChannel.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+class OutputStreamChannel extends OutputStream implements ReadableByteChannel {
+
+    private ReentrantLock lock = new ReentrantLock();
+    private static final ByteBuffer VOID = ByteBuffer.allocate(0);
+    private static final int CAPACITY = Integer.getInteger("jersey.helidon.connector.osc.capacity", 8);
+    private static final int WRITE_TIMEOUT = Integer.getInteger("jersey.helidon.connector.osc.read.timeout", 10000);
+    private static final int READ_TIMEOUT = Integer.getInteger("jersey.helidon.connector.osc.write.timeout", 10000);
+    private final int bufferSize;
+
+    OutputStreamChannel(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    private final LinkedBlockingDeque<ByteBuffer> queue = new LinkedBlockingDeque<>(CAPACITY);
+
+    private volatile boolean open = true;
+    private ByteBuffer remainingByteBuffer;
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        if (!open) {
+            throw new ClosedChannelException();
+        }
+
+        int sum = 0;
+
+        do {
+            ByteBuffer top;
+            try {
+                top = poll(READ_TIMEOUT, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                open = false;
+                throw new ClosedByInterruptException();
+            }
+
+            if (top == null) {
+                return sum;
+            }
+
+            if (top == VOID) {
+                if (sum == 0) {
+                    open = false;
+                    return -1;
+                } else {
+                    queue.addFirst(top);
+                    return sum;
+                }
+            }
+
+            final int topSize = top.remaining();
+            final int dstAvailable = dst.remaining();
+            final int minSize = Math.min(topSize, dstAvailable);
+
+            if (top.hasArray()) {
+                dst.put(top.array(), top.arrayOffset() + top.position(), minSize);
+                top.position(top.position() + minSize);
+            } else {
+                while (dst.hasRemaining() && top.hasRemaining()) {
+                    dst.put(top.get());
+                }
+            }
+
+            sum += minSize;
+
+            if (top.hasRemaining()) {
+                remainingByteBuffer = top;
+            }
+        } while (dst.hasRemaining());
+
+        return sum;
+    }
+
+    private ByteBuffer poll(long timeout, TimeUnit unit) throws InterruptedException {
+        if (remainingByteBuffer != null) {
+            final ByteBuffer remaining = remainingByteBuffer;
+            remainingByteBuffer = null;
+            return remaining;
+        } else {
+            // do not modify head
+            lock.lock();
+            final ByteBuffer peek = queue.poll(timeout, unit);
+            // can modify head
+            lock.unlock();
+            return peek;
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        write(new byte[]{(byte) b}, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        super.write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        checkClosed();
+
+        if (lock.tryLock()) {
+            if (len < bufferSize && queue.size() > 0) {
+                final ByteBuffer buffer = queue.getLast();
+                if (buffer != null && (buffer.capacity() - buffer.limit()) > len) {
+                    //set for write
+                    buffer.position(buffer.limit());
+                    buffer.limit(buffer.capacity());
+                    buffer.put(b, off, len);
+                    //set for read
+                    buffer.flip();
+                    lock.unlock();
+                    return;
+                }
+            }
+            lock.unlock();
+        }
+
+        final int maxLen = Math.max(len, bufferSize);
+        final byte[] bytes = new byte[maxLen];
+        System.arraycopy(b, off, bytes, 0, len);
+
+        final ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        buffer.limit(len);
+        buffer.position(0);
+
+        write(buffer);
+    }
+
+    private void write(ByteBuffer buffer) throws IOException {
+        try {
+            boolean queued = queue.offer(buffer, WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
+            if (!queued) {
+                throw new IOException("Buffer overflow.");
+            }
+
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        boolean offer = false;
+
+        try {
+            offer = queue.offer(VOID, WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // ignore.
+        }
+
+        if (!offer) {
+            lock.lock();
+            queue.removeLast();
+            queue.add(VOID);
+            lock.unlock();
+        }
+    }
+
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    private void checkClosed() throws IOException {
+        if (!open) {
+            throw new IOException("Stream already closed.");
+        }
+    }
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/package-info.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/package-info.java
new file mode 100644
index 0000000..9dc13ef
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+/**
+ * Jersey client {@link org.glassfish.jersey.client.spi.Connector connector} based on Helidon Web Client.
+ */
+package org.glassfish.jersey.helidon.connector;
diff --git a/connectors/helidon-connector/src/main/resources/org.glassfish.jersey.helidon.connector/localization.properties b/connectors/helidon-connector/src/main/resources/org.glassfish.jersey.helidon.connector/localization.properties
new file mode 100644
index 0000000..f962cbc
--- /dev/null
+++ b/connectors/helidon-connector/src/main/resources/org.glassfish.jersey.helidon.connector/localization.properties
@@ -0,0 +1,21 @@
+#
+# Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+#
+# This program and the accompanying materials are made available under the
+# terms of the Eclipse Public License v. 2.0, which is available at
+# http://www.eclipse.org/legal/epl-2.0.
+#
+# This Source Code may also be made available under the following Secondary
+# Licenses when the conditions for such availability set forth in the
+# Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+# version 2 with the GNU Classpath Exception, which is available at
+# https://www.gnu.org/software/classpath/license.html.
+#
+# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+#
+
+error.writing.entity=Error writing entity: {0}.
+not.helidon.config=Given instance {0} is not Helidon config. Provided HelidonProperties.CONFIG is ignored.
+not.supported=Helidon connector is not supported on JDK version < 11.
+unsupported.proxy.schema=Proxy schema "{0}" not supported.
+wrong.proxy.uri.type=The proxy URI ("{0}") property MUST be an instance of String or URI.
\ No newline at end of file
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/AsyncTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/AsyncTest.java
new file mode 100644
index 0000000..b6189db
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/AsyncTest.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.container.TimeoutHandler;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Asynchronous connector test.
+ *
+ * @author Arul Dhesiaseelan (aruld at acm.org)
+ * @author Marek Potociar
+ */
+public class AsyncTest extends JerseyTest {
+
+    private static final Logger LOGGER = Logger.getLogger(AsyncTest.class.getName());
+    private static final String PATH = "async";
+
+    /**
+     * Asynchronous test resource.
+     */
+    @Path(PATH)
+    public static class AsyncResource {
+
+        /**
+         * Typical long-running operation duration.
+         */
+        public static final long OPERATION_DURATION = 1000;
+
+        /**
+         * Long-running asynchronous post.
+         *
+         * @param asyncResponse async response.
+         * @param id            post request id (received as request payload).
+         */
+        @POST
+        public void asyncPost(@Suspended final AsyncResponse asyncResponse, final String id) {
+            LOGGER.info("Long running post operation called with id " + id + " on thread " + Thread.currentThread().getName());
+            new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    final String result = veryExpensiveOperation();
+                    asyncResponse.resume(result);
+                }
+
+                private String veryExpensiveOperation() {
+                    // ... very expensive operation that typically finishes within 1 seconds, simulated using sleep()
+                    try {
+                        Thread.sleep(OPERATION_DURATION);
+                        return "DONE-" + id;
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        return "INTERRUPTED-" + id;
+                    } finally {
+                        LOGGER.info("Long running post operation finished on thread " + Thread.currentThread().getName());
+                    }
+                }
+            }, "async-post-runner-" + id).start();
+        }
+
+        /**
+         * Long-running async get request that times out.
+         *
+         * @param asyncResponse async response.
+         */
+        @GET
+        @Path("timeout")
+        public void asyncGetWithTimeout(@Suspended final AsyncResponse asyncResponse) {
+            LOGGER.info("Async long-running get with timeout called on thread " + Thread.currentThread().getName());
+            asyncResponse.setTimeoutHandler(new TimeoutHandler() {
+
+                @Override
+                public void handleTimeout(final AsyncResponse asyncResponse) {
+                    asyncResponse.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE)
+                                                 .entity("Operation time out.").build());
+                }
+            });
+            asyncResponse.setTimeout(1, TimeUnit.SECONDS);
+
+            new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    final String result = veryExpensiveOperation();
+                    asyncResponse.resume(result);
+                }
+
+                private String veryExpensiveOperation() {
+                    // very expensive operation that typically finishes within 1 second but can take up to 5 seconds,
+                    // simulated using sleep()
+                    try {
+                        Thread.sleep(5 * OPERATION_DURATION);
+                        return "DONE";
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        return "INTERRUPTED";
+                    } finally {
+                        LOGGER.info("Async long-running get with timeout finished on thread " + Thread.currentThread().getName());
+                    }
+                }
+            }).start();
+        }
+
+    }
+
+    @Override
+    protected Application configure() {
+        return new ResourceConfig(AsyncResource.class)
+                .register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.PAYLOAD_ANY));
+    }
+
+    @Override
+    protected void configureClient(final ClientConfig config) {
+        config.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.PAYLOAD_ANY));
+        config.connectorProvider(new HelidonConnectorProvider());
+    }
+
+    /**
+     * Test asynchronous POST.
+     * <p/>
+     * Send 3 async POST requests and wait to receive the responses. Check the response content and
+     * assert that the operation did not take more than twice as long as a single long operation duration
+     * (this ensures async request execution).
+     *
+     * @throws Exception in case of a test error.
+     */
+    @Test
+    public void testAsyncPost() throws Exception {
+        final Future<Response> warmUp1 = target(PATH).request().async().post(Entity.text("100"));
+        final Future<Response> warmUp2 = target(PATH).request().async().post(Entity.text("200"));
+        final Future<Response> warmUp3 = target(PATH).request().async().post(Entity.text("300"));
+
+        assertEquals("DONE-100", warmUp1.get().readEntity(String.class));
+        assertEquals("DONE-200", warmUp2.get().readEntity(String.class));
+        assertEquals("DONE-300", warmUp3.get().readEntity(String.class));
+
+        final long tic = System.currentTimeMillis();
+
+        // Submit requests asynchronously.
+        final Future<Response> rf1 = target(PATH).request().async().post(Entity.text("1"));
+        final Future<Response> rf2 = target(PATH).request().async().post(Entity.text("2"));
+        final Future<Response> rf3 = target(PATH).request().async().post(Entity.text("3"));
+
+        // get() waits for the response
+        final String r1 = rf1.get().readEntity(String.class);
+        final String r2 = rf2.get().readEntity(String.class);
+        final String r3 = rf3.get().readEntity(String.class);
+
+        final long toc = System.currentTimeMillis();
+
+        assertEquals("DONE-1", r1);
+        assertEquals("DONE-2", r2);
+        assertEquals("DONE-3", r3);
+
+        assertThat("Async processing took too long.", toc - tic, Matchers.lessThan(3 * AsyncResource.OPERATION_DURATION));
+    }
+
+    /**
+     * Test accessing an operation that times out on the server.
+     *
+     * @throws Exception in case of a test error.
+     */
+    @Test
+    public void testAsyncGetWithTimeout() throws Exception {
+        final Future<Response> responseFuture = target(PATH).path("timeout").request().async().get();
+        // Request is being processed asynchronously.
+        final Response response = responseFuture.get();
+
+        // get() waits for the response
+        assertEquals(503, response.getStatus());
+        assertEquals("Operation time out.", response.readEntity(String.class));
+    }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/BasicHelidonConnectorTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/BasicHelidonConnectorTest.java
new file mode 100644
index 0000000..8b42df1
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/BasicHelidonConnectorTest.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.JerseyCompletionStageRxInvoker;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(Parameterized.class)
+public class BasicHelidonConnectorTest extends JerseyTest {
+
+    private final String entityType;
+
+    public BasicHelidonConnectorTest(String entityType) {
+        this.entityType = entityType;
+    }
+
+    @Parameterized.Parameters
+    public static Object[] data() {
+        return new Object[]{"BYTE_ARRAY_OUTPUT_STREAM", "READABLE_BYTE_CHANNEL", "OUTPUT_STREAM_PUBLISHER"};
+    }
+
+    @Path("basic")
+    public static class BasicResource {
+        @Path("get")
+        @GET
+        public String get() {
+            return "ok";
+        }
+
+        @Path("getquery")
+        @GET
+        public String getQuery(@QueryParam("first") String first, @QueryParam("second") String second) {
+            return first + second;
+        }
+
+        @POST
+        @Path("post")
+        public String post(String entity) {
+            return entity + entity;
+        }
+
+        @GET
+        @Path("headers")
+        public Response headers(@Context HttpHeaders headers) {
+            Response.ResponseBuilder response = Response.ok("ok");
+            for (Map.Entry<String, List<String>> set : headers.getRequestHeaders().entrySet()) {
+                if (set.getKey().toUpperCase(Locale.ROOT).startsWith("X-TEST")) {
+                    response.header(set.getKey(), set.getValue().iterator().next());
+                }
+            }
+            return response.build();
+        }
+
+        @PUT
+        @Consumes("test/x-test")
+        @Produces("test/y-test")
+        @Path("produces/consumes")
+        public String putConsumesProduces(String content) {
+            return content + content;
+        }
+    }
+
+    @Path("async")
+    public static class AsyncResource {
+        private static CountDownLatch shortLong = null;
+
+        @GET
+        @Path("reset")
+        public void reset() {
+            shortLong = new CountDownLatch(1);
+        }
+
+        @Path("long")
+        @GET
+        public String longGet() throws InterruptedException {
+            shortLong.await(10000, TimeUnit.MILLISECONDS);
+            return shortLong.getCount() == 0 ? "long" : "shortLong CountDownLatch has not been hit";
+        }
+
+        @Path("short")
+        @GET
+        public String shortGet() {
+            shortLong.countDown();
+            return "short";
+        }
+    }
+
+    @Override
+    protected Application configure() {
+        return new ResourceConfig(BasicResource.class, AsyncResource.class)
+                .property(LoggingFeature.LOGGING_FEATURE_LOGGER_LEVEL_SERVER, "WARNING");
+    }
+
+    @Override
+    protected void configureClient(ClientConfig config) {
+        super.configureClient(config);
+        config.connectorProvider(new HelidonConnectorProvider());
+        config.property("jersey.config.helidon.client.entity.type", entityType);
+    }
+
+    @Test
+    public void testBasicGet() {
+        try (Response response = target("basic").path("get").request().get()) {
+            Assert.assertEquals(200, response.getStatus());
+            Assert.assertEquals("ok", response.readEntity(String.class));
+        }
+    }
+
+    @Test
+    public void testBasicPost() {
+        try (Response response = target("basic").path("post").request()
+                .buildPost(Entity.entity("ok", MediaType.TEXT_PLAIN_TYPE)).invoke()) {
+            Assert.assertEquals(200, response.getStatus());
+            Assert.assertEquals("okok", response.readEntity(String.class));
+        }
+    }
+
+    @Test
+    public void queryGetTest() {
+        try (Response response = target("basic").path("getquery")
+                .queryParam("first", "hello")
+                .queryParam("second", "world")
+                .request().get()) {
+            Assert.assertEquals(200, response.getStatus());
+            Assert.assertEquals("helloworld", response.readEntity(String.class));
+        }
+    }
+
+    @Test
+    public void testHeaders() {
+        String[][] headers = new String[][]{{"X-TEST-ONE", "ONE"}, {"X-TEST-TWO", "TWO"}, {"X-TEST-THREE", "THREE"}};
+        MultivaluedHashMap<String, Object> map = new MultivaluedHashMap<>();
+        Arrays.stream(headers).forEach(a -> map.add(a[0], a[1]));
+        try (Response response = target("basic").path("headers").request().headers(map).get()) {
+            Assert.assertEquals(200, response.getStatus());
+            Assert.assertEquals("ok", response.readEntity(String.class));
+            for (int i = 0; i != headers.length; i++) {
+                Assert.assertTrue(response.getHeaders().containsKey(headers[i][0]));
+                Assert.assertEquals(headers[i][1], response.getStringHeaders().getFirst(headers[i][0]));
+            }
+        }
+    }
+
+    @Test
+    public void testProduces() {
+        try (Response response = target("basic").path("produces/consumes").request("test/z-test")
+                .put(Entity.entity("ok", new MediaType("test", "x-test")))) {
+            Assert.assertEquals(406, response.getStatus());
+        }
+
+        try (Response response = target("basic").path("produces/consumes").request()
+                .put(Entity.entity("ok", new MediaType("test", "x-test")))) {
+            Assert.assertEquals(200, response.getStatus());
+            Assert.assertEquals("okok", response.readEntity(String.class));
+            Assert.assertEquals("test/y-test", response.getStringHeaders().getFirst(HttpHeaders.CONTENT_TYPE));
+        }
+    }
+
+    @Test
+    public void testAsyncGet() throws ExecutionException, InterruptedException {
+        Future<Response> futureResponse = target("basic").path("get").request().async().get();
+        try (Response response = futureResponse.get()) {
+            Assert.assertEquals(200, response.getStatus());
+            Assert.assertEquals("ok", response.readEntity(String.class));
+        }
+    }
+
+    @Test
+    public void testConsumes() {
+        try (Response response = target("basic").path("produces/consumes").request("test/y-test")
+                .put(Entity.entity("ok", new MediaType("test", "z-test")))) {
+            Assert.assertEquals(415, response.getStatus());
+        }
+
+        try (Response response = target("basic").path("produces/consumes").request("test/y-test")
+                .put(Entity.entity("ok", MediaType.WILDCARD_TYPE))) {
+            Assert.assertEquals(200, response.getStatus());
+            Assert.assertEquals("okok", response.readEntity(String.class));
+            Assert.assertEquals("test/y-test", response.getStringHeaders().getFirst(HttpHeaders.CONTENT_TYPE));
+        }
+    }
+
+    @Test
+    public void testRxGet() throws ExecutionException, InterruptedException {
+        CompletableFuture<Response> futureResponse =
+                target("basic").path("get").request().rx(JerseyCompletionStageRxInvoker.class).get();
+
+        try (Response response = futureResponse.get()) {
+            Assert.assertEquals(200, response.getStatus());
+            Assert.assertEquals("ok", response.readEntity(String.class));
+        }
+    }
+
+    @Test
+    public void testInputStreamEntity() throws IOException {
+        try (Response response = target("basic").path("get").request().get()) {
+            Assert.assertEquals(200, response.getStatus());
+            InputStream is = response.readEntity(InputStream.class);
+            Assert.assertEquals('o', is.read());
+            Assert.assertEquals('k', is.read());
+            is.close();
+        }
+    }
+
+    // -----------Async
+
+    @Test
+    public void testTwoClientsAsync() throws ExecutionException, InterruptedException {
+        try (Response resetResponse = target("async").path("reset").request().get()) {
+            Assert.assertEquals(204, resetResponse.getStatus());
+        }
+
+        ClientConfig config = new ClientConfig();
+        config.connectorProvider(new HelidonConnectorProvider());
+
+        Client longClient = ClientBuilder.newClient(config);
+        Invocation.Builder longRequest = longClient.target(getBaseUri()).path("async/long").request();
+
+        Client shortClient = ClientBuilder.newClient(config);
+        Invocation.Builder shortRequest = shortClient.target(getBaseUri()).path("async/short").request();
+
+        Future<Response> futureLongResponse = longRequest.async().get();
+        Future<Response> futureShortResponse = shortRequest.async().get();
+
+        try (Response shortResponse = futureShortResponse.get()) {
+            Assert.assertEquals(200, shortResponse.getStatus());
+            Assert.assertEquals("short", shortResponse.readEntity(String.class));
+        }
+
+        try (Response longResponse = futureLongResponse.get()) {
+            Assert.assertEquals(200, longResponse.getStatus());
+            Assert.assertEquals("long", longResponse.readEntity(String.class));
+        }
+    }
+
+    @Test
+    public void testOneClientsTwoReqestsAsync() throws ExecutionException, InterruptedException {
+        try (Response resetResponse = target("async").path("reset").request().get()) {
+            Assert.assertEquals(204, resetResponse.getStatus());
+        }
+
+        Invocation.Builder longRequest = target().path("async/long").request();
+        Invocation.Builder shortRequest = target().path("async/short").request();
+
+        Future<Response> futureLongResponse = longRequest.async().get();
+        Future<Response> futureShortResponse = shortRequest.async().get();
+
+        try (Response shortResponse = futureShortResponse.get()) {
+            Assert.assertEquals(200, shortResponse.getStatus());
+            Assert.assertEquals("short", shortResponse.readEntity(String.class));
+        }
+
+        try (Response longResponse = futureLongResponse.get()) {
+            Assert.assertEquals(200, longResponse.getStatus());
+            Assert.assertEquals("long", longResponse.readEntity(String.class));
+        }
+    }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/FollowRedirectsTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/FollowRedirectsTest.java
new file mode 100644
index 0000000..87105d6
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/FollowRedirectsTest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientResponse;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.ClientRequestContext;
+import javax.ws.rs.client.ClientResponseContext;
+import javax.ws.rs.client.ClientResponseFilter;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Helidon connector follow redirect tests.
+ *
+ * @author Martin Matula
+ * @author Arul Dhesiaseelan (aruld at acm.org)
+ * @author Marek Potociar
+ */
+public class FollowRedirectsTest extends JerseyTest {
+    private static final Logger LOGGER = Logger.getLogger(TimeoutTest.class.getName());
+
+    @Path("/test")
+    public static class RedirectResource {
+        @GET
+        public String get() {
+            return "GET";
+        }
+
+        @GET
+        @Path("redirect")
+        public Response redirect() {
+            return Response.seeOther(UriBuilder.fromResource(RedirectResource.class).build()).build();
+        }
+    }
+
+    @Override
+    protected Application configure() {
+        ResourceConfig config = new ResourceConfig(RedirectResource.class);
+        config.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.PAYLOAD_ANY));
+        return config;
+    }
+
+    @Override
+    protected void configureClient(ClientConfig config) {
+        config.connectorProvider(new HelidonConnectorProvider());
+    }
+
+    private static class RedirectTestFilter implements ClientResponseFilter {
+        public static final String RESOLVED_URI_HEADER = "resolved-uri";
+
+        @Override
+        public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext) throws IOException {
+            if (responseContext instanceof ClientResponse) {
+                ClientResponse clientResponse = (ClientResponse) responseContext;
+                responseContext.getHeaders().putSingle(RESOLVED_URI_HEADER, clientResponse.getResolvedRequestUri().toString());
+            }
+        }
+    }
+
+    @Test
+    public void testDoFollow() {
+        Response r = target("test/redirect").register(RedirectTestFilter.class).request().get();
+        assertEquals(200, r.getStatus());
+        assertEquals("GET", r.readEntity(String.class));
+
+        assertEquals(
+                UriBuilder.fromUri(getBaseUri()).path(RedirectResource.class).build().toString(),
+                r.getHeaderString(RedirectTestFilter.RESOLVED_URI_HEADER));
+    }
+
+    @Test
+    public void testDontFollow() {
+        WebTarget t = target("test/redirect");
+        t.property(ClientProperties.FOLLOW_REDIRECTS, false);
+        assertEquals(303, t.request().get().getStatus());
+    }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/LargeDataTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/LargeDataTest.java
new file mode 100644
index 0000000..8a7a690
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/LargeDataTest.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.ServerErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.logging.Logger;
+
+/**
+ * The LargeDataTest reproduces a problem when bytes of large data sent are incorrectly sent.
+ * As a result, the request body is different than what was sent by the client.
+ * <p>
+ * In order to be able to inspect the request body, the generated data is a sequence of numbers
+ * delimited with new lines. Such as
+ * <pre><code>
+ *     1
+ *     2
+ *     3
+ *
+ *     ...
+ *
+ *     57234
+ *     57235
+ *     57236
+ *
+ *     ...
+ * </code></pre>
+ * It is also possible to send the data to netcat: {@code nc -l 8080} and verify the problem is
+ * on the client side.
+ *
+ * @author Stepan Vavra
+ * @author Marek Potociar
+ */
+public class LargeDataTest extends JerseyTest {
+
+    private static final Logger LOGGER = Logger.getLogger(LargeDataTest.class.getName());
+    private static final int LONG_DATA_SIZE = 100_000;  // for large set around 5GB, try e.g.: 536_870_912;
+    private static volatile Throwable exception;
+
+    private static StreamingOutput longData(long sequence) {
+        return out -> {
+            long offset = 0;
+            while (offset < sequence) {
+                out.write(Long.toString(offset).getBytes());
+                out.write('\n');
+                offset++;
+            }
+        };
+    }
+
+    @Override
+    protected Application configure() {
+        ResourceConfig config = new ResourceConfig(HttpMethodResource.class);
+        config.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.HEADERS_ONLY));
+        return config;
+    }
+
+    @Override
+    protected void configureClient(ClientConfig config) {
+        config.connectorProvider(new HelidonConnectorProvider());
+    }
+
+    @Test
+    public void postWithLargeData() throws Throwable {
+        long milis = System.currentTimeMillis();
+        WebTarget webTarget = target("test");
+
+        Response response = webTarget.request().post(Entity.entity(longData(LONG_DATA_SIZE), MediaType.TEXT_PLAIN_TYPE));
+
+        try {
+            if (exception != null) {
+
+                // the reason to throw the exception is that IntelliJ gives you an option to compare the expected with the actual
+                throw exception;
+            }
+
+            Assert.assertEquals("Unexpected error: " + response.getStatus(),
+                    Status.Family.SUCCESSFUL,
+                    response.getStatusInfo().getFamily());
+        } finally {
+            response.close();
+        }
+        if (LONG_DATA_SIZE > 9_999) {
+            System.out.println("Large Data Test took " + (System.currentTimeMillis() - milis) + "milis");
+        }
+    }
+
+    @Path("/test")
+    public static class HttpMethodResource {
+
+        @POST
+        public Response post(InputStream content) {
+            try {
+                longData(LONG_DATA_SIZE).write(new OutputStream() {
+
+                    private long position = 0;
+//                    private long mbRead = 0;
+
+                    @Override
+                    public void write(final int generated) throws IOException {
+                        int received = content.read();
+
+                        if (received != generated) {
+                            throw new IOException("Bytes don't match at position " + position
+                                    + ": received=" + received
+                                    + ", generated=" + generated);
+                        }
+
+//                        position++;
+//                        System.out.println("position" + position);
+//                        if (position % (1024 * 1024) == 0) {
+//                            mbRead++;
+//                            System.out.println("MB read: " + mbRead);
+//                        }
+                    }
+                });
+            } catch (IOException e) {
+                exception = e;
+                throw new ServerErrorException(e.getMessage(), 500, e);
+            }
+
+            return Response.ok().build();
+        }
+
+    }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/ParallelTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/ParallelTest.java
new file mode 100644
index 0000000..94a1d26
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/ParallelTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the parallel execution of multiple requests.
+ *
+ * @author Stepan Kopriva
+ */
+public class ParallelTest extends JerseyTest {
+    private static final Logger LOGGER = Logger.getLogger(ParallelTest.class.getName());
+
+    private static final int PARALLEL_CLIENTS = 10;
+    private static final String PATH = "test";
+    private static final AtomicInteger receivedCounter = new AtomicInteger(0);
+    private static final AtomicInteger resourceCounter = new AtomicInteger(0);
+    private static final CyclicBarrier startBarrier = new CyclicBarrier(PARALLEL_CLIENTS + 1);
+    private static final CountDownLatch doneLatch = new CountDownLatch(PARALLEL_CLIENTS);
+
+    @Path(PATH)
+    public static class MyResource {
+
+        @GET
+        public String get() {
+            sleep();
+            resourceCounter.addAndGet(1);
+            return "GET";
+        }
+
+        private void sleep() {
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException ex) {
+                Logger.getLogger(ParallelTest.class.getName()).log(Level.SEVERE, null, ex);
+            }
+        }
+    }
+
+    @Override
+    protected Application configure() {
+        return new ResourceConfig(ParallelTest.MyResource.class);
+    }
+
+    @Override
+    protected void configureClient(ClientConfig config) {
+        config.connectorProvider(new HelidonConnectorProvider());
+    }
+
+    @Test
+    public void testParallel() throws BrokenBarrierException, InterruptedException, TimeoutException {
+        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(PARALLEL_CLIENTS);
+
+        try {
+            final WebTarget target = target();
+            for (int i = 1; i <= PARALLEL_CLIENTS; i++) {
+                final int id = i;
+                executor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            startBarrier.await();
+                            Response response;
+                            response = target.path(PATH).request().get();
+                            assertEquals("GET", response.readEntity(String.class));
+                            receivedCounter.incrementAndGet();
+                        } catch (InterruptedException ex) {
+                            Thread.currentThread().interrupt();
+                            LOGGER.log(Level.WARNING, "Client thread " + id + " interrupted.", ex);
+                        } catch (BrokenBarrierException ex) {
+                            LOGGER.log(Level.INFO, "Client thread " + id + " failed on broken barrier.", ex);
+                        } catch (Throwable t) {
+                            t.printStackTrace();
+                            LOGGER.log(Level.WARNING, "Client thread " + id + " failed on unexpected exception.", t);
+                        } finally {
+                            doneLatch.countDown();
+                        }
+                    }
+                });
+            }
+
+            startBarrier.await(1, TimeUnit.SECONDS);
+
+            assertTrue("Waiting for clients to finish has timed out.", doneLatch.await(5 * getAsyncTimeoutMultiplier(),
+                                                                                       TimeUnit.SECONDS));
+
+            assertEquals("Resource counter", PARALLEL_CLIENTS, resourceCounter.get());
+
+            assertEquals("Received counter", PARALLEL_CLIENTS, receivedCounter.get());
+        } finally {
+            executor.shutdownNow();
+            Assert.assertTrue("Executor termination", executor.awaitTermination(5, TimeUnit.SECONDS));
+        }
+    }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/TimeoutTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/TimeoutTest.java
new file mode 100644
index 0000000..1fe6374
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/TimeoutTest.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Martin Matula
+ */
+public class TimeoutTest extends JerseyTest {
+    @Path("/test")
+    public static class TimeoutResource {
+        @GET
+        public String get() {
+            return "GET";
+        }
+
+        @GET
+        @Path("timeout")
+        public String getTimeout() {
+            try {
+                Thread.sleep(2000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            return "GET";
+        }
+    }
+
+    @Override
+    protected Application configure() {
+        return new ResourceConfig(TimeoutResource.class);
+    }
+
+    @Override
+    protected void configureClient(ClientConfig config) {
+        config.connectorProvider(new HelidonConnectorProvider());
+    }
+
+    @Test
+    public void testFast() {
+        Response r = target("test").request().get();
+        assertEquals(200, r.getStatus());
+        assertEquals("GET", r.readEntity(String.class));
+    }
+
+    @Test
+    public void testSlow() {
+        try {
+            target("test/timeout").property(ClientProperties.READ_TIMEOUT, 1_000).request().get();
+            fail("Timeout expected.");
+        } catch (ProcessingException e) {
+            assertTimeoutException(e);
+        }
+    }
+
+    @Ignore
+    // TODO - WebClient change request
+    public void testTimeoutInRequest() {
+        try {
+            target("test/timeout").request().property(ClientProperties.READ_TIMEOUT, 1_000).get();
+            fail("Timeout expected.");
+        } catch (ProcessingException e) {
+            assertTimeoutException(e);
+        }
+    }
+
+    private void assertTimeoutException(Exception e) {
+        String exceptionName = "TimeoutException"; // check netty or JDK TimeoutException
+        Throwable t = e.getCause();
+        while (t != null) {
+            if (t.getClass().getSimpleName().contains(exceptionName)) {
+                break;
+            }
+            t = t.getCause();
+        }
+        if (t == null) {
+            if (e.getCause() != null) {
+                if (e.getCause().getCause() != null) {
+                    fail("Unexpected processing exception cause" + e.getCause().getCause().getMessage());
+                } else {
+                    fail("Unexpected processing exception cause" + e.getCause().getMessage());
+                }
+            } else {
+                fail("Unexpected processing exception cause" + e.getMessage());
+            }
+        }
+    }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/EventOutputTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/EventOutputTest.java
new file mode 100644
index 0000000..31ceb61
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/EventOutputTest.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector.sse;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.helidon.connector.HelidonConnectorProvider;
+import org.glassfish.jersey.media.sse.EventInput;
+import org.glassfish.jersey.media.sse.EventListener;
+import org.glassfish.jersey.media.sse.EventOutput;
+import org.glassfish.jersey.media.sse.EventSource;
+import org.glassfish.jersey.media.sse.InboundEvent;
+import org.glassfish.jersey.media.sse.OutboundEvent;
+import org.glassfish.jersey.media.sse.SseFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Event output tests.
+ *
+ * @author Pavel Bucek
+ * @author Marek Potociar
+ */
+public class EventOutputTest extends JerseyTest {
+
+    @Override
+    protected Application configure() {
+        return new ResourceConfig(SseTestResource.class, SseFeature.class);
+    }
+
+    @Override
+    protected void configureClient(ClientConfig config) {
+        config.register(SseFeature.class);
+        config.connectorProvider(new HelidonConnectorProvider());
+    }
+
+    /**
+     * SSE Test resource.
+     */
+    @Path("test")
+    @Produces(SseFeature.SERVER_SENT_EVENTS)
+    public static class SseTestResource {
+
+        @GET
+        @Path("single")
+        public EventOutput getSingleEvent() {
+            final EventOutput output = new EventOutput();
+            try {
+                return output;
+            } finally {
+                new Thread() {
+                    public void run() {
+                        try {
+                            output.write(new OutboundEvent.Builder().data(String.class, "single").build());
+                            output.close();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                            fail();
+                        }
+                    }
+                }.start();
+            }
+        }
+
+        @GET
+        @Path("closed-single")
+        public EventOutput getClosedSingleEvent() throws IOException {
+            final EventOutput output = new EventOutput();
+            output.write(new OutboundEvent.Builder().data(String.class, "closed").build());
+            output.close();
+            return output;
+        }
+
+        @GET
+        @Path("closed-empty")
+        public EventOutput getClosedEmpty() throws IOException {
+            final EventOutput output = new EventOutput();
+            output.close();
+            return output;
+        }
+
+        @GET
+        @Path("charset")
+        @Produces("text/event-stream;charset=utf-8")
+        public EventOutput getSseWithCharset() throws IOException {
+            final EventOutput output = new EventOutput();
+            output.write(new OutboundEvent.Builder().data(String.class, "charset").build());
+            output.close();
+            return output;
+        }
+
+        @GET
+        @Path("comments-only")
+        public EventOutput getCommentsOnlyStream() throws IOException {
+            final EventOutput output = new EventOutput();
+            output.write(new OutboundEvent.Builder().comment("No comment #1").build());
+            output.write(new OutboundEvent.Builder().comment("No comment #2").build());
+            output.close();
+            return output;
+        }
+    }
+
+    @Test
+    @Ignore //2.0.0-M3
+    public void testReadSseEventAsPlainString() throws Exception {
+        final Response r = target().path("test/single").request().get(Response.class);
+        assertThat(r.readEntity(String.class), containsString("single"));
+    }
+
+    /**
+     * Reproducer for JERSEY-2912: Sending and receiving comments-only events.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testReadCommentsOnlySseEvents() throws Exception {
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.property(ClientProperties.CONNECT_TIMEOUT, 15000);
+        clientConfig.property(ClientProperties.READ_TIMEOUT, 0);
+        clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
+        clientConfig.connectorProvider(new HelidonConnectorProvider());
+        Client client = ClientBuilder.newBuilder().withConfig(clientConfig).build();
+
+        final CountDownLatch latch = new CountDownLatch(2);
+        final Queue<String> eventComments = new ArrayBlockingQueue<>(2);
+        WebTarget single = client.target(getBaseUri()).path("test/comments-only");
+        EventSource es = EventSource.target(single).build();
+        es.register(new EventListener() {
+            @Override
+            public void onEvent(InboundEvent inboundEvent) {
+                eventComments.add(inboundEvent.getComment());
+                latch.countDown();
+            }
+        });
+
+        boolean latchTimedOut;
+        boolean closeTimedOut;
+        try {
+            es.open();
+            latchTimedOut = latch.await(5 * getAsyncTimeoutMultiplier(), TimeUnit.SECONDS);
+        } finally {
+            closeTimedOut = es.close(5, TimeUnit.SECONDS);
+        }
+
+        assertEquals("Unexpected event count", 2, eventComments.size());
+        for (int i = 1; i <= 2; i++) {
+            assertEquals("Unexpected comment data on event #" + i, "No comment #" + i, eventComments.poll());
+        }
+        assertTrue("Event latch has timed out", latchTimedOut);
+        assertTrue("EventSource.close() has timed out", closeTimedOut);
+    }
+
+    @Test
+    @Ignore // 2.0.0-M3
+    public void testReadFromClosedOutput() throws Exception {
+        /**
+         * Need to disable HTTP Keep-Alive to prevent this test from hanging in HttpURLConnection
+         * due to an attempt to read from a stale, out-of-sync connection closed by the server.
+         * Thus setting the "Connection: close" HTTP header on all requests.
+         */
+        Response r;
+        r = target().path("test/closed-empty").request().header("Connection", "close").get();
+        assertTrue(r.readEntity(String.class).isEmpty());
+
+        r = target().path("test/closed-single").request().header("Connection", "close").get();
+        assertTrue(r.readEntity(String.class).contains("closed"));
+
+        //
+
+        EventInput input;
+        input = target().path("test/closed-single").request().header("Connection", "close").get(EventInput.class);
+        assertEquals("closed", input.read().readData());
+        assertEquals(null, input.read());
+        assertTrue(input.isClosed());
+
+        input = target().path("test/closed-empty").request().header("Connection", "close").get(EventInput.class);
+        assertEquals(null, input.read());
+        assertTrue(input.isClosed());
+    }
+
+    @Test
+    public void testSseContentTypeWithCharset() {
+        /**
+         * Need to disable HTTP Keep-Alive to prevent this test from hanging in HttpURLConnection
+         * due to an attempt to read from a stale, out-of-sync connection closed by the server.
+         * Thus setting the "Connection: close" HTTP header on all requests.
+         */
+        Response r;
+        r = target().path("test/charset").request().header("Connection", "close").get();
+        assertTrue(r.getMediaType().getParameters().get("charset").equalsIgnoreCase("utf-8"));
+        final EventInput eventInput = r.readEntity(EventInput.class);
+        String eventData = eventInput.read().readData();
+        assertEquals("charset", eventData);
+        eventInput.close();
+    }
+
+    @Test
+    public void testConnectorWithEventSource() throws InterruptedException {
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.property(ClientProperties.CONNECT_TIMEOUT, 15000);
+        clientConfig.property(ClientProperties.READ_TIMEOUT, 0);
+        clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
+        clientConfig.connectorProvider(new HelidonConnectorProvider());
+        Client client = ClientBuilder.newBuilder().withConfig(clientConfig).build();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<String> eventData = new AtomicReference<String>();
+        final AtomicInteger counter = new AtomicInteger(0);
+        WebTarget single = client.target(getBaseUri()).path("test/single");
+        EventSource es = EventSource.target(single).build();
+        es.register(new EventListener() {
+            @Override
+            public void onEvent(InboundEvent inboundEvent) {
+                final int i = counter.incrementAndGet();
+                if (i == 1) {
+                    eventData.set(inboundEvent.readData());
+                }
+                latch.countDown();
+            }
+        });
+
+        boolean latchTimedOut;
+        boolean closeTimedOut;
+        try {
+            es.open();
+            latchTimedOut = latch.await(5 * getAsyncTimeoutMultiplier(), TimeUnit.SECONDS);
+        } finally {
+            closeTimedOut = es.close(5, TimeUnit.SECONDS);
+        }
+
+        // 2.0.0.-M3 assertEquals("Unexpected event count", 1, counter.get());
+        assertEquals("Unexpected event data", "single", eventData.get());
+        assertTrue("Event latch has timed out", latchTimedOut);
+        assertTrue("EventSource.close() has timed out", closeTimedOut);
+    }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java
new file mode 100644
index 0000000..61a03bf
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector.sse;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.helidon.connector.HelidonConnectorProvider;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Singleton;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
+import javax.ws.rs.sse.SseEventSource;
+import java.io.Closeable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class SseTest extends JerseyTest {
+
+    private static String PALINDROME = "neveroddoreven";
+    private static int WAIT_TIME = 5000;
+
+    @Path("simple")
+    public static class SimpleSseResource {
+        @GET
+        @Produces(MediaType.SERVER_SENT_EVENTS)
+        public void send(@Context SseEventSink sink, @Context Sse sse) {
+            try (SseEventSink s = sink) {
+                for (int i = 0; i != 10; i++) {
+                    s.send(sse.newEvent("A"));
+                }
+            }
+        }
+    }
+
+    @Path("broadcast")
+    @Singleton
+    public static class BroadcasterResource {
+        private static final String WELCOME = "Welcome";
+
+        @Context
+        private Sse sse;
+
+        private static SseBroadcaster sseBroadcaster;
+
+        @PostConstruct
+        public void init() {
+            System.out.println("INIT");
+            sseBroadcaster = sse.newBroadcaster();
+        }
+
+        @GET
+        @Path("register")
+        @Produces(MediaType.SERVER_SENT_EVENTS)
+        public void register(@Context SseEventSink sink) {
+            sseBroadcaster.register(sink);
+            sink.send(sse.newEvent(WELCOME));
+        }
+
+        @POST
+        @Path("broadcast")
+        @Consumes(MediaType.TEXT_PLAIN)
+        public void broadcast(String event) {
+            sseBroadcaster.broadcast(sse.newEvent(event));
+        }
+    }
+
+    @Override
+    protected Application configure() {
+        return new ResourceConfig(SimpleSseResource.class, BroadcasterResource.class);
+    }
+
+    @Override
+    protected void configureClient(ClientConfig config) {
+        config.connectorProvider(new HelidonConnectorProvider());
+        //config.property("jersey.config.helidon.client.entity.type", "OUTPUT_STREAM_PUBLISHER");
+    }
+
+    @Test
+    public void testSend() throws InterruptedException {
+        final StringBuilder sb = new StringBuilder();
+        final CountDownLatch latch = new CountDownLatch(10);
+        try (SseEventSource source = SseEventSource.target(target().path("simple")).build()) {
+            source.register((event) -> sb.append(event.readData()));
+            source.register((event) -> latch.countDown());
+            source.open();
+
+            latch.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.assertEquals("AAAAAAAAAA", sb.toString());
+        Assert.assertEquals(0, latch.getCount());
+    }
+
+    @Test
+    public void testBroadcast() throws InterruptedException {
+        final BroadcasterClient clientOne = new BroadcasterClient(target());
+        final BroadcasterClient clientTwo = new BroadcasterClient(target());
+
+        clientOne.register();
+        clientTwo.register();
+
+        clientOne.broadcast();
+        clientTwo.broadcast();
+
+        clientOne.messageLatch.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+        clientTwo.messageLatch.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+
+        Assert.assertEquals(0, clientOne.messageLatch.getCount());
+        Assert.assertEquals(0, clientTwo.messageLatch.getCount());
+
+        Assert.assertEquals(BroadcasterResource.WELCOME + PALINDROME + PALINDROME, clientOne.message.toString());
+        Assert.assertEquals(BroadcasterResource.WELCOME + PALINDROME + PALINDROME, clientTwo.message.toString());
+
+        clientOne.close();
+        clientTwo.close();
+    }
+
+    private static class BroadcasterClient implements Closeable {
+        private final WebTarget target;
+        private final CountDownLatch messageLatch = new CountDownLatch(3);
+        private final SseEventSource source;
+        private final StringBuilder message = new StringBuilder();
+
+        private BroadcasterClient(WebTarget target) {
+            this.target = target;
+            source = SseEventSource.target(target.path("broadcast/register")).build();
+        }
+
+        private void register() throws InterruptedException {
+            final CountDownLatch latch = new CountDownLatch(1);
+            source.register((event) -> message.append(event.readData()));
+            source.register((event) -> latch.countDown());
+            source.register((event) -> messageLatch.countDown());
+            source.open();
+
+            latch.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+            Assert.assertEquals(0, latch.getCount());
+        }
+
+        private void broadcast() {
+            try (Response r = target.path("broadcast/broadcast")
+                    .request().buildPost(Entity.entity(PALINDROME, MediaType.TEXT_PLAIN)).invoke()) {
+                Assert.assertEquals(204, r.getStatus());
+            }
+        }
+
+        @Override
+        public void close() {
+            source.close();
+        }
+    }
+}
diff --git a/connectors/pom.xml b/connectors/pom.xml
index 37ecbda..87875d8 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -75,4 +75,16 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <profiles>
+        <profile>
+            <id>HelidonConnector</id>
+            <activation>
+                <jdk>11</jdk>
+            </activation>
+            <modules>
+                <module>helidon-connector</module>
+            </modules>
+        </profile>
+    </profiles>
 </project>