Connector to Helidon 2 Web Client
Signed-off-by: Jan Supol <jan.supol@oracle.com>
diff --git a/connectors/helidon-connector/pom.xml b/connectors/helidon-connector/pom.xml
new file mode 100644
index 0000000..10d1af6
--- /dev/null
+++ b/connectors/helidon-connector/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v. 2.0, which is available at
+ http://www.eclipse.org/legal/epl-2.0.
+
+ This Source Code may also be made available under the following Secondary
+ Licenses when the conditions for such availability set forth in the
+ Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ version 2 with the GNU Classpath Exception, which is available at
+ https://www.gnu.org/software/classpath/license.html.
+
+ SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>project</artifactId>
+ <groupId>org.glassfish.jersey.connectors</groupId>
+ <version>2.31-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>jersey-helidon-connector</artifactId>
+ <packaging>jar</packaging>
+ <name>jersey-connectors-helidon</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.helidon.webclient</groupId>
+ <artifactId>helidon-webclient</artifactId>
+ <!-- Dear user, please use more stable version -->
+ <version>2.0.0-M3</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+ <artifactId>jersey-test-framework-provider-grizzly2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-sse</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.sun.istack</groupId>
+ <artifactId>istack-commons-maven-plugin</artifactId>
+ <inherited>true</inherited>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <inherited>true</inherited>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <inherited>false</inherited>
+ <configuration>
+ <source>11</source>
+ <target>11</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnector.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnector.java
new file mode 100644
index 0000000..6e8e968
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnector.java
@@ -0,0 +1,249 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import io.helidon.common.Version;
+import io.helidon.webclient.WebClient;
+import io.helidon.webclient.WebClientRequestBuilder;
+import io.helidon.webclient.WebClientResponse;
+import org.glassfish.jersey.client.ClientAsyncExecutorLiteral;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+import org.glassfish.jersey.client.ClientResponse;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
+import org.glassfish.jersey.client.spi.Connector;
+import org.glassfish.jersey.internal.util.PropertiesHelper;
+import org.glassfish.jersey.spi.ExecutorServiceProvider;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.Configuration;
+import javax.ws.rs.core.Response;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.logging.Logger;
+
+/**
+ * A {@link Connector} that utilizes the Helidon HTTP Client to send and receive
+ * HTTP request and responses.
+ */
+class HelidonConnector implements Connector {
+
+ private static final String helidonVersion = "Helidon/" + Version.VERSION + " (java " + AccessController
+ .doPrivileged(PropertiesHelper.getSystemProperty("java.runtime.version")) + ")";
+ static final Logger LOGGER = Logger.getLogger(HelidonConnector.class.getName());
+
+ private final WebClient webClient;
+
+ private final ExecutorServiceKeeper executorServiceKeeper;
+ private final HelidonEntity.HelidonEntityType entityType;
+
+ private static final InputStream NO_CONTENT_INPUT_STREAM = new InputStream() {
+ @Override
+ public int read() throws IOException {
+ return -1;
+ }
+ };
+ // internal implementation entity type, can be removed in the future
+ // settable for testing purposes
+ private static final String INTERNAL_ENTITY_TYPE = "jersey.config.helidon.client.entity.type";
+
+ HelidonConnector(final Client client, final Configuration config) {
+ executorServiceKeeper = new ExecutorServiceKeeper(client);
+ entityType = getEntityType(config);
+
+ final WebClient.Builder webClientBuilder = WebClient.builder();
+
+ webClientBuilder.addReader(HelidonStructures.createInputStreamBodyReader());
+ HelidonEntity.helidonWriter(entityType).ifPresent(webClientBuilder::addWriter);
+
+ HelidonStructures.createProxy(config).ifPresent(webClientBuilder::proxy);
+
+ HelidonStructures.helidonConfig(config).ifPresent(webClientBuilder::config);
+
+ webClientBuilder.connectTimeout(ClientProperties.getValue(config.getProperties(),
+ ClientProperties.CONNECT_TIMEOUT, 10000), ChronoUnit.MILLIS);
+
+ HelidonStructures.createSSL(client.getSslContext()).ifPresent(webClientBuilder::ssl);
+
+ webClient = webClientBuilder.build();
+ }
+
+ @Override
+ public ClientResponse apply(ClientRequest request) {
+ try {
+ return applyInternal(request).toCompletableFuture().get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new ProcessingException(e);
+ }
+ }
+
+ @Override
+ public Future<?> apply(ClientRequest request, AsyncConnectorCallback callback) {
+ final BiConsumer<? super ClientResponse, ? super Throwable> action = (r, th) -> {
+ if (th == null) callback.response(r);
+ else callback.failure(th);
+ };
+ return applyInternal(request)
+ .whenCompleteAsync(action, executorServiceKeeper.getExecutorService(request))
+ .toCompletableFuture();
+ }
+
+ @Override
+ public String getName() {
+ return helidonVersion;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ private CompletionStage<ClientResponse> applyInternal(ClientRequest request) {
+ final WebClientRequestBuilder webClientRequestBuilder = webClient.method(request.getMethod());
+ webClientRequestBuilder.uri(request.getUri());
+
+ webClientRequestBuilder.headers(HelidonStructures.createHeaders(request.getRequestHeaders()));
+
+ for (String propertyName : request.getConfiguration().getPropertyNames()) {
+ Object property = request.getConfiguration().getProperty(propertyName);
+ if (!propertyName.startsWith("jersey") && String.class.isInstance(property)) {
+ webClientRequestBuilder.property(propertyName, (String) property);
+ }
+ }
+
+ for (String propertyName : request.getPropertyNames()) {
+ Object property = request.resolveProperty(propertyName, null);
+ if (!propertyName.startsWith("jersey") && String.class.isInstance(property)) {
+ webClientRequestBuilder.property(propertyName, (String) property);
+ }
+ }
+
+ // 2.0.0-M3
+ // HelidonStructures.createProxy(request).ifPresent(webClientRequestBuilder::proxy);
+
+ webClientRequestBuilder.followRedirects(request.resolveProperty(ClientProperties.FOLLOW_REDIRECTS, true));
+ webClientRequestBuilder.readTimeout(request.resolveProperty(ClientProperties.READ_TIMEOUT, 10000), ChronoUnit.MILLIS);
+
+ CompletionStage<WebClientResponse> responseStage = null;
+
+ if (request.hasEntity()) {
+ responseStage = HelidonEntity.submit(
+ entityType, request, webClientRequestBuilder, executorServiceKeeper.getExecutorService(request)
+ );
+ } else {
+ responseStage = webClientRequestBuilder.submit();
+ }
+
+ return responseStage.thenCompose((a) -> convertResponse(request, a));
+ }
+
+ private CompletionStage<ClientResponse> convertResponse(final ClientRequest requestContext,
+ final WebClientResponse webClientResponse) {
+
+ final ClientResponse responseContext = new ClientResponse(new Response.StatusType() {
+ @Override
+ public int getStatusCode() {
+ return webClientResponse.status().code();
+ }
+
+ @Override
+ public Response.Status.Family getFamily() {
+ return Response.Status.Family.familyOf(getStatusCode());
+ }
+
+ @Override
+ public String getReasonPhrase() {
+ return webClientResponse.status().reasonPhrase();
+ }
+ }, requestContext);
+
+ for (Map.Entry<String, List<String>> entry : webClientResponse.headers().toMap().entrySet()) {
+ for (String value : entry.getValue()) {
+ responseContext.getHeaders().add(entry.getKey(), value);
+ }
+ }
+
+ responseContext.setResolvedRequestUri(webClientResponse.lastEndpointURI());
+
+ final CompletionStage<InputStream> stream = HelidonStructures.hasEntity(webClientResponse)
+ ? webClientResponse.content().as(InputStream.class)
+ : CompletableFuture.supplyAsync(() -> NO_CONTENT_INPUT_STREAM);
+
+ return stream.thenApply((a) -> {
+ responseContext.setEntityStream(new FilterInputStream(a) {
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ @Override
+ public void close() throws IOException {
+ // Avoid idempotent close in the underlying input stream
+ if (!closed.compareAndSet(false, true)) {
+ super.close();
+ }
+ }
+ });
+ return responseContext;
+ });
+ }
+
+ private static HelidonEntity.HelidonEntityType getEntityType(final Configuration config) {
+ final String helidonType = ClientProperties.getValue(config.getProperties(),
+ INTERNAL_ENTITY_TYPE, HelidonEntity.HelidonEntityType.READABLE_BYTE_CHANNEL.name());
+ final HelidonEntity.HelidonEntityType entityType = HelidonEntity.HelidonEntityType.valueOf(helidonType);
+
+// if (entityType != HelidonEntity.HelidonEntityType.READABLE_BYTE_CHANNEL) {
+// // log warning for internal feature - no localization.properties
+// LOGGER.warning(INTERNAL_ENTITY_TYPE + " is " + entityType.name());
+// }
+
+ return entityType;
+ }
+
+ private static class ExecutorServiceKeeper {
+ private Optional<ExecutorService> executorService;
+
+ private ExecutorServiceKeeper(Client client) {
+ final ClientConfig config = ((JerseyClient) client).getConfiguration();
+ executorService = Optional.ofNullable(config.getExecutorService());
+ }
+
+ private ExecutorService getExecutorService(ClientRequest request) {
+ if (!executorService.isPresent()) {
+ // cache for multiple requests
+ executorService = Optional.ofNullable(request.getInjectionManager()
+ .getInstance(ExecutorServiceProvider.class, ClientAsyncExecutorLiteral.INSTANCE).getExecutorService());
+ }
+
+ return executorService.get();
+ }
+ }
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnectorProvider.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnectorProvider.java
new file mode 100644
index 0000000..36092f4
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnectorProvider.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.Beta;
+import org.glassfish.jersey.client.spi.Connector;
+import org.glassfish.jersey.client.spi.ConnectorProvider;
+import org.glassfish.jersey.internal.util.JdkVersion;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.Configuration;
+import java.io.OutputStream;
+
+/**
+ * Provider for Helidon WebClient {@link Connector} that utilizes the Helidon HTTP Client to send and receive
+ * HTTP request and responses. JDK 8 is not supported by the Helidon Connector.
+ * <p/>
+ * The following properties are only supported at construction of this class:
+ * <ul>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#CONNECT_TIMEOUT}</li>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#FOLLOW_REDIRECTS}</li>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#PROXY_URI}</li>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#PROXY_USERNAME}</li>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#PROXY_PASSWORD}</li>
+ * <li>{@link org.glassfish.jersey.client.ClientProperties#READ_TIMEOUT}</li>
+ * <li>{@link HelidonProperties#CONFIG}</li>
+ * </ul>
+ * <p>
+ * If a {@link org.glassfish.jersey.client.ClientResponse} is obtained and an
+ * entity is not read from the response then
+ * {@link org.glassfish.jersey.client.ClientResponse#close()} MUST be called
+ * after processing the response to release connection-based resources.
+ * </p>
+ * <p>
+ * Client operations are thread safe, the HTTP connection may
+ * be shared between different threads.
+ * </p>
+ * <p>
+ * If a response entity is obtained that is an instance of {@link java.io.Closeable}
+ * then the instance MUST be closed after processing the entity to release
+ * connection-based resources.
+ * </p>
+ * <p>
+ * This connector uses {@link org.glassfish.jersey.client.ClientProperties#OUTBOUND_CONTENT_LENGTH_BUFFER} to buffer the entity
+ * written for instance by {@link javax.ws.rs.core.StreamingOutput}. Should the buffer be small and
+ * {@link javax.ws.rs.core.StreamingOutput#write(OutputStream)} be called many times, the performance can drop. The Content-Length
+ * or the Content_Encoding header is set by the underlaying Helidon WebClient regardless of the
+ * {@link org.glassfish.jersey.client.ClientProperties#OUTBOUND_CONTENT_LENGTH_BUFFER} size, however.
+ * </p>
+ *
+ * @since 2.31
+ */
+@Beta
+public class HelidonConnectorProvider implements ConnectorProvider {
+ @Override
+ public Connector getConnector(Client client, Configuration runtimeConfig) {
+ if (JdkVersion.getJdkVersion().getMajor() < 11) {
+ throw new ProcessingException(LocalizationMessages.NOT_SUPPORTED());
+ }
+ return new HelidonConnector(client, runtimeConfig);
+ }
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonEntity.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonEntity.java
new file mode 100644
index 0000000..386cb62
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonEntity.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import io.helidon.common.GenericType;
+import io.helidon.common.http.DataChunk;
+import io.helidon.common.http.MediaType;
+import io.helidon.common.reactive.Multi;
+import io.helidon.common.reactive.OutputStreamPublisher;
+import io.helidon.common.reactive.Single;
+import io.helidon.media.common.ByteChannelBodyWriter;
+import io.helidon.media.common.ContentWriters;
+import io.helidon.media.common.MessageBodyContext;
+import io.helidon.media.common.MessageBodyWriter;
+import io.helidon.media.common.MessageBodyWriterContext;
+import io.helidon.webclient.WebClientRequestBuilder;
+import io.helidon.webclient.WebClientResponse;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+
+import javax.ws.rs.ProcessingException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Flow;
+import java.util.function.Function;
+
+/**
+ * A utility class that converts outbound client entity to a class understandable by Helidon.
+ * Based on the {@link HelidonEntityType} an entity writer is provided to be registered by Helidon client
+ * and an Entity is provided to be submitted by the Helidon Client.
+ */
+class HelidonEntity {
+ /**
+ * HelidonEnity type chosen by HelidonEntityType
+ */
+ enum HelidonEntityType {
+ /**
+ * Simplest structure. Loads all data to the memory.
+ */
+ BYTE_ARRAY_OUTPUT_STREAM,
+ /**
+ * Readable ByteChannel that is capable of sending data in chunks.
+ * Capable of caching of bytes before the data are consumed by Helidon.
+ */
+ READABLE_BYTE_CHANNEL,
+ /**
+ * Helidon most native entity. Could be slower than {@link #READABLE_BYTE_CHANNEL}.
+ */
+ OUTPUT_STREAM_PUBLISHER
+ }
+
+ /**
+ * Get optional entity writer to be registered by the Helidon Client. For some default providers,
+ * nothing is needed to be registered.
+ * @param type the type of the entity class that works best for the Http Client request use case.
+ * @return possible writer to be registerd by the Helidon Client.
+ */
+ static Optional<MessageBodyWriter<?>> helidonWriter(HelidonEntityType type) {
+ switch (type) {
+ case BYTE_ARRAY_OUTPUT_STREAM:
+ return Optional.of(new OutputStreamBodyWriter());
+ case OUTPUT_STREAM_PUBLISHER:
+ //Helidon default
+ return Optional.empty();
+ case READABLE_BYTE_CHANNEL:
+ return Optional.of(ByteChannelBodyWriter.create());
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Convert Jersey {@code OutputStream} to an entity based on the client request use case and submits to the provided
+ * {@code WebClientRequestBuilder}.
+ * @param type the type of the Helidon entity.
+ * @param requestContext Jersey {@link ClientRequest} providing the entity {@code OutputStream}.
+ * @param requestBuilder Helidon {@code WebClientRequestBuilder} which is used to submit the entity
+ * @param executorService {@link ExecutorService} that fills the entity instance for Helidon with data from Jersey
+ * {@code OutputStream}.
+ * @return Helidon Client response completion stage.
+ */
+ static CompletionStage<WebClientResponse> submit(HelidonEntityType type,
+ ClientRequest requestContext,
+ WebClientRequestBuilder requestBuilder,
+ ExecutorService executorService) {
+ CompletionStage<WebClientResponse> stage = null;
+ if (type != null) {
+ final int bufferSize = requestContext.resolveProperty(
+ ClientProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 8192);
+ switch (type) {
+ case BYTE_ARRAY_OUTPUT_STREAM:
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
+ requestContext.setStreamProvider(contentLength -> baos);
+ ((ProcessingRunnable) () -> requestContext.writeEntity()).run();
+ stage = requestBuilder.submit(baos);
+ break;
+ case READABLE_BYTE_CHANNEL:
+ final OutputStreamChannel channel = new OutputStreamChannel(bufferSize);
+ requestContext.setStreamProvider(contentLength -> channel);
+ executorService.execute((ProcessingRunnable) () -> requestContext.writeEntity());
+ stage = requestBuilder.submit(channel);
+ break;
+ case OUTPUT_STREAM_PUBLISHER:
+ final OutputStreamPublisher publisher = new OutputStreamPublisher();
+ requestContext.setStreamProvider(contentLength -> publisher);
+ executorService.execute((ProcessingRunnable) () -> {
+ requestContext.writeEntity();
+ publisher.close();
+ });
+ stage = requestBuilder.submit(Multi.from(publisher).map(DataChunk::create));
+ break;
+ }
+ }
+ return stage;
+ }
+
+ @FunctionalInterface
+ private interface ProcessingRunnable extends Runnable {
+ void runOrThrow() throws IOException;
+
+ @Override
+ default void run() {
+ try {
+ runOrThrow();
+ } catch (IOException e) {
+ throw new ProcessingException(LocalizationMessages.ERROR_WRITING_ENTITY(e.getMessage()), e);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static class OutputStreamBodyWriter implements MessageBodyWriter {
+ private OutputStreamBodyWriter() {
+ }
+
+ @Override
+ public Flow.Publisher<DataChunk> write(Single content, GenericType type, MessageBodyWriterContext context) {
+ context.contentType(MediaType.APPLICATION_OCTET_STREAM);
+ return content.flatMap(new ByteArrayOutputStreamToChunks());
+ }
+
+ @Override
+ public boolean accept(GenericType type, MessageBodyContext context) {
+ return ByteArrayOutputStream.class.isAssignableFrom(type.rawType());
+ }
+
+ private static class ByteArrayOutputStreamToChunks implements Function<ByteArrayOutputStream, Flow.Publisher<DataChunk>> {
+ @Override
+ public Flow.Publisher<DataChunk> apply(ByteArrayOutputStream byteArrayOutputStream) {
+ return ContentWriters.writeBytes(byteArrayOutputStream.toByteArray(), false);
+ }
+ }
+ }
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonProperties.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonProperties.java
new file mode 100644
index 0000000..62d0dbd
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonProperties.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.internal.util.PropertiesClass;
+
+import io.helidon.config.Config;
+import io.helidon.webclient.WebClient;
+
+/**
+ * Configuration options specific to the Client API that utilizes {@link HelidonConnector}
+ */
+@PropertiesClass
+public final class HelidonProperties {
+
+ /**
+ * A Helidon {@link Config} instance that is passed to {@link WebClient.Builder#config(Config)} if available
+ */
+ public static final String CONFIG = "jersey.config.helidon.client.config";
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonStructures.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonStructures.java
new file mode 100644
index 0000000..fbaffec
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonStructures.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import io.helidon.common.http.Headers;
+import io.helidon.common.http.Http;
+import io.helidon.common.http.ReadOnlyParameters;
+import io.helidon.config.Config;
+import io.helidon.config.ConfigSources;
+import io.helidon.media.common.InputStreamBodyReader;
+import io.helidon.media.common.MessageBodyReader;
+import io.helidon.webclient.Proxy;
+import io.helidon.webclient.Ssl;
+import io.helidon.webclient.WebClientResponse;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.core.Configuration;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Helidon specific classes and implementations.
+ */
+class HelidonStructures {
+
+ static Headers createHeaders(Map<String, List<String>> data) {
+ return new ReadOnlyHeaders(data);
+ }
+
+ static MessageBodyReader<InputStream> createInputStreamBodyReader() {
+ return InputStreamBodyReader.create();
+ }
+
+ static Optional<Config> helidonConfig(Configuration configuration) {
+ final Object helidonConfig = configuration.getProperty(HelidonProperties.CONFIG);
+ if (helidonConfig != null) {
+ if (!Config.class.isInstance(helidonConfig)) {
+ HelidonConnector.LOGGER.warning(LocalizationMessages.NOT_HELIDON_CONFIG(helidonConfig.getClass().getName()));
+ return Optional.empty();
+ } else {
+ return Optional.of((Config) helidonConfig);
+ }
+ }
+ return Optional.empty();
+ }
+
+ static Optional<Proxy> createProxy(Configuration config) {
+ return ProxyBuilder.createProxy(config);
+ }
+
+ static Optional<Proxy> createProxy(ClientRequest request) {
+ return ProxyBuilder.createProxy(request);
+ }
+
+ static Optional<Ssl> createSSL(SSLContext context) {
+ return context == null ? Optional.empty() : Optional.of(Ssl.builder().sslContext(context).build());
+ }
+
+ static boolean hasEntity(WebClientResponse webClientResponse) {
+ final ReadOnlyParameters headers = webClientResponse.content().readerContext().headers();
+ final Optional<String> contentLenth = headers.first(Http.Header.CONTENT_LENGTH);
+ final Optional<String> encoding = headers.first(Http.Header.TRANSFER_ENCODING);
+
+ return ((contentLenth.isPresent() && !contentLenth.get().equals("0"))
+ || (encoding.isPresent() && encoding.get().equals(HttpHeaderValues.CHUNKED.toString())));
+ }
+
+ private static class ReadOnlyHeaders extends ReadOnlyParameters implements Headers {
+ public ReadOnlyHeaders(Map<String, List<String>> data) {
+ super(data);
+ }
+ }
+
+ private static class ProxyBuilder {
+ private static Optional<Proxy> createProxy(Configuration config) {
+ final Object proxyUri = config.getProperty(ClientProperties.PROXY_URI);
+ final String userName
+ = ClientProperties.getValue(config.getProperties(), ClientProperties.PROXY_USERNAME, String.class);
+ final String password
+ = ClientProperties.getValue(config.getProperties(), ClientProperties.PROXY_PASSWORD, String.class);
+ return createProxy(proxyUri, userName, password);
+ }
+
+ private static Optional<Proxy> createProxy(ClientRequest clientRequest) {
+ final Object proxyUri = clientRequest.resolveProperty(ClientProperties.PROXY_URI, Object.class);
+ final String userName = clientRequest.resolveProperty(ClientProperties.PROXY_USERNAME, String.class);
+ final String password = clientRequest.resolveProperty(ClientProperties.PROXY_PASSWORD, String.class);
+ return createProxy(proxyUri, userName, password);
+ }
+
+ private static Optional<Proxy> createProxy(Object proxyUri, String userName, String password) {
+ if (proxyUri != null) {
+ final URI u = getProxyUri(proxyUri);
+ final Proxy.Builder builder = Proxy.builder();
+ Map<String, String> proxyMap;
+ if (u.getScheme().toUpperCase(Locale.ROOT).equals("DIRECT")) {
+ proxyMap = Map.of("type", "NONE");
+ //builder.type(Proxy.ProxyType.NONE);
+ } else {
+ builder.host(u.getHost()).port(u.getPort());
+ switch (u.getScheme().toUpperCase(Locale.ROOT)) {
+ case "HTTP":
+ proxyMap = Map.of("type", "HTTP");
+ //builder.type(Proxy.ProxyType.HTTP);
+ break;
+ case "SOCKS":
+ proxyMap = Map.of("type", "SOCKS_4");
+ //builder.type(Proxy.ProxyType.SOCKS_4);
+ break;
+ case "SOCKS5":
+ proxyMap = Map.of("type", "SOCKS_5");
+ //builder.type(Proxy.ProxyType.SOCKS_5);
+ break;
+ default:
+ HelidonConnector.LOGGER.warning(LocalizationMessages.UNSUPPORTED_PROXY_SCHEMA(u.getScheme()));
+ return Optional.empty();
+ }
+ builder.config(Config.create(ConfigSources.create(proxyMap)));
+ }
+ if (userName != null) {
+ builder.username(userName);
+
+ if (password != null) {
+ builder.password(password.toCharArray());
+ }
+ }
+ return Optional.of(builder.build());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ private static URI getProxyUri(final Object proxy) {
+ if (proxy instanceof URI) {
+ return (URI) proxy;
+ } else if (proxy instanceof String) {
+ return URI.create((String) proxy);
+ } else {
+ throw new ProcessingException(LocalizationMessages.WRONG_PROXY_URI_TYPE(ClientProperties.PROXY_URI));
+ }
+ }
+ }
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/OutputStreamChannel.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/OutputStreamChannel.java
new file mode 100644
index 0000000..152ac7f
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/OutputStreamChannel.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+class OutputStreamChannel extends OutputStream implements ReadableByteChannel {
+
+ private ReentrantLock lock = new ReentrantLock();
+ private static final ByteBuffer VOID = ByteBuffer.allocate(0);
+ private static final int CAPACITY = Integer.getInteger("jersey.helidon.connector.osc.capacity", 8);
+ private static final int WRITE_TIMEOUT = Integer.getInteger("jersey.helidon.connector.osc.read.timeout", 10000);
+ private static final int READ_TIMEOUT = Integer.getInteger("jersey.helidon.connector.osc.write.timeout", 10000);
+ private final int bufferSize;
+
+ OutputStreamChannel(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ private final LinkedBlockingDeque<ByteBuffer> queue = new LinkedBlockingDeque<>(CAPACITY);
+
+ private volatile boolean open = true;
+ private ByteBuffer remainingByteBuffer;
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ if (!open) {
+ throw new ClosedChannelException();
+ }
+
+ int sum = 0;
+
+ do {
+ ByteBuffer top;
+ try {
+ top = poll(READ_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ open = false;
+ throw new ClosedByInterruptException();
+ }
+
+ if (top == null) {
+ return sum;
+ }
+
+ if (top == VOID) {
+ if (sum == 0) {
+ open = false;
+ return -1;
+ } else {
+ queue.addFirst(top);
+ return sum;
+ }
+ }
+
+ final int topSize = top.remaining();
+ final int dstAvailable = dst.remaining();
+ final int minSize = Math.min(topSize, dstAvailable);
+
+ if (top.hasArray()) {
+ dst.put(top.array(), top.arrayOffset() + top.position(), minSize);
+ top.position(top.position() + minSize);
+ } else {
+ while (dst.hasRemaining() && top.hasRemaining()) {
+ dst.put(top.get());
+ }
+ }
+
+ sum += minSize;
+
+ if (top.hasRemaining()) {
+ remainingByteBuffer = top;
+ }
+ } while (dst.hasRemaining());
+
+ return sum;
+ }
+
+ private ByteBuffer poll(long timeout, TimeUnit unit) throws InterruptedException {
+ if (remainingByteBuffer != null) {
+ final ByteBuffer remaining = remainingByteBuffer;
+ remainingByteBuffer = null;
+ return remaining;
+ } else {
+ // do not modify head
+ lock.lock();
+ final ByteBuffer peek = queue.poll(timeout, unit);
+ // can modify head
+ lock.unlock();
+ return peek;
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ write(new byte[]{(byte) b}, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ super.write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checkClosed();
+
+ if (lock.tryLock()) {
+ if (len < bufferSize && queue.size() > 0) {
+ final ByteBuffer buffer = queue.getLast();
+ if (buffer != null && (buffer.capacity() - buffer.limit()) > len) {
+ //set for write
+ buffer.position(buffer.limit());
+ buffer.limit(buffer.capacity());
+ buffer.put(b, off, len);
+ //set for read
+ buffer.flip();
+ lock.unlock();
+ return;
+ }
+ }
+ lock.unlock();
+ }
+
+ final int maxLen = Math.max(len, bufferSize);
+ final byte[] bytes = new byte[maxLen];
+ System.arraycopy(b, off, bytes, 0, len);
+
+ final ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ buffer.limit(len);
+ buffer.position(0);
+
+ write(buffer);
+ }
+
+ private void write(ByteBuffer buffer) throws IOException {
+ try {
+ boolean queued = queue.offer(buffer, WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!queued) {
+ throw new IOException("Buffer overflow.");
+ }
+
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ boolean offer = false;
+
+ try {
+ offer = queue.offer(VOID, WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // ignore.
+ }
+
+ if (!offer) {
+ lock.lock();
+ queue.removeLast();
+ queue.add(VOID);
+ lock.unlock();
+ }
+ }
+
+
+ @Override
+ public boolean isOpen() {
+ return open;
+ }
+
+ private void checkClosed() throws IOException {
+ if (!open) {
+ throw new IOException("Stream already closed.");
+ }
+ }
+}
diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/package-info.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/package-info.java
new file mode 100644
index 0000000..9dc13ef
--- /dev/null
+++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+/**
+ * Jersey client {@link org.glassfish.jersey.client.spi.Connector connector} based on Helidon Web Client.
+ */
+package org.glassfish.jersey.helidon.connector;
diff --git a/connectors/helidon-connector/src/main/resources/org.glassfish.jersey.helidon.connector/localization.properties b/connectors/helidon-connector/src/main/resources/org.glassfish.jersey.helidon.connector/localization.properties
new file mode 100644
index 0000000..f962cbc
--- /dev/null
+++ b/connectors/helidon-connector/src/main/resources/org.glassfish.jersey.helidon.connector/localization.properties
@@ -0,0 +1,21 @@
+#
+# Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+#
+# This program and the accompanying materials are made available under the
+# terms of the Eclipse Public License v. 2.0, which is available at
+# http://www.eclipse.org/legal/epl-2.0.
+#
+# This Source Code may also be made available under the following Secondary
+# Licenses when the conditions for such availability set forth in the
+# Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+# version 2 with the GNU Classpath Exception, which is available at
+# https://www.gnu.org/software/classpath/license.html.
+#
+# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+#
+
+error.writing.entity=Error writing entity: {0}.
+not.helidon.config=Given instance {0} is not Helidon config. Provided HelidonProperties.CONFIG is ignored.
+not.supported=Helidon connector is not supported on JDK version < 11.
+unsupported.proxy.schema=Proxy schema "{0}" not supported.
+wrong.proxy.uri.type=The proxy URI ("{0}") property MUST be an instance of String or URI.
\ No newline at end of file
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/AsyncTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/AsyncTest.java
new file mode 100644
index 0000000..b6189db
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/AsyncTest.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.container.TimeoutHandler;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Asynchronous connector test.
+ *
+ * @author Arul Dhesiaseelan (aruld at acm.org)
+ * @author Marek Potociar
+ */
+public class AsyncTest extends JerseyTest {
+
+ private static final Logger LOGGER = Logger.getLogger(AsyncTest.class.getName());
+ private static final String PATH = "async";
+
+ /**
+ * Asynchronous test resource.
+ */
+ @Path(PATH)
+ public static class AsyncResource {
+
+ /**
+ * Typical long-running operation duration.
+ */
+ public static final long OPERATION_DURATION = 1000;
+
+ /**
+ * Long-running asynchronous post.
+ *
+ * @param asyncResponse async response.
+ * @param id post request id (received as request payload).
+ */
+ @POST
+ public void asyncPost(@Suspended final AsyncResponse asyncResponse, final String id) {
+ LOGGER.info("Long running post operation called with id " + id + " on thread " + Thread.currentThread().getName());
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ final String result = veryExpensiveOperation();
+ asyncResponse.resume(result);
+ }
+
+ private String veryExpensiveOperation() {
+ // ... very expensive operation that typically finishes within 1 seconds, simulated using sleep()
+ try {
+ Thread.sleep(OPERATION_DURATION);
+ return "DONE-" + id;
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return "INTERRUPTED-" + id;
+ } finally {
+ LOGGER.info("Long running post operation finished on thread " + Thread.currentThread().getName());
+ }
+ }
+ }, "async-post-runner-" + id).start();
+ }
+
+ /**
+ * Long-running async get request that times out.
+ *
+ * @param asyncResponse async response.
+ */
+ @GET
+ @Path("timeout")
+ public void asyncGetWithTimeout(@Suspended final AsyncResponse asyncResponse) {
+ LOGGER.info("Async long-running get with timeout called on thread " + Thread.currentThread().getName());
+ asyncResponse.setTimeoutHandler(new TimeoutHandler() {
+
+ @Override
+ public void handleTimeout(final AsyncResponse asyncResponse) {
+ asyncResponse.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE)
+ .entity("Operation time out.").build());
+ }
+ });
+ asyncResponse.setTimeout(1, TimeUnit.SECONDS);
+
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ final String result = veryExpensiveOperation();
+ asyncResponse.resume(result);
+ }
+
+ private String veryExpensiveOperation() {
+ // very expensive operation that typically finishes within 1 second but can take up to 5 seconds,
+ // simulated using sleep()
+ try {
+ Thread.sleep(5 * OPERATION_DURATION);
+ return "DONE";
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return "INTERRUPTED";
+ } finally {
+ LOGGER.info("Async long-running get with timeout finished on thread " + Thread.currentThread().getName());
+ }
+ }
+ }).start();
+ }
+
+ }
+
+ @Override
+ protected Application configure() {
+ return new ResourceConfig(AsyncResource.class)
+ .register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.PAYLOAD_ANY));
+ }
+
+ @Override
+ protected void configureClient(final ClientConfig config) {
+ config.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.PAYLOAD_ANY));
+ config.connectorProvider(new HelidonConnectorProvider());
+ }
+
+ /**
+ * Test asynchronous POST.
+ * <p/>
+ * Send 3 async POST requests and wait to receive the responses. Check the response content and
+ * assert that the operation did not take more than twice as long as a single long operation duration
+ * (this ensures async request execution).
+ *
+ * @throws Exception in case of a test error.
+ */
+ @Test
+ public void testAsyncPost() throws Exception {
+ final Future<Response> warmUp1 = target(PATH).request().async().post(Entity.text("100"));
+ final Future<Response> warmUp2 = target(PATH).request().async().post(Entity.text("200"));
+ final Future<Response> warmUp3 = target(PATH).request().async().post(Entity.text("300"));
+
+ assertEquals("DONE-100", warmUp1.get().readEntity(String.class));
+ assertEquals("DONE-200", warmUp2.get().readEntity(String.class));
+ assertEquals("DONE-300", warmUp3.get().readEntity(String.class));
+
+ final long tic = System.currentTimeMillis();
+
+ // Submit requests asynchronously.
+ final Future<Response> rf1 = target(PATH).request().async().post(Entity.text("1"));
+ final Future<Response> rf2 = target(PATH).request().async().post(Entity.text("2"));
+ final Future<Response> rf3 = target(PATH).request().async().post(Entity.text("3"));
+
+ // get() waits for the response
+ final String r1 = rf1.get().readEntity(String.class);
+ final String r2 = rf2.get().readEntity(String.class);
+ final String r3 = rf3.get().readEntity(String.class);
+
+ final long toc = System.currentTimeMillis();
+
+ assertEquals("DONE-1", r1);
+ assertEquals("DONE-2", r2);
+ assertEquals("DONE-3", r3);
+
+ assertThat("Async processing took too long.", toc - tic, Matchers.lessThan(3 * AsyncResource.OPERATION_DURATION));
+ }
+
+ /**
+ * Test accessing an operation that times out on the server.
+ *
+ * @throws Exception in case of a test error.
+ */
+ @Test
+ public void testAsyncGetWithTimeout() throws Exception {
+ final Future<Response> responseFuture = target(PATH).path("timeout").request().async().get();
+ // Request is being processed asynchronously.
+ final Response response = responseFuture.get();
+
+ // get() waits for the response
+ assertEquals(503, response.getStatus());
+ assertEquals("Operation time out.", response.readEntity(String.class));
+ }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/BasicHelidonConnectorTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/BasicHelidonConnectorTest.java
new file mode 100644
index 0000000..8b42df1
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/BasicHelidonConnectorTest.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.JerseyCompletionStageRxInvoker;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(Parameterized.class)
+public class BasicHelidonConnectorTest extends JerseyTest {
+
+ private final String entityType;
+
+ public BasicHelidonConnectorTest(String entityType) {
+ this.entityType = entityType;
+ }
+
+ @Parameterized.Parameters
+ public static Object[] data() {
+ return new Object[]{"BYTE_ARRAY_OUTPUT_STREAM", "READABLE_BYTE_CHANNEL", "OUTPUT_STREAM_PUBLISHER"};
+ }
+
+ @Path("basic")
+ public static class BasicResource {
+ @Path("get")
+ @GET
+ public String get() {
+ return "ok";
+ }
+
+ @Path("getquery")
+ @GET
+ public String getQuery(@QueryParam("first") String first, @QueryParam("second") String second) {
+ return first + second;
+ }
+
+ @POST
+ @Path("post")
+ public String post(String entity) {
+ return entity + entity;
+ }
+
+ @GET
+ @Path("headers")
+ public Response headers(@Context HttpHeaders headers) {
+ Response.ResponseBuilder response = Response.ok("ok");
+ for (Map.Entry<String, List<String>> set : headers.getRequestHeaders().entrySet()) {
+ if (set.getKey().toUpperCase(Locale.ROOT).startsWith("X-TEST")) {
+ response.header(set.getKey(), set.getValue().iterator().next());
+ }
+ }
+ return response.build();
+ }
+
+ @PUT
+ @Consumes("test/x-test")
+ @Produces("test/y-test")
+ @Path("produces/consumes")
+ public String putConsumesProduces(String content) {
+ return content + content;
+ }
+ }
+
+ @Path("async")
+ public static class AsyncResource {
+ private static CountDownLatch shortLong = null;
+
+ @GET
+ @Path("reset")
+ public void reset() {
+ shortLong = new CountDownLatch(1);
+ }
+
+ @Path("long")
+ @GET
+ public String longGet() throws InterruptedException {
+ shortLong.await(10000, TimeUnit.MILLISECONDS);
+ return shortLong.getCount() == 0 ? "long" : "shortLong CountDownLatch has not been hit";
+ }
+
+ @Path("short")
+ @GET
+ public String shortGet() {
+ shortLong.countDown();
+ return "short";
+ }
+ }
+
+ @Override
+ protected Application configure() {
+ return new ResourceConfig(BasicResource.class, AsyncResource.class)
+ .property(LoggingFeature.LOGGING_FEATURE_LOGGER_LEVEL_SERVER, "WARNING");
+ }
+
+ @Override
+ protected void configureClient(ClientConfig config) {
+ super.configureClient(config);
+ config.connectorProvider(new HelidonConnectorProvider());
+ config.property("jersey.config.helidon.client.entity.type", entityType);
+ }
+
+ @Test
+ public void testBasicGet() {
+ try (Response response = target("basic").path("get").request().get()) {
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals("ok", response.readEntity(String.class));
+ }
+ }
+
+ @Test
+ public void testBasicPost() {
+ try (Response response = target("basic").path("post").request()
+ .buildPost(Entity.entity("ok", MediaType.TEXT_PLAIN_TYPE)).invoke()) {
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals("okok", response.readEntity(String.class));
+ }
+ }
+
+ @Test
+ public void queryGetTest() {
+ try (Response response = target("basic").path("getquery")
+ .queryParam("first", "hello")
+ .queryParam("second", "world")
+ .request().get()) {
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals("helloworld", response.readEntity(String.class));
+ }
+ }
+
+ @Test
+ public void testHeaders() {
+ String[][] headers = new String[][]{{"X-TEST-ONE", "ONE"}, {"X-TEST-TWO", "TWO"}, {"X-TEST-THREE", "THREE"}};
+ MultivaluedHashMap<String, Object> map = new MultivaluedHashMap<>();
+ Arrays.stream(headers).forEach(a -> map.add(a[0], a[1]));
+ try (Response response = target("basic").path("headers").request().headers(map).get()) {
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals("ok", response.readEntity(String.class));
+ for (int i = 0; i != headers.length; i++) {
+ Assert.assertTrue(response.getHeaders().containsKey(headers[i][0]));
+ Assert.assertEquals(headers[i][1], response.getStringHeaders().getFirst(headers[i][0]));
+ }
+ }
+ }
+
+ @Test
+ public void testProduces() {
+ try (Response response = target("basic").path("produces/consumes").request("test/z-test")
+ .put(Entity.entity("ok", new MediaType("test", "x-test")))) {
+ Assert.assertEquals(406, response.getStatus());
+ }
+
+ try (Response response = target("basic").path("produces/consumes").request()
+ .put(Entity.entity("ok", new MediaType("test", "x-test")))) {
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals("okok", response.readEntity(String.class));
+ Assert.assertEquals("test/y-test", response.getStringHeaders().getFirst(HttpHeaders.CONTENT_TYPE));
+ }
+ }
+
+ @Test
+ public void testAsyncGet() throws ExecutionException, InterruptedException {
+ Future<Response> futureResponse = target("basic").path("get").request().async().get();
+ try (Response response = futureResponse.get()) {
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals("ok", response.readEntity(String.class));
+ }
+ }
+
+ @Test
+ public void testConsumes() {
+ try (Response response = target("basic").path("produces/consumes").request("test/y-test")
+ .put(Entity.entity("ok", new MediaType("test", "z-test")))) {
+ Assert.assertEquals(415, response.getStatus());
+ }
+
+ try (Response response = target("basic").path("produces/consumes").request("test/y-test")
+ .put(Entity.entity("ok", MediaType.WILDCARD_TYPE))) {
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals("okok", response.readEntity(String.class));
+ Assert.assertEquals("test/y-test", response.getStringHeaders().getFirst(HttpHeaders.CONTENT_TYPE));
+ }
+ }
+
+ @Test
+ public void testRxGet() throws ExecutionException, InterruptedException {
+ CompletableFuture<Response> futureResponse =
+ target("basic").path("get").request().rx(JerseyCompletionStageRxInvoker.class).get();
+
+ try (Response response = futureResponse.get()) {
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals("ok", response.readEntity(String.class));
+ }
+ }
+
+ @Test
+ public void testInputStreamEntity() throws IOException {
+ try (Response response = target("basic").path("get").request().get()) {
+ Assert.assertEquals(200, response.getStatus());
+ InputStream is = response.readEntity(InputStream.class);
+ Assert.assertEquals('o', is.read());
+ Assert.assertEquals('k', is.read());
+ is.close();
+ }
+ }
+
+ // -----------Async
+
+ @Test
+ public void testTwoClientsAsync() throws ExecutionException, InterruptedException {
+ try (Response resetResponse = target("async").path("reset").request().get()) {
+ Assert.assertEquals(204, resetResponse.getStatus());
+ }
+
+ ClientConfig config = new ClientConfig();
+ config.connectorProvider(new HelidonConnectorProvider());
+
+ Client longClient = ClientBuilder.newClient(config);
+ Invocation.Builder longRequest = longClient.target(getBaseUri()).path("async/long").request();
+
+ Client shortClient = ClientBuilder.newClient(config);
+ Invocation.Builder shortRequest = shortClient.target(getBaseUri()).path("async/short").request();
+
+ Future<Response> futureLongResponse = longRequest.async().get();
+ Future<Response> futureShortResponse = shortRequest.async().get();
+
+ try (Response shortResponse = futureShortResponse.get()) {
+ Assert.assertEquals(200, shortResponse.getStatus());
+ Assert.assertEquals("short", shortResponse.readEntity(String.class));
+ }
+
+ try (Response longResponse = futureLongResponse.get()) {
+ Assert.assertEquals(200, longResponse.getStatus());
+ Assert.assertEquals("long", longResponse.readEntity(String.class));
+ }
+ }
+
+ @Test
+ public void testOneClientsTwoReqestsAsync() throws ExecutionException, InterruptedException {
+ try (Response resetResponse = target("async").path("reset").request().get()) {
+ Assert.assertEquals(204, resetResponse.getStatus());
+ }
+
+ Invocation.Builder longRequest = target().path("async/long").request();
+ Invocation.Builder shortRequest = target().path("async/short").request();
+
+ Future<Response> futureLongResponse = longRequest.async().get();
+ Future<Response> futureShortResponse = shortRequest.async().get();
+
+ try (Response shortResponse = futureShortResponse.get()) {
+ Assert.assertEquals(200, shortResponse.getStatus());
+ Assert.assertEquals("short", shortResponse.readEntity(String.class));
+ }
+
+ try (Response longResponse = futureLongResponse.get()) {
+ Assert.assertEquals(200, longResponse.getStatus());
+ Assert.assertEquals("long", longResponse.readEntity(String.class));
+ }
+ }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/FollowRedirectsTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/FollowRedirectsTest.java
new file mode 100644
index 0000000..87105d6
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/FollowRedirectsTest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientResponse;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.ClientRequestContext;
+import javax.ws.rs.client.ClientResponseContext;
+import javax.ws.rs.client.ClientResponseFilter;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Helidon connector follow redirect tests.
+ *
+ * @author Martin Matula
+ * @author Arul Dhesiaseelan (aruld at acm.org)
+ * @author Marek Potociar
+ */
+public class FollowRedirectsTest extends JerseyTest {
+ private static final Logger LOGGER = Logger.getLogger(TimeoutTest.class.getName());
+
+ @Path("/test")
+ public static class RedirectResource {
+ @GET
+ public String get() {
+ return "GET";
+ }
+
+ @GET
+ @Path("redirect")
+ public Response redirect() {
+ return Response.seeOther(UriBuilder.fromResource(RedirectResource.class).build()).build();
+ }
+ }
+
+ @Override
+ protected Application configure() {
+ ResourceConfig config = new ResourceConfig(RedirectResource.class);
+ config.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.PAYLOAD_ANY));
+ return config;
+ }
+
+ @Override
+ protected void configureClient(ClientConfig config) {
+ config.connectorProvider(new HelidonConnectorProvider());
+ }
+
+ private static class RedirectTestFilter implements ClientResponseFilter {
+ public static final String RESOLVED_URI_HEADER = "resolved-uri";
+
+ @Override
+ public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext) throws IOException {
+ if (responseContext instanceof ClientResponse) {
+ ClientResponse clientResponse = (ClientResponse) responseContext;
+ responseContext.getHeaders().putSingle(RESOLVED_URI_HEADER, clientResponse.getResolvedRequestUri().toString());
+ }
+ }
+ }
+
+ @Test
+ public void testDoFollow() {
+ Response r = target("test/redirect").register(RedirectTestFilter.class).request().get();
+ assertEquals(200, r.getStatus());
+ assertEquals("GET", r.readEntity(String.class));
+
+ assertEquals(
+ UriBuilder.fromUri(getBaseUri()).path(RedirectResource.class).build().toString(),
+ r.getHeaderString(RedirectTestFilter.RESOLVED_URI_HEADER));
+ }
+
+ @Test
+ public void testDontFollow() {
+ WebTarget t = target("test/redirect");
+ t.property(ClientProperties.FOLLOW_REDIRECTS, false);
+ assertEquals(303, t.request().get().getStatus());
+ }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/LargeDataTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/LargeDataTest.java
new file mode 100644
index 0000000..8a7a690
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/LargeDataTest.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.ServerErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.logging.Logger;
+
+/**
+ * The LargeDataTest reproduces a problem when bytes of large data sent are incorrectly sent.
+ * As a result, the request body is different than what was sent by the client.
+ * <p>
+ * In order to be able to inspect the request body, the generated data is a sequence of numbers
+ * delimited with new lines. Such as
+ * <pre><code>
+ * 1
+ * 2
+ * 3
+ *
+ * ...
+ *
+ * 57234
+ * 57235
+ * 57236
+ *
+ * ...
+ * </code></pre>
+ * It is also possible to send the data to netcat: {@code nc -l 8080} and verify the problem is
+ * on the client side.
+ *
+ * @author Stepan Vavra
+ * @author Marek Potociar
+ */
+public class LargeDataTest extends JerseyTest {
+
+ private static final Logger LOGGER = Logger.getLogger(LargeDataTest.class.getName());
+ private static final int LONG_DATA_SIZE = 100_000; // for large set around 5GB, try e.g.: 536_870_912;
+ private static volatile Throwable exception;
+
+ private static StreamingOutput longData(long sequence) {
+ return out -> {
+ long offset = 0;
+ while (offset < sequence) {
+ out.write(Long.toString(offset).getBytes());
+ out.write('\n');
+ offset++;
+ }
+ };
+ }
+
+ @Override
+ protected Application configure() {
+ ResourceConfig config = new ResourceConfig(HttpMethodResource.class);
+ config.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.HEADERS_ONLY));
+ return config;
+ }
+
+ @Override
+ protected void configureClient(ClientConfig config) {
+ config.connectorProvider(new HelidonConnectorProvider());
+ }
+
+ @Test
+ public void postWithLargeData() throws Throwable {
+ long milis = System.currentTimeMillis();
+ WebTarget webTarget = target("test");
+
+ Response response = webTarget.request().post(Entity.entity(longData(LONG_DATA_SIZE), MediaType.TEXT_PLAIN_TYPE));
+
+ try {
+ if (exception != null) {
+
+ // the reason to throw the exception is that IntelliJ gives you an option to compare the expected with the actual
+ throw exception;
+ }
+
+ Assert.assertEquals("Unexpected error: " + response.getStatus(),
+ Status.Family.SUCCESSFUL,
+ response.getStatusInfo().getFamily());
+ } finally {
+ response.close();
+ }
+ if (LONG_DATA_SIZE > 9_999) {
+ System.out.println("Large Data Test took " + (System.currentTimeMillis() - milis) + "milis");
+ }
+ }
+
+ @Path("/test")
+ public static class HttpMethodResource {
+
+ @POST
+ public Response post(InputStream content) {
+ try {
+ longData(LONG_DATA_SIZE).write(new OutputStream() {
+
+ private long position = 0;
+// private long mbRead = 0;
+
+ @Override
+ public void write(final int generated) throws IOException {
+ int received = content.read();
+
+ if (received != generated) {
+ throw new IOException("Bytes don't match at position " + position
+ + ": received=" + received
+ + ", generated=" + generated);
+ }
+
+// position++;
+// System.out.println("position" + position);
+// if (position % (1024 * 1024) == 0) {
+// mbRead++;
+// System.out.println("MB read: " + mbRead);
+// }
+ }
+ });
+ } catch (IOException e) {
+ exception = e;
+ throw new ServerErrorException(e.getMessage(), 500, e);
+ }
+
+ return Response.ok().build();
+ }
+
+ }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/ParallelTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/ParallelTest.java
new file mode 100644
index 0000000..94a1d26
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/ParallelTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the parallel execution of multiple requests.
+ *
+ * @author Stepan Kopriva
+ */
+public class ParallelTest extends JerseyTest {
+ private static final Logger LOGGER = Logger.getLogger(ParallelTest.class.getName());
+
+ private static final int PARALLEL_CLIENTS = 10;
+ private static final String PATH = "test";
+ private static final AtomicInteger receivedCounter = new AtomicInteger(0);
+ private static final AtomicInteger resourceCounter = new AtomicInteger(0);
+ private static final CyclicBarrier startBarrier = new CyclicBarrier(PARALLEL_CLIENTS + 1);
+ private static final CountDownLatch doneLatch = new CountDownLatch(PARALLEL_CLIENTS);
+
+ @Path(PATH)
+ public static class MyResource {
+
+ @GET
+ public String get() {
+ sleep();
+ resourceCounter.addAndGet(1);
+ return "GET";
+ }
+
+ private void sleep() {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ex) {
+ Logger.getLogger(ParallelTest.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ }
+
+ @Override
+ protected Application configure() {
+ return new ResourceConfig(ParallelTest.MyResource.class);
+ }
+
+ @Override
+ protected void configureClient(ClientConfig config) {
+ config.connectorProvider(new HelidonConnectorProvider());
+ }
+
+ @Test
+ public void testParallel() throws BrokenBarrierException, InterruptedException, TimeoutException {
+ final ScheduledExecutorService executor = Executors.newScheduledThreadPool(PARALLEL_CLIENTS);
+
+ try {
+ final WebTarget target = target();
+ for (int i = 1; i <= PARALLEL_CLIENTS; i++) {
+ final int id = i;
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ startBarrier.await();
+ Response response;
+ response = target.path(PATH).request().get();
+ assertEquals("GET", response.readEntity(String.class));
+ receivedCounter.incrementAndGet();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ LOGGER.log(Level.WARNING, "Client thread " + id + " interrupted.", ex);
+ } catch (BrokenBarrierException ex) {
+ LOGGER.log(Level.INFO, "Client thread " + id + " failed on broken barrier.", ex);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ LOGGER.log(Level.WARNING, "Client thread " + id + " failed on unexpected exception.", t);
+ } finally {
+ doneLatch.countDown();
+ }
+ }
+ });
+ }
+
+ startBarrier.await(1, TimeUnit.SECONDS);
+
+ assertTrue("Waiting for clients to finish has timed out.", doneLatch.await(5 * getAsyncTimeoutMultiplier(),
+ TimeUnit.SECONDS));
+
+ assertEquals("Resource counter", PARALLEL_CLIENTS, resourceCounter.get());
+
+ assertEquals("Received counter", PARALLEL_CLIENTS, receivedCounter.get());
+ } finally {
+ executor.shutdownNow();
+ Assert.assertTrue("Executor termination", executor.awaitTermination(5, TimeUnit.SECONDS));
+ }
+ }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/TimeoutTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/TimeoutTest.java
new file mode 100644
index 0000000..1fe6374
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/TimeoutTest.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Martin Matula
+ */
+public class TimeoutTest extends JerseyTest {
+ @Path("/test")
+ public static class TimeoutResource {
+ @GET
+ public String get() {
+ return "GET";
+ }
+
+ @GET
+ @Path("timeout")
+ public String getTimeout() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return "GET";
+ }
+ }
+
+ @Override
+ protected Application configure() {
+ return new ResourceConfig(TimeoutResource.class);
+ }
+
+ @Override
+ protected void configureClient(ClientConfig config) {
+ config.connectorProvider(new HelidonConnectorProvider());
+ }
+
+ @Test
+ public void testFast() {
+ Response r = target("test").request().get();
+ assertEquals(200, r.getStatus());
+ assertEquals("GET", r.readEntity(String.class));
+ }
+
+ @Test
+ public void testSlow() {
+ try {
+ target("test/timeout").property(ClientProperties.READ_TIMEOUT, 1_000).request().get();
+ fail("Timeout expected.");
+ } catch (ProcessingException e) {
+ assertTimeoutException(e);
+ }
+ }
+
+ @Ignore
+ // TODO - WebClient change request
+ public void testTimeoutInRequest() {
+ try {
+ target("test/timeout").request().property(ClientProperties.READ_TIMEOUT, 1_000).get();
+ fail("Timeout expected.");
+ } catch (ProcessingException e) {
+ assertTimeoutException(e);
+ }
+ }
+
+ private void assertTimeoutException(Exception e) {
+ String exceptionName = "TimeoutException"; // check netty or JDK TimeoutException
+ Throwable t = e.getCause();
+ while (t != null) {
+ if (t.getClass().getSimpleName().contains(exceptionName)) {
+ break;
+ }
+ t = t.getCause();
+ }
+ if (t == null) {
+ if (e.getCause() != null) {
+ if (e.getCause().getCause() != null) {
+ fail("Unexpected processing exception cause" + e.getCause().getCause().getMessage());
+ } else {
+ fail("Unexpected processing exception cause" + e.getCause().getMessage());
+ }
+ } else {
+ fail("Unexpected processing exception cause" + e.getMessage());
+ }
+ }
+ }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/EventOutputTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/EventOutputTest.java
new file mode 100644
index 0000000..31ceb61
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/EventOutputTest.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector.sse;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.helidon.connector.HelidonConnectorProvider;
+import org.glassfish.jersey.media.sse.EventInput;
+import org.glassfish.jersey.media.sse.EventListener;
+import org.glassfish.jersey.media.sse.EventOutput;
+import org.glassfish.jersey.media.sse.EventSource;
+import org.glassfish.jersey.media.sse.InboundEvent;
+import org.glassfish.jersey.media.sse.OutboundEvent;
+import org.glassfish.jersey.media.sse.SseFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Event output tests.
+ *
+ * @author Pavel Bucek
+ * @author Marek Potociar
+ */
+public class EventOutputTest extends JerseyTest {
+
+ @Override
+ protected Application configure() {
+ return new ResourceConfig(SseTestResource.class, SseFeature.class);
+ }
+
+ @Override
+ protected void configureClient(ClientConfig config) {
+ config.register(SseFeature.class);
+ config.connectorProvider(new HelidonConnectorProvider());
+ }
+
+ /**
+ * SSE Test resource.
+ */
+ @Path("test")
+ @Produces(SseFeature.SERVER_SENT_EVENTS)
+ public static class SseTestResource {
+
+ @GET
+ @Path("single")
+ public EventOutput getSingleEvent() {
+ final EventOutput output = new EventOutput();
+ try {
+ return output;
+ } finally {
+ new Thread() {
+ public void run() {
+ try {
+ output.write(new OutboundEvent.Builder().data(String.class, "single").build());
+ output.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+ }.start();
+ }
+ }
+
+ @GET
+ @Path("closed-single")
+ public EventOutput getClosedSingleEvent() throws IOException {
+ final EventOutput output = new EventOutput();
+ output.write(new OutboundEvent.Builder().data(String.class, "closed").build());
+ output.close();
+ return output;
+ }
+
+ @GET
+ @Path("closed-empty")
+ public EventOutput getClosedEmpty() throws IOException {
+ final EventOutput output = new EventOutput();
+ output.close();
+ return output;
+ }
+
+ @GET
+ @Path("charset")
+ @Produces("text/event-stream;charset=utf-8")
+ public EventOutput getSseWithCharset() throws IOException {
+ final EventOutput output = new EventOutput();
+ output.write(new OutboundEvent.Builder().data(String.class, "charset").build());
+ output.close();
+ return output;
+ }
+
+ @GET
+ @Path("comments-only")
+ public EventOutput getCommentsOnlyStream() throws IOException {
+ final EventOutput output = new EventOutput();
+ output.write(new OutboundEvent.Builder().comment("No comment #1").build());
+ output.write(new OutboundEvent.Builder().comment("No comment #2").build());
+ output.close();
+ return output;
+ }
+ }
+
+ @Test
+ @Ignore //2.0.0-M3
+ public void testReadSseEventAsPlainString() throws Exception {
+ final Response r = target().path("test/single").request().get(Response.class);
+ assertThat(r.readEntity(String.class), containsString("single"));
+ }
+
+ /**
+ * Reproducer for JERSEY-2912: Sending and receiving comments-only events.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReadCommentsOnlySseEvents() throws Exception {
+ ClientConfig clientConfig = new ClientConfig();
+ clientConfig.property(ClientProperties.CONNECT_TIMEOUT, 15000);
+ clientConfig.property(ClientProperties.READ_TIMEOUT, 0);
+ clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
+ clientConfig.connectorProvider(new HelidonConnectorProvider());
+ Client client = ClientBuilder.newBuilder().withConfig(clientConfig).build();
+
+ final CountDownLatch latch = new CountDownLatch(2);
+ final Queue<String> eventComments = new ArrayBlockingQueue<>(2);
+ WebTarget single = client.target(getBaseUri()).path("test/comments-only");
+ EventSource es = EventSource.target(single).build();
+ es.register(new EventListener() {
+ @Override
+ public void onEvent(InboundEvent inboundEvent) {
+ eventComments.add(inboundEvent.getComment());
+ latch.countDown();
+ }
+ });
+
+ boolean latchTimedOut;
+ boolean closeTimedOut;
+ try {
+ es.open();
+ latchTimedOut = latch.await(5 * getAsyncTimeoutMultiplier(), TimeUnit.SECONDS);
+ } finally {
+ closeTimedOut = es.close(5, TimeUnit.SECONDS);
+ }
+
+ assertEquals("Unexpected event count", 2, eventComments.size());
+ for (int i = 1; i <= 2; i++) {
+ assertEquals("Unexpected comment data on event #" + i, "No comment #" + i, eventComments.poll());
+ }
+ assertTrue("Event latch has timed out", latchTimedOut);
+ assertTrue("EventSource.close() has timed out", closeTimedOut);
+ }
+
+ @Test
+ @Ignore // 2.0.0-M3
+ public void testReadFromClosedOutput() throws Exception {
+ /**
+ * Need to disable HTTP Keep-Alive to prevent this test from hanging in HttpURLConnection
+ * due to an attempt to read from a stale, out-of-sync connection closed by the server.
+ * Thus setting the "Connection: close" HTTP header on all requests.
+ */
+ Response r;
+ r = target().path("test/closed-empty").request().header("Connection", "close").get();
+ assertTrue(r.readEntity(String.class).isEmpty());
+
+ r = target().path("test/closed-single").request().header("Connection", "close").get();
+ assertTrue(r.readEntity(String.class).contains("closed"));
+
+ //
+
+ EventInput input;
+ input = target().path("test/closed-single").request().header("Connection", "close").get(EventInput.class);
+ assertEquals("closed", input.read().readData());
+ assertEquals(null, input.read());
+ assertTrue(input.isClosed());
+
+ input = target().path("test/closed-empty").request().header("Connection", "close").get(EventInput.class);
+ assertEquals(null, input.read());
+ assertTrue(input.isClosed());
+ }
+
+ @Test
+ public void testSseContentTypeWithCharset() {
+ /**
+ * Need to disable HTTP Keep-Alive to prevent this test from hanging in HttpURLConnection
+ * due to an attempt to read from a stale, out-of-sync connection closed by the server.
+ * Thus setting the "Connection: close" HTTP header on all requests.
+ */
+ Response r;
+ r = target().path("test/charset").request().header("Connection", "close").get();
+ assertTrue(r.getMediaType().getParameters().get("charset").equalsIgnoreCase("utf-8"));
+ final EventInput eventInput = r.readEntity(EventInput.class);
+ String eventData = eventInput.read().readData();
+ assertEquals("charset", eventData);
+ eventInput.close();
+ }
+
+ @Test
+ public void testConnectorWithEventSource() throws InterruptedException {
+ ClientConfig clientConfig = new ClientConfig();
+ clientConfig.property(ClientProperties.CONNECT_TIMEOUT, 15000);
+ clientConfig.property(ClientProperties.READ_TIMEOUT, 0);
+ clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
+ clientConfig.connectorProvider(new HelidonConnectorProvider());
+ Client client = ClientBuilder.newBuilder().withConfig(clientConfig).build();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<String> eventData = new AtomicReference<String>();
+ final AtomicInteger counter = new AtomicInteger(0);
+ WebTarget single = client.target(getBaseUri()).path("test/single");
+ EventSource es = EventSource.target(single).build();
+ es.register(new EventListener() {
+ @Override
+ public void onEvent(InboundEvent inboundEvent) {
+ final int i = counter.incrementAndGet();
+ if (i == 1) {
+ eventData.set(inboundEvent.readData());
+ }
+ latch.countDown();
+ }
+ });
+
+ boolean latchTimedOut;
+ boolean closeTimedOut;
+ try {
+ es.open();
+ latchTimedOut = latch.await(5 * getAsyncTimeoutMultiplier(), TimeUnit.SECONDS);
+ } finally {
+ closeTimedOut = es.close(5, TimeUnit.SECONDS);
+ }
+
+ // 2.0.0.-M3 assertEquals("Unexpected event count", 1, counter.get());
+ assertEquals("Unexpected event data", "single", eventData.get());
+ assertTrue("Event latch has timed out", latchTimedOut);
+ assertTrue("EventSource.close() has timed out", closeTimedOut);
+ }
+}
diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java
new file mode 100644
index 0000000..61a03bf
--- /dev/null
+++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.helidon.connector.sse;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.helidon.connector.HelidonConnectorProvider;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Singleton;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
+import javax.ws.rs.sse.SseEventSource;
+import java.io.Closeable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class SseTest extends JerseyTest {
+
+ private static String PALINDROME = "neveroddoreven";
+ private static int WAIT_TIME = 5000;
+
+ @Path("simple")
+ public static class SimpleSseResource {
+ @GET
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void send(@Context SseEventSink sink, @Context Sse sse) {
+ try (SseEventSink s = sink) {
+ for (int i = 0; i != 10; i++) {
+ s.send(sse.newEvent("A"));
+ }
+ }
+ }
+ }
+
+ @Path("broadcast")
+ @Singleton
+ public static class BroadcasterResource {
+ private static final String WELCOME = "Welcome";
+
+ @Context
+ private Sse sse;
+
+ private static SseBroadcaster sseBroadcaster;
+
+ @PostConstruct
+ public void init() {
+ System.out.println("INIT");
+ sseBroadcaster = sse.newBroadcaster();
+ }
+
+ @GET
+ @Path("register")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void register(@Context SseEventSink sink) {
+ sseBroadcaster.register(sink);
+ sink.send(sse.newEvent(WELCOME));
+ }
+
+ @POST
+ @Path("broadcast")
+ @Consumes(MediaType.TEXT_PLAIN)
+ public void broadcast(String event) {
+ sseBroadcaster.broadcast(sse.newEvent(event));
+ }
+ }
+
+ @Override
+ protected Application configure() {
+ return new ResourceConfig(SimpleSseResource.class, BroadcasterResource.class);
+ }
+
+ @Override
+ protected void configureClient(ClientConfig config) {
+ config.connectorProvider(new HelidonConnectorProvider());
+ //config.property("jersey.config.helidon.client.entity.type", "OUTPUT_STREAM_PUBLISHER");
+ }
+
+ @Test
+ public void testSend() throws InterruptedException {
+ final StringBuilder sb = new StringBuilder();
+ final CountDownLatch latch = new CountDownLatch(10);
+ try (SseEventSource source = SseEventSource.target(target().path("simple")).build()) {
+ source.register((event) -> sb.append(event.readData()));
+ source.register((event) -> latch.countDown());
+ source.open();
+
+ latch.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.assertEquals("AAAAAAAAAA", sb.toString());
+ Assert.assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testBroadcast() throws InterruptedException {
+ final BroadcasterClient clientOne = new BroadcasterClient(target());
+ final BroadcasterClient clientTwo = new BroadcasterClient(target());
+
+ clientOne.register();
+ clientTwo.register();
+
+ clientOne.broadcast();
+ clientTwo.broadcast();
+
+ clientOne.messageLatch.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+ clientTwo.messageLatch.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+
+ Assert.assertEquals(0, clientOne.messageLatch.getCount());
+ Assert.assertEquals(0, clientTwo.messageLatch.getCount());
+
+ Assert.assertEquals(BroadcasterResource.WELCOME + PALINDROME + PALINDROME, clientOne.message.toString());
+ Assert.assertEquals(BroadcasterResource.WELCOME + PALINDROME + PALINDROME, clientTwo.message.toString());
+
+ clientOne.close();
+ clientTwo.close();
+ }
+
+ private static class BroadcasterClient implements Closeable {
+ private final WebTarget target;
+ private final CountDownLatch messageLatch = new CountDownLatch(3);
+ private final SseEventSource source;
+ private final StringBuilder message = new StringBuilder();
+
+ private BroadcasterClient(WebTarget target) {
+ this.target = target;
+ source = SseEventSource.target(target.path("broadcast/register")).build();
+ }
+
+ private void register() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ source.register((event) -> message.append(event.readData()));
+ source.register((event) -> latch.countDown());
+ source.register((event) -> messageLatch.countDown());
+ source.open();
+
+ latch.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+ Assert.assertEquals(0, latch.getCount());
+ }
+
+ private void broadcast() {
+ try (Response r = target.path("broadcast/broadcast")
+ .request().buildPost(Entity.entity(PALINDROME, MediaType.TEXT_PLAIN)).invoke()) {
+ Assert.assertEquals(204, r.getStatus());
+ }
+ }
+
+ @Override
+ public void close() {
+ source.close();
+ }
+ }
+}
diff --git a/connectors/pom.xml b/connectors/pom.xml
index 37ecbda..87875d8 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -75,4 +75,16 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <profiles>
+ <profile>
+ <id>HelidonConnector</id>
+ <activation>
+ <jdk>11</jdk>
+ </activation>
+ <modules>
+ <module>helidon-connector</module>
+ </modules>
+ </profile>
+ </profiles>
</project>