Initial Contribution
Signed-off-by: Jan Supol <jan.supol@oracle.com>
diff --git a/examples/sse-item-store-jaxrs-webapp/README.MD b/examples/sse-item-store-jaxrs-webapp/README.MD
new file mode 100644
index 0000000..2948b07
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/README.MD
@@ -0,0 +1,67 @@
+[//]: # " Copyright (c) 2015, 2018 Oracle and/or its affiliates. All rights reserved. "
+[//]: # " "
+[//]: # " This program and the accompanying materials are made available under the "
+[//]: # " terms of the Eclipse Distribution License v. 1.0, which is available at "
+[//]: # " http://www.eclipse.org/org/documents/edl-v10.php. "
+[//]: # " "
+[//]: # " SPDX-License-Identifier: BSD-3-Clause "
+
+JAX-RS SSE Item Store Jersey Example
+====================================
+
+An example demonstrating how the new Server-Sent Events (SSE) JAX-RS API can
+be used to notify clients about changes in server-side managed data
+(a collection of items). The example also outlines how SSE events can be
+consumed using JavaScript browser-based clients.
+
+Contents
+--------
+
+The example consists of a web-based client application and a server-side
+application deployed on a [Jetty servlet container](http://www.eclipse.org/jetty/documentation/current/).
+
+The server-side part of the application consists of a JAX-RS resource
+managing a collection of string items and a simple HTML page that includes
+a browser SSE client written in JavaScript as well as a basic CSS
+stylesheet. The SSE JavaScript client connects to the JAX-RS resource
+and transforms the streamed messages into HTML code that is rendered by
+the browser. The JavaScript client also demonstrates how named and
+unnamed SSE events are handled by the HTML5 `EventSource` component. The
+mapping of the URI path space of the server-side part of the application
+is presented in the following table:
+
+URI path | Resource class | HTTP methods
+------------------------------- | ------------------- | --------------
+**_/resources/items_** | JaxrsItemStoreResource | GET, POST
+**_/resources/items/events_** | JaxrsItemStoreResource | GET (SSE)
+
+The application is configured to run using the Jetty Maven plugin under the base
+path `sse-item-store-jaxrs-webapp`.
+
+Running the Example
+-------------------
+
+> mvn clean compile jetty:run
+
+The command above deploys the current example. After successful
+deployment, you should be able to access the browser SSE client page at
+<http://localhost:8080/sse-item-store-jaxrs-webapp/index.html>. To see the raw
+SSE event stream, you may also point your browser directly at the
+[`JaxrsItemStoreResource`](http://localhost:8080/sse-item-store-jaxrs-webapp/resources/items/events).
+
+Deploying the Example to Another Servlet Container
+--------------------------------------------------
+
+> mvn clean package
+
+The command above creates a Servlet 3.x compliant WAR located in the
+target directory. The WAR can then be deployed to your Servlet 3.x
+compliant container.
+
+Running the Test Client
+-----------------------
+
+After deploying the application into a Servlet 3.x compliant container,
+you can run the attached test by executing:
+
+> mvn test -Djersey.config.test.container.factory=org.glassfish.jersey.test.external.ExternalTestContainerFactory -Djersey.config.test.container.port=<port>
diff --git a/examples/sse-item-store-jaxrs-webapp/pom.xml b/examples/sse-item-store-jaxrs-webapp/pom.xml
new file mode 100644
index 0000000..afba6c1
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Distribution License v. 1.0, which is available at
+ http://www.eclipse.org/org/documents/edl-v10.php.
+
+ SPDX-License-Identifier: BSD-3-Clause
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.glassfish.jersey.examples</groupId>
+ <artifactId>webapp-example-parent</artifactId>
+ <relativePath>../webapp-example-parent/pom.xml</relativePath>
+ <version>2.28-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sse-item-store-jaxrs-webapp</artifactId>
+ <packaging>war</packaging>
+ <name>jersey-examples-sse-item-store-jaxrs-webapp</name>
+
+ <description>Jersey JAX-RS 2.1 SSE API-based item store example.</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-sse</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.connectors</groupId>
+ <artifactId>jersey-apache-connector</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Run the application using "mvn jetty:run" -->
+ <plugin>
+ <!-- TODO unify Jetty in all examples -->
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-maven-plugin</artifactId>
+ <configuration>
+ <scanIntervalSeconds>10</scanIntervalSeconds>
+ <stopPort>9999</stopPort>
+ <stopKey>STOP</stopKey>
+ <webApp>
+ <contextPath>/${project.artifactId}</contextPath>
+ </webApp>
+ <systemProperties>
+ <systemProperty>
+ <name>jetty.port</name>
+ <value>${jersey.config.test.container.port}</value>
+ </systemProperty>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>release</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>xml-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <properties>
+ <jersey.config.test.container.port>8080</jersey.config.test.container.port>
+ </properties>
+</project>
diff --git a/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreApp.java b/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreApp.java
new file mode 100644
index 0000000..ed76ecf
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreApp.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Distribution License v. 1.0, which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+package org.glassfish.jersey.examples.sseitemstore.jaxrs;
+
+import javax.ws.rs.ApplicationPath;
+
+import org.glassfish.jersey.server.ResourceConfig;
+
+/**
+ * SSE item store JAX-RS application; registers {@link JaxrsItemStoreResource} under the {@code resources} path.
+ *
+ * @author Adam Lindenthal (adam.lindenthal at oracle.com)
+ */
+@ApplicationPath("resources")
+public class JaxrsItemStoreApp extends ResourceConfig {
+ /**
+ * Create new SSE Item Store Example JAX-RS application with {@link JaxrsItemStoreResource} as its resource class.
+ */
+ public JaxrsItemStoreApp() {
+ super(JaxrsItemStoreResource.class);
+ }
+}
diff --git a/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResource.java b/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResource.java
new file mode 100644
index 0000000..a8c193d
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResource.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Distribution License v. 1.0, which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+package org.glassfish.jersey.examples.sseitemstore.jaxrs;
+
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Logger;
+
+import javax.ws.rs.BadRequestException;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.ServiceUnavailableException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.glassfish.jersey.media.sse.SseFeature;
+
+
+/**
+ * A resource for storing named items.
+ *
+ * @author Marek Potociar (marek.potociar at oracle.com)
+ */
+@Path("items")
+public class JaxrsItemStoreResource {
+ private static final Logger LOGGER = Logger.getLogger(JaxrsItemStoreResource.class.getName());
+
+ private static final ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock();
+ private static final LinkedList<String> itemStore = new LinkedList<>();
+
+ private static final AtomicReference<SseBroadcaster> BROADCASTER = new AtomicReference<>(null);
+
+ @Context
+ private Sse sse;
+
+ private static volatile long reconnectDelay = 0;
+
+    /**
+     * List all stored items.
+     *
+     * @return the {@code toString()} rendering of the item list, read while holding the store read lock.
+     */
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    public String listItems() {
+        storeLock.readLock().lock(); // acquired before try so that finally never unlocks an unheld lock
+        try {
+            return itemStore.toString();
+        } finally {
+            storeLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Receive &amp; process commands sent by the test client that control the internal resource state.
+     * <p>Following is the list of recognized commands:
+     * <ul>
+     * <li><b>disconnect</b> - disconnect all registered event streams.</li>
+     * <li><b>reconnect now</b> - enable client reconnecting.</li>
+     * <li><b>reconnect &lt;seconds&gt;</b> - disable client reconnecting. Reconnecting clients will receive
+     * a HTTP 503 response with {@value javax.ws.rs.core.HttpHeaders#RETRY_AFTER} set to the amount of
+     * seconds specified. Negative amounts are rejected as unrecognized commands.</li>
+     * </ul>
+     * @param command command to be processed.
+     * @return message about processing result.
+     * @throws BadRequestException in case the command is not recognized or not specified.
+     */
+    @POST
+    @Path("commands")
+    public String processCommand(String command) {
+        if (command == null || command.isEmpty()) {
+            throw new BadRequestException("No command specified.");
+        }
+
+        if ("disconnect".equals(command)) {
+            closeBroadcaster();
+            return "Disconnected.";
+        } else if (command.startsWith("reconnect ") && command.length() > "reconnect ".length()) {
+            final String when = command.substring("reconnect ".length());
+            try {
+                final long delay = "now".equals(when) ? 0 : Long.parseLong(when);
+                if (delay >= 0) {
+                    reconnectDelay = delay;
+                    return "Reconnect strategy updated: " + when;
+                }
+            } catch (NumberFormatException ignored) {
+                // not a number - fall through and report the command as unrecognized
+            }
+        }
+
+        throw new BadRequestException("Command not recognized: '" + command + "'");
+    }
+
+    /**
+     * Connect or re-connect a client to the SSE event stream of item additions.
+     *
+     * @param lastEventId value of the <tt>{@value SseFeature#LAST_EVENT_ID_HEADER}</tt> header, {@code -1} if not set.
+     * @param eventSink event sink of the connecting client; it is registered with the stored broadcaster.
+     * @throws InternalServerErrorException in case replaying missed events to the reconnected output stream fails.
+     * @throws ServiceUnavailableException in case the reconnect delay is set to a positive value.
+     */
+    @GET
+    @Path("events")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void itemEvents(@HeaderParam(SseFeature.LAST_EVENT_ID_HEADER) @DefaultValue("-1") int lastEventId,
+                           @Context SseEventSink eventSink) {
+        // A non-negative id means the client sent the header, i.e. it is re-connecting.
+        if (lastEventId >= 0) {
+            LOGGER.info("Received last event id :" + lastEventId);
+
+            // Read the volatile delay once so that the check, the log message and the response agree.
+            final long retryAfter = reconnectDelay;
+            if (retryAfter > 0) {
+                LOGGER.info("Non-zero reconnect delay [" + retryAfter + "] - responding with HTTP 503.");
+                throw new ServiceUnavailableException(retryAfter);
+            }
+            LOGGER.info("Zero reconnect delay - reconnecting.");
+            replayMissedEvents(lastEventId, eventSink);
+        }
+
+        final SseBroadcaster broadcaster = getBroadcaster();
+        broadcaster.register(eventSink);
+    }
+
+    /** Send every stored item whose index is greater than {@code lastEventId} to the given event sink. */
+    private void replayMissedEvents(int lastEventId, SseEventSink eventSink) {
+        storeLock.readLock().lock(); // acquired before try so that finally never unlocks an unheld lock
+        try {
+            if (lastEventId < itemStore.size() - 1) { // overflow-safe form of "lastEventId + 1 < size"
+                final int firstUnreceived = lastEventId + 1;
+                LOGGER.info("Replaying events - starting with id " + firstUnreceived);
+                final ListIterator<String> it = itemStore.subList(firstUnreceived, itemStore.size()).listIterator();
+                while (it.hasNext()) {
+                    eventSink.send(createItemEvent(it.nextIndex() + firstUnreceived, it.next()));
+                }
+            } else {
+                LOGGER.info("No events to replay.");
+            }
+        } finally {
+            storeLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Add new item to the item store.
+     * <p>
+     * Invoking this method will fire 2 new SSE events - 1st about newly added item and 2nd about the new item store size.
+     *
+     * @param name item name.
+     */
+    @POST
+    public void addItem(@FormParam("name") String name) {
+        // Ignore if the request was sent without name parameter.
+        if (name == null) {
+            return;
+        }
+
+        storeLock.writeLock().lock(); // acquired before try so that finally never unlocks an unheld lock
+        try {
+            // The list index of the new item doubles as the id of its SSE event (used when replaying).
+            final int eventId = itemStore.size();
+            itemStore.add(name);
+
+            SseBroadcaster sseBroadcaster = getBroadcaster();
+
+            // Broadcasting an un-named event with the name of the newly added item in data
+            sseBroadcaster.broadcast(createItemEvent(eventId, name));
+
+            // Broadcasting a named "size" event with the current size of the items collection in data
+            final OutboundSseEvent event = sse.newEventBuilder().name("size").data(Integer.class, eventId + 1).build();
+            sseBroadcaster.broadcast(event);
+        } finally {
+            storeLock.writeLock().unlock();
+        }
+    }
+
+    /** Build an un-named SSE event carrying the item name as data and {@code eventId} as the event id. */
+    private OutboundSseEvent createItemEvent(final int eventId, final String name) {
+        LOGGER.info("Creating event id [" + eventId + "] name [" + name + "]");
+        return sse.newEventBuilder().id("" + eventId).data(String.class, name).build();
+    }
+
+    /** Get the stored broadcaster, creating and storing a new one if none is set; never returns {@code null}. */
+    private SseBroadcaster getBroadcaster() {
+        SseBroadcaster current = BROADCASTER.get();
+        while (current == null) {
+            final SseBroadcaster created = sse.newBroadcaster();
+            if (BROADCASTER.compareAndSet(null, created)) {
+                return created;
+            }
+            created.close(); // lost the race against another thread - release the unused broadcaster
+            current = BROADCASTER.get(); // may be null again if it was closed meanwhile, hence the loop
+        }
+        return current;
+    }
+
+    /**
+     * Detach the currently stored broadcaster, if there is one, and close it.
+     * The stored reference is cleared first, so a later lookup creates a fresh broadcaster.
+     */
+    private void closeBroadcaster() {
+        final SseBroadcaster detached = BROADCASTER.getAndSet(null);
+        if (detached != null) {
+            detached.close();
+        }
+    }
+}
diff --git a/examples/sse-item-store-jaxrs-webapp/src/main/webapp/css/main.css b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/css/main.css
new file mode 100644
index 0000000..5e31320
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/css/main.css
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Distribution License v. 1.0, which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+div.message { /* one box per displayed SSE message; the boxes are created by display() in js/engine.js */
+ border-radius: 10px;
+ border: thin solid #444444;
+ margin: 10px;
+ width: 300px;
+ padding: 10px;
+ box-shadow: 5px 5px 5px #888888;
+ font-family: Helvetica, serif;
+ color: #333333;
+}
+
+div.items { /* area showing the item list fetched by getItems() in js/engine.js */
+ border-radius: 10px;
+ border: thin dotted #AAAAFF;
+ margin: 10px;
+ padding: 10px;
+ font-family: Helvetica, serif;
+ color: #4444FF;
+}
+
+div.messages { /* container of the message boxes; selector must match class="messages" in index.html */
+    border-radius: 10px;
+    border: thin dotted #888888;
+    margin: 10px;
+    padding: 10px;
+}
diff --git a/examples/sse-item-store-jaxrs-webapp/src/main/webapp/index.html b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/index.html
new file mode 100644
index 0000000..0c488a2
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/index.html
@@ -0,0 +1,27 @@
+<!--
+
+ Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Distribution License v. 1.0, which is available at
+ http://www.eclipse.org/org/documents/edl-v10.php.
+
+ SPDX-License-Identifier: BSD-3-Clause
+
+-->
+
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<head>
+ <title>Jersey SSE Item Store Example</title>
+ <link rel="stylesheet" href="css/main.css"/>
+ <script src="js/engine.js" type="text/javascript"></script>
+</head>
+<body>
+<div>
+ Enter item: <input id="name" type="text"/><input value="Add" type="button" onclick="addItem()"/>
+</div>
+<div id="items" class="items"></div>
+<div id="messages" class="messages"></div>
+</body>
+</html>
diff --git a/examples/sse-item-store-jaxrs-webapp/src/main/webapp/js/engine.js b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/js/engine.js
new file mode 100644
index 0000000..3b1f42d
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/js/engine.js
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Distribution License v. 1.0, which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+"use strict";
+
+function addItem() {
+    var itemInput = document.getElementById("name");
+
+    var req = new XMLHttpRequest();
+    req.open("POST", "resources/items", true);
+    req.setRequestHeader("Content-Type", "application/x-www-form-urlencoded");
+    req.onreadystatechange = function () {
+        // Once the POST has completed with 204 (No Content), clear the input and refresh the item list.
+        if (req.readyState == 4 && req.status == 204) {
+            itemInput.value = "";
+            getItems();
+        }
+    };
+    req.send("name=" + encodeURIComponent(itemInput.value)); // encoded so '&', '=', '+' and '%' survive
+}
+
+function getItems() {
+    var req = new XMLHttpRequest();
+    req.open("GET", "resources/items", true);
+    req.setRequestHeader("Accept", "text/plain");
+    req.onreadystatechange = function () {
+        // Once the list has arrived, show it as plain text: item names are user input, not markup.
+        if (req.readyState == 4 && req.status == 200) {
+            document.getElementById("items").textContent = req.responseText;
+        }
+    };
+    req.send();
+}
+
+function display(data, rgb) {
+    var msgSpan = document.createElement("span");
+    msgSpan.style.color = rgb;
+    // textContent instead of innerHTML: event data contains user-supplied item names, not markup.
+    msgSpan.textContent = data;
+    var msgDiv = document.createElement("div");
+    msgDiv.className = "message";
+    msgDiv.appendChild(msgSpan);
+    var messages = document.getElementById("messages");
+    messages.insertBefore(msgDiv, messages.firstChild);
+}
+
+function receiveMessages() {
+    if (typeof(EventSource) !== "undefined") {
+        // Yes! Server-sent events support!
+        var source = new EventSource("resources/items/events");
+        source.onmessage = function (event) {
+            console.log('Received unnamed event: ' + event.data);
+            display("Added new item: " + event.data, "#444444");
+        };
+        // Named events are not delivered to onmessage; they need a listener registered for their name.
+        source.addEventListener("size", function (event) {
+            console.log('Received event ' + event.type + ': ' + event.data);
+            display("New items size: " + event.data, "#0000FF");
+        }, false);
+
+        source.onopen = function (event) {
+            console.log("event source opened");
+        };
+        // Error events carry no data, so the connection state is reported instead.
+        source.onerror = function (event) {
+            console.log('Received error event, readyState=' + source.readyState);
+            display("SSE connection error (readyState=" + source.readyState + ")", "#FF0000");
+        };
+    } else {
+        // Sorry! No server-sent events support..
+        display('SSE not supported by browser.', "#FF0000");
+    }
+}
+
+window.onload = receiveMessages;
diff --git a/examples/sse-item-store-jaxrs-webapp/src/test/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResourceTest.java b/examples/sse-item-store-jaxrs-webapp/src/test/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResourceTest.java
new file mode 100644
index 0000000..0f352a6
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/test/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResourceTest.java
@@ -0,0 +1,301 @@
+/*
+ * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Distribution License v. 1.0, which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+package org.glassfish.jersey.examples.sseitemstore.jaxrs;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Form;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.sse.SseEventSource;
+
+import org.glassfish.jersey.apache.connector.ApacheClientProperties;
+import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
+import org.glassfish.jersey.test.JerseyTest;
+import org.glassfish.jersey.test.external.ExternalTestContainerFactory;
+
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.describedAs;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Item store test.
+ *
+ * @author Marek Potociar (marek.potociar at oracle.com)
+ */
+public class JaxrsItemStoreResourceTest extends JerseyTest {
+
+ private static final int RECONNECT_DEFAULT = 500;
+ private static final Logger LOGGER = Logger.getLogger(JaxrsItemStoreResourceTest.class.getName());
+ private static final int MAX_LISTENERS = 5;
+ private static final int MAX_ITEMS = 10;
+
+ private final ExecutorService executorService =
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("JaxrsItemStoreResourceTest-%d").build());
+ private final AtomicReference<Client> client = new AtomicReference<>(null);
+
+ @Override
+ protected Application configure() {
+ return new JaxrsItemStoreApp(); // the application under test
+ }
+
+    protected void configureClient(ClientConfig config) {
+        // Generous pool limits just to be safe - the testEventSourceReconnect is quite greedy...
+        final int maxConnections = MAX_LISTENERS * MAX_ITEMS;
+        final PoolingHttpClientConnectionManager pool = new PoolingHttpClientConnectionManager();
+        pool.setMaxTotal(maxConnections);
+        pool.setDefaultMaxPerRoute(maxConnections);
+
+        // AHC is used as the test client connector to avoid issues with HttpUrlConnection socket management.
+        config.property(ApacheClientProperties.CONNECTION_MANAGER, pool);
+        config.property(ClientProperties.READ_TIMEOUT, 2000);
+        config.connectorProvider(new ApacheConnectorProvider());
+    }
+
+    @Override
+    protected Client getClient() {
+        if (client.get() != null) {
+            return client.get();
+        }
+        // No client stored yet: build one that uses the test configuration and the test executor service.
+        final ClientConfig config = new ClientConfig();
+        configureClient(config);
+        final ClientBuilder builder = ClientBuilder.newBuilder().withConfig(config).executorService(executorService);
+        // Store the new client only if none has been stored in the meantime.
+        client.compareAndSet(null, builder.build());
+        return client.get();
+    }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ client.get().close(); // NOTE(review): assumes getClient() has stored a client by now - otherwise this throws NPE
+ executorService.shutdown(); // the pool handed to the client builder in getClient()
+ }
+
+    @Override
+    protected URI getBaseUri() {
+        final UriBuilder uri = UriBuilder.fromUri(super.getBaseUri()).path("sse-item-store-jaxrs-webapp");
+        final boolean external = getTestContainerFactory() instanceof ExternalTestContainerFactory;
+        return (external ? uri.path("resources") : uri).build(); // "resources" only for external containers
+    }
+
+ /**
+ * Test the item addition, addition event broadcasting and item retrieval from {@link JaxrsItemStoreResource}.
+ * Each of the {@code MAX_LISTENERS} sources must receive one item event and one "size" event per posted item.
+ * @throws Exception in case of a test failure.
+ */
+ @Test
+ public void testItemsStore() throws Exception {
+ final List<String> items = Collections.unmodifiableList(Arrays.asList("foo", "bar", "baz"));
+ final WebTarget itemsTarget = target("items");
+ final CountDownLatch latch = new CountDownLatch(items.size() * MAX_LISTENERS * 2); // item event + "size" event per item and listener
+ final List<Queue<Integer>> indexQueues = new ArrayList<>(MAX_LISTENERS);
+ final SseEventSource[] sources = new SseEventSource[MAX_LISTENERS];
+ final AtomicInteger sizeEventsCount = new AtomicInteger(0);
+
+ for (int i = 0; i < MAX_LISTENERS; i++) {
+ final int id = i;
+ final SseEventSource es = SseEventSource.target(itemsTarget.path("events")).build();
+ sources[id] = es;
+
+ final Queue<Integer> indexes = new ConcurrentLinkedQueue<>();
+ indexQueues.add(indexes);
+
+ es.register(inboundEvent -> {
+ try {
+ if (null == inboundEvent.getName()) {
+ final String data = inboundEvent.readData();
+ LOGGER.info("[-i-] SOURCE " + id + ": Received event id=" + inboundEvent.getId() + " data=" + data);
+ indexes.add(items.indexOf(data));
+ } else if ("size".equals(inboundEvent.getName())) {
+ sizeEventsCount.incrementAndGet();
+ }
+ } catch (Exception ex) {
+ LOGGER.log(Level.SEVERE, "[-x-] SOURCE " + id + ": Error getting event data.", ex);
+ indexes.add(-999); // marker entry: it breaks the expected queue size, so the error fails the test below
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ try {
+ open(sources);
+ items.forEach((item) -> postItem(itemsTarget, item));
+
+ assertTrue("Waiting to receive all events has timed out.",
+ latch.await((1000 + MAX_LISTENERS * RECONNECT_DEFAULT) * getAsyncTimeoutMultiplier(),
+ TimeUnit.MILLISECONDS));
+
+ // need to force disconnect on server in order for EventSource.close(...) to succeed with HttpUrlConnection
+ sendCommand(itemsTarget, "disconnect");
+ } finally {
+ close(sources);
+ }
+
+ String postedItems = itemsTarget.request().get(String.class);
+ items.forEach((item) -> assertTrue("Item '" + item + "' not stored on server.", postedItems.contains(item)));
+
+ final AtomicInteger queueId = new AtomicInteger(0);
+ indexQueues.forEach((indexes) -> {
+ for (int i = 0; i < items.size(); i++) {
+ assertTrue("Event for '" + items.get(i) + "' not received in queue " + queueId.get(), indexes.contains(i));
+ }
+ assertEquals("Not received the expected number of events in queue " + queueId.get(), items.size(), indexes.size());
+ queueId.incrementAndGet();
+ });
+
+ assertEquals("Number of received 'size' events does not match.", items.size() * MAX_LISTENERS, sizeEventsCount.get());
+ }
+
+ /**
+ * Test the {@link SseEventSource} reconnect feature.
+ * Items are posted in two rounds around forced disconnects; every source must end up with all posted items.
+ * @throws Exception in case of a test failure.
+ */
+ @Test
+ public void testEventSourceReconnect() throws Exception {
+ final WebTarget itemsTarget = target("items");
+ final CountDownLatch latch = new CountDownLatch(MAX_ITEMS * MAX_LISTENERS * 2); // two rounds of item events per listener; "size" events are not counted
+ final List<Queue<String>> receivedQueues = new ArrayList<>(MAX_LISTENERS);
+ final SseEventSource[] sources = new SseEventSource[MAX_LISTENERS];
+
+ for (int i = 0; i < MAX_LISTENERS; i++) {
+ final int id = i;
+ final SseEventSource es = SseEventSource.target(itemsTarget.path("events"))
+ .reconnectingEvery(1, TimeUnit.MILLISECONDS).build();
+ sources[id] = es;
+
+ final Queue<String> received = new ConcurrentLinkedQueue<>();
+ receivedQueues.add(received);
+
+ es.register(inboundEvent -> {
+ try {
+ if (null == inboundEvent.getName()) {
+ final String data = inboundEvent.readData();
+ LOGGER.info("[-i-] SOURCE " + id + ": Received event id=" + inboundEvent.getId() + " data=" + data);
+ received.add(data);
+ latch.countDown();
+ }
+ } catch (Exception ex) {
+ LOGGER.log(Level.SEVERE, "[-x-] SOURCE " + id + ": Error getting event data.", ex);
+ received.add("[data processing error]");
+ }
+ });
+ }
+
+ final String[] postedItems = new String[MAX_ITEMS * 2];
+ try {
+ open(sources);
+
+ for (int i = 0; i < MAX_ITEMS; i++) {
+ final String item = String.format("round-1-%02d", i);
+ postItem(itemsTarget, item);
+ postedItems[i] = item;
+ sendCommand(itemsTarget, "disconnect");
+ Thread.sleep(200); // presumably gives the sources time to reconnect before the next item - TODO confirm
+ }
+
+ final int reconnectDelay = 1;
+ sendCommand(itemsTarget, "reconnect " + reconnectDelay);
+ sendCommand(itemsTarget, "disconnect");
+
+ Thread.sleep(reconnectDelay * 1000); // reconnectDelay is in seconds
+
+ for (int i = 0; i < MAX_ITEMS; i++) {
+ final String item = String.format("round-2-%02d", i);
+ postedItems[i + MAX_ITEMS] = item;
+ postItem(itemsTarget, item);
+ }
+
+ sendCommand(itemsTarget, "reconnect now");
+
+ assertTrue("Waiting to receive all events has timed out.",
+ latch.await((1 + MAX_LISTENERS * (MAX_ITEMS + 1) * reconnectDelay) * getAsyncTimeoutMultiplier(),
+ TimeUnit.SECONDS));
+
+ // need to force disconnect on server in order for EventSource.close(...) to succeed with HttpUrlConnection
+ sendCommand(itemsTarget, "disconnect");
+ } finally {
+ close(sources);
+ }
+
+ final String storedItems = itemsTarget.request().get(String.class);
+ for (String item : postedItems) {
+ assertThat("Posted item '" + item + "' stored on server", storedItems, containsString(item));
+ }
+
+ int sourceId = 0;
+ for (Queue<String> queue : receivedQueues) {
+ assertThat("Received events in source " + sourceId, queue,
+ describedAs("Collection containing %0", hasItems(postedItems), Arrays.asList(postedItems).toString()));
+ assertThat("Size of received queue for source " + sourceId, queue.size(), equalTo(postedItems.length));
+ sourceId++;
+ }
+ }
+
+    private static void postItem(final WebTarget itemsTarget, final String item) {
+        final Entity<Form> form = Entity.form(new Form("name", item)); // "name" is the form field read by addItem
+        assertEquals("Posting new item has failed.", 204, itemsTarget.request().post(form).getStatus());
+        LOGGER.info("[-i-] POSTed item: '" + item + "'");
+    }
+
+    private static void open(final SseEventSource[] sources) {
+        Arrays.asList(sources).forEach(SseEventSource::open); // opens the sources one by one, in array order
+    }
+
+    /** Close every source that is still open, failing the test if one does not close within a second. */
+    private static void close(final SseEventSource[] sources) {
+        // Index loop so that the logged number is the real position of the source in the array.
+        for (int i = 0; i < sources.length; i++) {
+            if (sources[i].isOpen()) {
+                assertTrue("Waiting to close a source has timed out.", sources[i].close(1, TimeUnit.SECONDS));
+                LOGGER.info("[<--] SOURCE " + i + " closed.");
+            }
+        }
+    }
+
+    private static void sendCommand(final WebTarget itemsTarget, final String command) {
+        final Response response = itemsTarget.path("commands").request().post(Entity.text(command));
+        assertEquals("'" + command + "' command has failed.", 200, response.getStatus());
+        // Reading the entity consumes and closes the response stream, releasing the pooled connection.
+        LOGGER.info("[-!-] COMMAND '" + command + "' has been processed: " + response.readEntity(String.class));
+    }
+}