Initial Contribution

Signed-off-by: Jan Supol <jan.supol@oracle.com>
diff --git a/examples/sse-item-store-jaxrs-webapp/README.MD b/examples/sse-item-store-jaxrs-webapp/README.MD
new file mode 100644
index 0000000..2948b07
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/README.MD
@@ -0,0 +1,67 @@
+[//]: # " Copyright (c) 2015, 2018 Oracle and/or its affiliates. All rights reserved. "
+[//]: # " "
+[//]: # " This program and the accompanying materials are made available under the "
+[//]: # " terms of the Eclipse Distribution License v. 1.0, which is available at "
+[//]: # " http://www.eclipse.org/org/documents/edl-v10.php. "
+[//]: # " "
+[//]: # " SPDX-License-Identifier: BSD-3-Clause "
+
+JAX-RS SSE Item Store Jersey Example
+====================================
+
+An example demonstrating how the new Server-Sent Events (SSE) JAX-RS API
+can be used to notify clients about changes in server-side managed data
+(a collection of items). The example also outlines how SSE events can be
+consumed by JavaScript browser-based clients.
+
+Contents
+--------
+
+The example consists of a web-based client application and a server-side
+application deployed on a [Jetty servlet container](http://www.eclipse.org/jetty/documentation/current/).
+
+The server-side part of the application consists of a JAX-RS resource
+managing a collection of string items and a simple HTML page that includes
+a browser SSE client written in JavaScript as well as a basic CSS
+stylesheet. The SSE JavaScript client connects to the JAX-RS resource
+and transforms the streamed messages into HTML code that is rendered by
+the browser. The JavaScript client also demonstrates how named and
+unnamed SSE events are handled by the HTML5 `EventSource` component. The
+mapping of the URI path space of the server-side part of the application
+is presented in the following table:
+
+URI path                        |  Resource class          | HTTP methods
+------------------------------- |  ----------------------- | --------------
+**_/resources/items_**          |  JaxrsItemStoreResource  | GET, POST
+**_/resources/items/events_**   |  JaxrsItemStoreResource  | GET (SSE)
+
+The application is configured to run using the Jetty Maven plugin under
+the base path `sse-item-store-jaxrs-webapp`.
+
+Running the Example
+-------------------
+
+>     mvn clean compile jetty:run
+
+The command above deploys the current example. After successful
+deployment, you should be able to access the browser SSE client page at
+<http://localhost:8080/sse-item-store-jaxrs-webapp/index.html>. To see the raw
+SSE event stream, you may also point your browser directly at the
+[`JaxrsItemStoreResource`](http://localhost:8080/sse-item-store-jaxrs-webapp/resources/items/events).
+
+Deploying the Example to Another Servlet Container
+--------------------------------------------------
+
+>     mvn clean package
+
+The command above creates a Servlet 3.x compliant WAR located in the
+target directory. The WAR can then be deployed to your Servlet 3.x
+compliant container.
+
+Running the Test Client
+-----------------------
+
+After deploying the application into a Servlet 3.x compliant container,
+you can run the attached test by executing:
+
+>     mvn test -Djersey.config.test.container.factory=org.glassfish.jersey.test.external.ExternalTestContainerFactory -Djersey.config.test.container.port=<port>
diff --git a/examples/sse-item-store-jaxrs-webapp/pom.xml b/examples/sse-item-store-jaxrs-webapp/pom.xml
new file mode 100644
index 0000000..afba6c1
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+
+    This program and the accompanying materials are made available under the
+    terms of the Eclipse Distribution License v. 1.0, which is available at
+    http://www.eclipse.org/org/documents/edl-v10.php.
+
+    SPDX-License-Identifier: BSD-3-Clause
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.glassfish.jersey.examples</groupId>
+        <artifactId>webapp-example-parent</artifactId>
+        <relativePath>../webapp-example-parent/pom.xml</relativePath>
+        <version>2.28-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sse-item-store-jaxrs-webapp</artifactId>
+    <packaging>war</packaging>
+    <name>jersey-examples-sse-item-store-jaxrs-webapp</name>
+
+    <description>Jersey JAX-RS 2.1 SSE API-based item store example.</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.inject</groupId>
+            <artifactId>jersey-hk2</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.media</groupId>
+            <artifactId>jersey-media-sse</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.connectors</groupId>
+            <artifactId>jersey-apache-connector</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Run the application using "mvn jetty:run" -->
+            <plugin>
+                <!-- TODO unify Jetty in all examples -->
+                <groupId>org.eclipse.jetty</groupId>
+                <artifactId>jetty-maven-plugin</artifactId>
+                <configuration>
+                    <scanIntervalSeconds>10</scanIntervalSeconds>
+                    <stopPort>9999</stopPort>
+                    <stopKey>STOP</stopKey>
+                    <webApp>
+                        <contextPath>/${project.artifactId}</contextPath>
+                    </webApp>
+                    <systemProperties>
+                        <systemProperty>
+                            <name>jetty.port</name>
+                            <value>${jersey.config.test.container.port}</value>
+                        </systemProperty>
+                    </systemProperties>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>release</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>xml-maven-plugin</artifactId>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+    <properties>
+        <jersey.config.test.container.port>8080</jersey.config.test.container.port>
+    </properties>
+</project>
diff --git a/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreApp.java b/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreApp.java
new file mode 100644
index 0000000..ed76ecf
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreApp.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Distribution License v. 1.0, which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+package org.glassfish.jersey.examples.sseitemstore.jaxrs;
+
+import javax.ws.rs.ApplicationPath;
+
+import org.glassfish.jersey.server.ResourceConfig;
+
+/**
+ * SSE item store JAX-RS application, mapped under the {@code resources} application path.
+ *
+ * @author Adam Lindenthal (adam.lindenthal at oracle.com)
+ */
+@ApplicationPath("resources")
+public class JaxrsItemStoreApp extends ResourceConfig {
+    /**
+     * Create the application and pass {@link JaxrsItemStoreResource} to the {@link ResourceConfig} constructor.
+     */
+    public JaxrsItemStoreApp() {
+        super(JaxrsItemStoreResource.class);
+    }
+}
diff --git a/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResource.java b/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResource.java
new file mode 100644
index 0000000..a8c193d
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/main/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResource.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Distribution License v. 1.0, which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+package org.glassfish.jersey.examples.sseitemstore.jaxrs;
+
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Logger;
+
+import javax.ws.rs.BadRequestException;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.ServiceUnavailableException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.glassfish.jersey.media.sse.SseFeature;
+
+
+/**
+ * A resource for storing named items.
+ *
+ * @author Marek Potociar (marek.potociar at oracle.com)
+ */
+@Path("items")
+public class JaxrsItemStoreResource {
+    private static final Logger LOGGER = Logger.getLogger(JaxrsItemStoreResource.class.getName());
+
+    private static final ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock();
+    private static final LinkedList<String> itemStore = new LinkedList<>();
+
+    private static final AtomicReference<SseBroadcaster> BROADCASTER = new AtomicReference<>(null);
+
+    @Context
+    private Sse sse;
+
+    private static volatile long reconnectDelay = 0;
+
+    /**
+     * List all stored items.
+     *
+     * @return string form of the item list, read while the store read lock is held.
+     */
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    public String listItems() {
+        storeLock.readLock().lock(); // acquired before try so that finally never unlocks an unheld lock
+        try {
+            return itemStore.toString();
+        } finally {
+            storeLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Process a control command sent by the test client and adjust the internal resource state.
+     * <p>
+     * Following is the list of recognized commands:
+     * <ul>
+     * <li><b>disconnect</b> - close the current broadcaster, disconnecting all registered event streams.</li>
+     * <li><b>reconnect now</b> - reset the reconnect delay to zero, which enables client reconnecting.</li>
+     * <li><b>reconnect &lt;seconds&gt;</b> - store the given reconnect delay.
+     * While the stored delay is positive, reconnecting clients receive a HTTP 503 response with
+     * {@value javax.ws.rs.core.HttpHeaders#RETRY_AFTER} set to the stored
+     * delay value.</li>
+     * </ul>
+     *
+     * @param command command to be processed.
+     * @return message about processing result.
+     * @throws BadRequestException in case the command is not recognized or not specified.
+     */
+    @POST
+    @Path("commands")
+    public String processCommand(String command) {
+        final String reconnectPrefix = "reconnect ";
+        if (command == null || command.isEmpty()) {
+            throw new BadRequestException("No command specified.");
+        }
+        if ("disconnect".equals(command)) {
+            closeBroadcaster();
+            return "Disconnected.";
+        }
+        if (command.startsWith(reconnectPrefix) && command.length() > reconnectPrefix.length()) {
+            final String when = command.substring(reconnectPrefix.length());
+            try {
+                reconnectDelay = "now".equals(when) ? 0 : Long.parseLong(when);
+                return "Reconnect strategy updated: " + when;
+            } catch (NumberFormatException ignore) {
+                // not "now" and not a number - fall through and report the command as unrecognized
+            }
+        }
+        throw new BadRequestException("Command not recognized: '" + command + "'");
+    }
+
+    /**
+     * Connect or re-connect to the SSE event stream; a non-negative last event id is treated as a re-connect.
+     *
+     * @param lastEventId Value of custom SSE HTTP <tt>{@value SseFeature#LAST_EVENT_ID_HEADER}</tt> header.
+     *                    Defaults to {@code -1} if not set.
+     * @param eventSink   event sink of the connecting client; it is registered with the broadcaster.
+     * @throws InternalServerErrorException possibly in case replaying missed events fails (not thrown here - TODO confirm).
+     * @throws ServiceUnavailableException  in case of a re-connect while the reconnect delay is set to a positive value.
+     */
+    @GET
+    @Path("events")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void itemEvents(@HeaderParam(SseFeature.LAST_EVENT_ID_HEADER) @DefaultValue("-1") int lastEventId,
+                           @Context SseEventSink eventSink) {
+        if (lastEventId >= 0) {
+            LOGGER.info("Received last event id :" + lastEventId);
+
+            // read the volatile delay once so that the check, the log and the response use the same value
+            final long retryAfter = reconnectDelay;
+            if (retryAfter > 0) {
+                LOGGER.info("Non-zero reconnect delay [" + retryAfter + "] - responding with HTTP 503.");
+                throw new ServiceUnavailableException(retryAfter);
+            }
+            LOGGER.info("Zero reconnect delay - reconnecting.");
+            replayMissedEvents(lastEventId, eventSink);
+        }
+
+        // subscribe the sink to the events broadcast from now on
+        getBroadcaster().register(eventSink);
+    }
+
+    /** Send every stored item whose index is above {@code lastEventId} to the sink, under the store read lock. */
+    private void replayMissedEvents(int lastEventId, SseEventSink eventSink) {
+        storeLock.readLock().lock(); // acquired before try so that finally never unlocks an unheld lock
+        try {
+            final int firstUnreceived = lastEventId + 1;
+            if (itemStore.size() - firstUnreceived > 0) {
+                LOGGER.info("Replaying events - starting with id " + firstUnreceived);
+                final ListIterator<String> it = itemStore.listIterator(firstUnreceived);
+                while (it.hasNext()) {
+                    eventSink.send(createItemEvent(it.nextIndex(), it.next()));
+                }
+            } else {
+                LOGGER.info("No events to replay.");
+            }
+        } finally {
+            storeLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Add new item to the item store.
+     * <p>
+     * Invoking this method will fire 2 new SSE events - 1st about newly added item and 2nd about the new item store size.
+     *
+     * @param name item name; the request is ignored when the form parameter is missing.
+     */
+    @POST
+    public void addItem(@FormParam("name") String name) {
+        // Ignore if the request was sent without name parameter.
+        if (name == null) {
+            return;
+        }
+
+        // Acquire the lock before entering try: if lock() itself failed, finally must not unlock an unheld lock.
+        storeLock.writeLock().lock();
+        try {
+            final int eventId = itemStore.size();
+            itemStore.add(name);
+
+            SseBroadcaster sseBroadcaster = getBroadcaster();
+
+            // Broadcasting an un-named event with the name of the newly added item in data
+            sseBroadcaster.broadcast(createItemEvent(eventId, name));
+
+            // Broadcasting a named "size" event with the current size of the items collection in data
+            final OutboundSseEvent event = sse.newEventBuilder().name("size").data(Integer.class, eventId + 1).build();
+            sseBroadcaster.broadcast(event);
+        } finally {
+            storeLock.writeLock().unlock();
+        }
+    }
+
+    /** Log and build an un-named SSE event that carries the item name as data and {@code eventId} as its id. */
+    private OutboundSseEvent createItemEvent(final int eventId, final String name) {
+        LOGGER.info("Creating event id [" + eventId + "] name [" + name + "]");
+        return sse.newEventBuilder().id(String.valueOf(eventId)).data(String.class, name).build();
+    }
+
+    /**
+     * Get stored broadcaster or create a new one and store it.
+     *
+     * @return broadcaster instance.
+     */
+    private SseBroadcaster getBroadcaster() {
+        SseBroadcaster sseBroadcaster = BROADCASTER.get();
+        if (sseBroadcaster == null) {
+            BROADCASTER.compareAndSet(null, sse.newBroadcaster());
+        }
+
+        return BROADCASTER.get();
+    }
+
+    /**
+     * Close currently stored broadcaster.
+     */
+    private void closeBroadcaster() {
+        SseBroadcaster sseBroadcaster = BROADCASTER.getAndSet(null);
+        if (sseBroadcaster == null) {
+            return;
+        }
+        sseBroadcaster.close();
+    }
+}
diff --git a/examples/sse-item-store-jaxrs-webapp/src/main/webapp/css/main.css b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/css/main.css
new file mode 100644
index 0000000..5e31320
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/css/main.css
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Distribution License v. 1.0, which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+div.message { /* one message box, created by display() in js/engine.js */
+    border-radius: 10px;
+    border: thin solid #444444;
+    margin: 10px;
+    width: 300px;
+    padding: 10px;
+    box-shadow: 5px 5px 5px #888888;
+    font-family: Helvetica, serif;
+    color: #333333;
+}
+
+div.items { /* item list container (class="items" in index.html) */
+    border-radius: 10px;
+    border: thin dotted #AAAAFF;
+    margin: 10px;
+    padding: 10px;
+    font-family: Helvetica, serif;
+    color: #4444FF;
+}
+
+div.messages { /* message box container (class="messages" in index.html) */
+    border-radius: 10px;
+    border: thin dotted #888888;
+    margin: 10px;
+    padding: 10px;
+}
diff --git a/examples/sse-item-store-jaxrs-webapp/src/main/webapp/index.html b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/index.html
new file mode 100644
index 0000000..0c488a2
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/index.html
@@ -0,0 +1,27 @@
+<!--
+
+    Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+
+    This program and the accompanying materials are made available under the
+    terms of the Eclipse Distribution License v. 1.0, which is available at
+    http://www.eclipse.org/org/documents/edl-v10.php.
+
+    SPDX-License-Identifier: BSD-3-Clause
+
+-->
+
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<head>
+    <title>Jersey SSE Item Store Example</title>
+    <link rel="stylesheet" href="css/main.css"/>
+    <script src="js/engine.js" type="text/javascript"></script>
+</head>
+<body>
+<div>
+    Enter item: <input id="name" type="text"/><input value="Add" type="button" onclick="addItem()"/>
+</div>
+<div id="items" class="items"></div>
+<div id="messages" class="messages"></div>
+</body>
+</html>
diff --git a/examples/sse-item-store-jaxrs-webapp/src/main/webapp/js/engine.js b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/js/engine.js
new file mode 100644
index 0000000..3b1f42d
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/main/webapp/js/engine.js
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Distribution License v. 1.0, which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+"use strict";
+
+function addItem() {
+    var itemInput = document.getElementById("name");
+
+    var req = new XMLHttpRequest();
+    req.open("POST", "resources/items", true);
+    req.setRequestHeader("Content-Type", "application/x-www-form-urlencoded");
+    req.onreadystatechange = function () {
+        if (req.readyState == 4 && req.status == 204) {
+            //Call a function when the state changes.
+            itemInput.value = "";
+            getItems();
+        }
+    };
+    req.send("name=" + itemInput.value);
+}
+
+function getItems() {
+    var req = new XMLHttpRequest();
+    req.open("GET", "resources/items", true);
+    req.setRequestHeader("Accept", "text/plain");
+    req.onreadystatechange = function () {
+        //Call a function when the state changes.
+        if (req.readyState == 4 && req.status == 200) {
+            document.getElementById("items").innerHTML = req.responseText;
+        }
+    };
+    req.send();
+}
+
+function display(data, rgb) {
+    var msgSpan = document.createElement("span");
+    msgSpan.style.color = rgb;
+    msgSpan.innerHTML = data;
+    var msgDiv = document.createElement("div");
+    msgDiv.className = "message";
+    msgDiv.appendChild(msgSpan);
+
+    var messages = document.getElementById("messages");
+    messages.insertBefore(msgDiv, messages.firstChild);
+}
+
+function receiveMessages() {
+    if (typeof(EventSource) !== "undefined") {
+        // Yes! Server-sent events support!
+        var source = new EventSource("resources/items/events");
+        source.onmessage = function (event) {
+            console.log('Received unnamed event: ' + event.data);
+            display("Added new item: " + event.data, "#444444");
+        };
+
+        source.addEventListener("size", function (event) {
+            console.log('Received event ' + event.type + ': ' + event.data);
+            display("New items size: " + event.data, "#0000FF");
+        }, false);
+        source.onopen = function (event) {
+            console.log("event source opened");
+        };
+        source.onerror = function () {
+            // Error events carry no data; report the connection state of the event source instead.
+            console.log('Received error event, readyState=' + source.readyState);
+            display("Event source error (readyState=" + source.readyState + ")", "#FF0000");
+        };
+    } else {
+        // Sorry! No server-sent events support..
+        display('SSE not supported by browser.', "#FF0000");
+    }
+}
+
+window.onload = receiveMessages;
diff --git a/examples/sse-item-store-jaxrs-webapp/src/test/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResourceTest.java b/examples/sse-item-store-jaxrs-webapp/src/test/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResourceTest.java
new file mode 100644
index 0000000..0f352a6
--- /dev/null
+++ b/examples/sse-item-store-jaxrs-webapp/src/test/java/org/glassfish/jersey/examples/sseitemstore/jaxrs/JaxrsItemStoreResourceTest.java
@@ -0,0 +1,301 @@
+/*
+ * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Distribution License v. 1.0, which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+package org.glassfish.jersey.examples.sseitemstore.jaxrs;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Form;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.sse.SseEventSource;
+
+import org.glassfish.jersey.apache.connector.ApacheClientProperties;
+import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
+import org.glassfish.jersey.test.JerseyTest;
+import org.glassfish.jersey.test.external.ExternalTestContainerFactory;
+
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.describedAs;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Item store test.
+ *
+ * @author Marek Potociar (marek.potociar at oracle.com)
+ */
+public class JaxrsItemStoreResourceTest extends JerseyTest {
+
+    private static final int RECONNECT_DEFAULT = 500;
+    private static final Logger LOGGER = Logger.getLogger(JaxrsItemStoreResourceTest.class.getName());
+    private static final int MAX_LISTENERS = 5;
+    private static final int MAX_ITEMS = 10;
+
+    private final ExecutorService executorService =
+            Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("JaxrsItemStoreResourceTest-%d").build());
+    private final AtomicReference<Client> client = new AtomicReference<>(null);
+
+    @Override
+    protected Application configure() {
+        return new JaxrsItemStoreApp(); // the example application is the application under test
+    }
+
+    protected void configureClient(ClientConfig config) {
+        // using the Apache HTTP client as a test client connector to avoid issues with HttpUrlConnection socket management.
+        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+
+        // adjusting max. connections just to be safe - the testEventSourceReconnect is quite greedy...
+        cm.setMaxTotal(MAX_LISTENERS * MAX_ITEMS);
+        cm.setDefaultMaxPerRoute(MAX_LISTENERS * MAX_ITEMS);
+        // hand the pooled connection manager, a read timeout and the Apache connector over to the client config
+        config.property(ApacheClientProperties.CONNECTION_MANAGER, cm)
+                .property(ClientProperties.READ_TIMEOUT, 2000)
+                .connectorProvider(new ApacheConnectorProvider());
+    }
+
+    @Override
+    protected Client getClient() {
+        if (client.get() == null) {
+            ClientConfig clientConfig = new ClientConfig();
+            configureClient(clientConfig);
+            client.compareAndSet(null,
+                                 ClientBuilder.newBuilder()
+                                              .withConfig(clientConfig)
+                                              .executorService(executorService).build());
+        }
+
+        return client.get();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        client.get().close(); // NOTE(review): assumes getClient() was called at least once in the test - confirm
+        executorService.shutdown();
+    }
+
+    @Override
+    protected URI getBaseUri() {
+        final UriBuilder webappUri = UriBuilder.fromUri(super.getBaseUri()).path("sse-item-store-jaxrs-webapp");
+        final boolean external = getTestContainerFactory() instanceof ExternalTestContainerFactory;
+        return (external ? webappUri.path("resources") : webappUri).build(); // external container: add app path
+    }
+
+    /**
+     * Test the item addition, addition event broadcasting and item retrieval from {@link JaxrsItemStoreResource}.
+     *
+     * @throws Exception in case of a test failure.
+     */
+    @Test
+    public void testItemsStore() throws Exception {
+        final List<String> items = Collections.unmodifiableList(Arrays.asList("foo", "bar", "baz"));
+        final WebTarget itemsTarget = target("items");
+        final CountDownLatch latch = new CountDownLatch(items.size() * MAX_LISTENERS * 2); // item + size event each
+        final List<Queue<Integer>> indexQueues = new ArrayList<>(MAX_LISTENERS);
+        final SseEventSource[] sources = new SseEventSource[MAX_LISTENERS];
+        final AtomicInteger sizeEventsCount = new AtomicInteger(0);
+        // create one event source per listener; each records the indexes of the items it has received
+        for (int i = 0; i < MAX_LISTENERS; i++) {
+            final int id = i;
+            final SseEventSource es = SseEventSource.target(itemsTarget.path("events")).build();
+            sources[id] = es;
+
+            final Queue<Integer> indexes = new ConcurrentLinkedQueue<>();
+            indexQueues.add(indexes);
+
+            es.register(inboundEvent -> {
+                try {
+                    if (null == inboundEvent.getName()) {
+                        final String data = inboundEvent.readData();
+                        LOGGER.info("[-i-] SOURCE " + id + ": Received event id=" + inboundEvent.getId() + " data=" + data);
+                        indexes.add(items.indexOf(data));
+                    } else if ("size".equals(inboundEvent.getName())) {
+                        sizeEventsCount.incrementAndGet();
+                    }
+                } catch (Exception ex) {
+                    LOGGER.log(Level.SEVERE, "[-x-] SOURCE " + id + ": Error getting event data.", ex);
+                    indexes.add(-999); // sentinel that never matches an index of the posted items
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        try {
+            open(sources);
+            items.forEach((item) -> postItem(itemsTarget, item));
+
+            assertTrue("Waiting to receive all events has timed out.",
+                    latch.await((1000 + MAX_LISTENERS * RECONNECT_DEFAULT) * getAsyncTimeoutMultiplier(),
+                            TimeUnit.MILLISECONDS));
+
+            // need to force disconnect on server in order for EventSource.close(...) to succeed with HttpUrlConnection
+            sendCommand(itemsTarget, "disconnect");
+        } finally {
+            close(sources);
+        }
+        // verify the server-side state first, then what each listener has received
+        String postedItems = itemsTarget.request().get(String.class);
+        items.forEach((item) -> assertTrue("Item '" + item + "' not stored on server.", postedItems.contains(item)));
+
+        final AtomicInteger queueId = new AtomicInteger(0);
+        indexQueues.forEach((indexes) -> {
+            for (int i = 0; i < items.size(); i++) {
+                assertTrue("Event for '" + items.get(i) + "' not received in queue " + queueId.get(), indexes.contains(i));
+            }
+            assertEquals("Not received the expected number of events in queue " + queueId.get(), items.size(), indexes.size());
+            queueId.incrementAndGet();
+        });
+
+        assertEquals("Number of received 'size' events does not match.", items.size() * MAX_LISTENERS, sizeEventsCount.get());
+    }
+
+    /**
+     * Test that each {@link SseEventSource} receives every posted item exactly once despite forced disconnects.
+     *
+     * @throws Exception in case of a test failure.
+     */
+    @Test
+    public void testEventSourceReconnect() throws Exception {
+        final WebTarget itemsTarget = target("items");
+        final CountDownLatch latch = new CountDownLatch(MAX_ITEMS * MAX_LISTENERS * 2); // countdown only on new item events
+        final List<Queue<String>> receivedQueues = new ArrayList<>(MAX_LISTENERS); // data of unnamed events, per source
+        final SseEventSource[] sources = new SseEventSource[MAX_LISTENERS];
+
+        for (int i = 0; i < MAX_LISTENERS; i++) {
+            final int id = i;
+            final SseEventSource es = SseEventSource.target(itemsTarget.path("events"))
+                    .reconnectingEvery(1, TimeUnit.MILLISECONDS).build(); // client-side reconnect delay: 1 ms
+            sources[id] = es;
+
+            final Queue<String> received = new ConcurrentLinkedQueue<>();
+            receivedQueues.add(received);
+
+            es.register(inboundEvent -> {
+                try {
+                    if (null == inboundEvent.getName()) { // named events are neither recorded nor counted
+                        final String data = inboundEvent.readData();
+                        LOGGER.info("[-i-] SOURCE " + id + ": Received event id=" + inboundEvent.getId() + " data=" + data);
+                        received.add(data);
+                        latch.countDown();
+                    }
+                } catch (Exception ex) {
+                    LOGGER.log(Level.SEVERE, "[-x-] SOURCE " + id + ": Error getting event data.", ex);
+                    received.add("[data processing error]"); // note: the latch is not counted down on this path
+                }
+            });
+        }
+
+        final String[] postedItems = new String[MAX_ITEMS * 2]; // round 1 at [0, MAX_ITEMS), round 2 right after
+        try {
+            open(sources);
+
+            for (int i = 0; i < MAX_ITEMS; i++) { // round 1: every item is followed by a disconnect command
+                final String item = String.format("round-1-%02d", i);
+                postItem(itemsTarget, item);
+                postedItems[i] = item;
+                sendCommand(itemsTarget, "disconnect"); // NOTE(review): presumably drops the open event streams - confirm
+                Thread.sleep(200); // NOTE(review): presumably lets the sources reconnect before the next item - confirm
+            }
+
+            final int reconnectDelay = 1; // seconds, going by the sleep and the await time unit below
+            sendCommand(itemsTarget, "reconnect " + reconnectDelay); // NOTE(review): effect is defined by the commands resource
+            sendCommand(itemsTarget, "disconnect");
+
+            Thread.sleep(reconnectDelay * 1000);
+
+            for (int i = 0; i < MAX_ITEMS; i++) { // round 2: items are posted back to back
+                final String item = String.format("round-2-%02d", i);
+                postedItems[i + MAX_ITEMS] = item;
+                postItem(itemsTarget, item);
+            }
+
+            sendCommand(itemsTarget, "reconnect now"); // NOTE(review): effect is defined by the commands resource
+
+            assertTrue("Waiting to receive all events has timed out.",
+                    latch.await((1 + MAX_LISTENERS * (MAX_ITEMS + 1) * reconnectDelay) * getAsyncTimeoutMultiplier(),
+                            TimeUnit.SECONDS));
+
+            // need to force disconnect on server in order for EventSource.close(...) to succeed with HttpUrlConnection
+            sendCommand(itemsTarget, "disconnect");
+        } finally {
+            close(sources);
+        }
+
+        final String storedItems = itemsTarget.request().get(String.class); // item listing as returned by the server
+        for (String item : postedItems) {
+            assertThat("Posted item '" + item + "' stored on server", storedItems, containsString(item));
+        }
+
+        int sourceId = 0;
+        for (Queue<String> queue : receivedQueues) { // each source must hold all posted items and nothing else
+            assertThat("Received events in source " + sourceId, queue,
+                    describedAs("Collection containing %0", hasItems(postedItems), Arrays.asList(postedItems).toString()));
+            assertThat("Size of received queue for source " + sourceId, queue.size(), equalTo(postedItems.length));
+            sourceId++;
+        }
+    }
+
+    private static void postItem(final WebTarget itemsTarget, final String item) {
+        final int status = itemsTarget.request().post(Entity.form(new Form("name", item))).getStatus();
+        assertEquals("Posting new item has failed.", 204, status); // the item is sent as the "name" form parameter
+        LOGGER.info("[-i-] POSTed item: '" + item + "'");
+    }
+
+    private static void open(final SseEventSource[] sources) {
+        Arrays.asList(sources).forEach(SseEventSource::open); // open every source, one after another, in array order
+    }
+
+    /** Closes each still-open source, failing the test if a close does not complete within one second. */
+    private static void close(final SseEventSource[] sources) {
+        // Walk by array position: that is the id the listeners log, and it must not drift when a source is skipped.
+        for (int id = 0; id < sources.length; id++) {
+            if (sources[id].isOpen()) {
+                assertTrue("Waiting to close a source has timed out.", sources[id].close(1, TimeUnit.SECONDS));
+                LOGGER.info("[<--] SOURCE " + id + " closed.");
+            }
+        }
+    }
+
+    private static void sendCommand(final WebTarget itemsTarget, final String command) {
+        final int status = itemsTarget.path("commands").request().post(Entity.text(command)).getStatus();
+        assertEquals("'" + command + "' command has failed.", 200, status); // the command is POSTed as plain text
+        LOGGER.info("[-!-] COMMAND '" + command + "' has been processed.");
+    }
+}