001/*
002 * GWTEventService
003 * Copyright (c) 2011 and beyond, strawbill UG (haftungsbeschränkt)
004 *
005 * This is free software; you can redistribute it and/or modify it
006 * under the terms of the GNU Lesser General Public License as
007 * published by the Free Software Foundation; either version 3 of
008 * the License, or (at your option) any later version.
009 * Other licensing for GWTEventService may also be possible on request.
010 * Please view the license.txt of the project for more information.
011 *
012 * This software is distributed in the hope that it will be useful,
013 * but WITHOUT ANY WARRANTY; without even the implied warranty of
014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015 * Lesser General Public License for more details.
016 *
017 * You should have received a copy of the GNU Lesser General Public
018 * License along with this software; if not, write to the Free
019 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021 */
022package de.novanic.eventservice.service.connection.strategy.connector.streaming;
023
024import com.google.gwt.user.client.rpc.SerializationException;
025import com.google.gwt.user.server.rpc.SerializationPolicy;
026import com.google.gwt.user.server.rpc.impl.ServerSerializationStreamWriter;
027import de.novanic.eventservice.client.event.DomainEvent;
028import de.novanic.eventservice.config.EventServiceConfiguration;
029import de.novanic.eventservice.logger.ServerLogger;
030import de.novanic.eventservice.logger.ServerLoggerFactory;
031import de.novanic.eventservice.service.EventServiceException;
032import de.novanic.eventservice.service.connection.strategy.connector.ConnectionStrategyServerConnectorAdapter;
033import de.novanic.eventservice.service.registry.user.UserInfo;
034
035import javax.servlet.http.HttpServletResponse;
036import java.io.IOException;
037import java.io.OutputStream;
038import java.util.ArrayList;
039import java.util.List;
040
041/**
042 * The {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} implements
043 * the streaming event listen method. Streaming means that the connection is hold open for a specified time and when an event
044 * occurs, the answer / event is streamed directly to the client without closing and re-open the connection. The connection is
045 * closed and re-opened (by the client) when the configured max. waiting time is reached.
046 *
047 * @author sstrohschein
048 *         <br>Date: 15.03.2010
049 *         <br>Time: 23:00:34
050 */
051public class StreamingServerConnector extends ConnectionStrategyServerConnectorAdapter implements Cloneable
052{
053    private static byte[] SCRIPT_TAG_PREFIX;
054    private static byte[] SCRIPT_TAG_SUFFIX;
055    private static byte[] CYCLE_TAG;
056
057    private static final ServerLogger LOG = ServerLoggerFactory.getServerLogger(StreamingServerConnector.class.getName());
058
059    private HttpServletResponse myResponse;
060    private OutputStream myOutputStream;
061    private SerializationPolicy mySerializationPolicy;
062
063    /**
064     * Creates a new {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector}.
065     * The {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} implements
066     * the streaming event listen method.
067     * @param aConfiguration configuration
068     */
069    public StreamingServerConnector(EventServiceConfiguration aConfiguration) throws EventServiceException {
070        this(aConfiguration, new EventSerializationPolicy());
071        SCRIPT_TAG_PREFIX = encode("<script type='text/javascript'>window.parent.receiveEvent('");
072        SCRIPT_TAG_SUFFIX = encode("');</script>");
073        CYCLE_TAG = encode("cycle");
074    }
075
076    /**
077     * Creates a new {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector}.
078     * The {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} implements
079     * the streaming event listen method.
080     * @param aConfiguration configuration
081     * @param aSerializationPolicy serialization policy to define the serialization of event (preparation for the transfer of events)
082     */
083    protected StreamingServerConnector(EventServiceConfiguration aConfiguration, SerializationPolicy aSerializationPolicy) {
084        super(aConfiguration);
085        mySerializationPolicy = aSerializationPolicy;
086    }
087
088    /**
089     * Prepares the {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} with a response.
090     * The response is required to stream the events to the client. Therefore that method must be called before the listening for events starts.
091     * @param aResponse response
092     * @throws EventServiceException
093     */
094    public void prepare(HttpServletResponse aResponse) throws EventServiceException {
095        myResponse = aResponse;
096        try {
097            myOutputStream = aResponse.getOutputStream();
098        } catch(IOException e) {
099            throw new EventServiceException("Error on using output stream of the response!", e);
100        }
101        myResponse.setContentType("text/html;charset=" + getEncoding());
102        myResponse.setHeader("expires", "0");
103        myResponse.setHeader("cache-control", "no-cache");
104        myResponse.setHeader("transfer-encoding", "chunked");
105    }
106
107    /**
108     * Listens for occurring events (can be retrieved from the {@link de.novanic.eventservice.service.registry.user.UserInfo} with
109     * {@link de.novanic.eventservice.service.registry.user.UserInfo#retrieveEvents(int)}) and should prepare or transfer the retrieved events
110     * directly. The reason for the listen and transfer preparation within one single method is, that the {@link de.novanic.eventservice.service.connection.strategy.connector.ConnectionStrategyServerConnector}
111     * should have the control about listening and transfer of the occurred events.
112     * The streaming implementation needs a response to stream the events to the clients. That can be prepared with
113     * {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector#prepare(javax.servlet.http.HttpServletResponse)}.
114     * @param aUserInfo {@link de.novanic.eventservice.service.registry.user.UserInfo} which holds new occurred events
115     * @return occurred events
116     * @throws EventServiceException
117     */
118    public List<DomainEvent> listen(UserInfo aUserInfo) throws EventServiceException {
119        List<DomainEvent> theEvents = new ArrayList<DomainEvent>();
120        try {
121            //loops until the max. waiting time is exceed
122            do {
123                List<DomainEvent> theCurrentEvents = aUserInfo.retrieveEvents(getConfiguration().getMaxEvents());
124                if(!theCurrentEvents.isEmpty()) {
125                    aUserInfo.reportUserActivity();
126                    theEvents.addAll(theCurrentEvents);
127                    for(DomainEvent theEvent: theCurrentEvents) {
128                        //serialization and escaping
129                        String theSerializedEvent = serialize(theEvent);
130                        theSerializedEvent = escapeSerializedData(theSerializedEvent);
131                        //writing to the stream
132                        printStatement(encode(theSerializedEvent), myOutputStream);
133                    }
134                    aUserInfo.reportUserActivity();
135                }
136            } while(!waitMaxWaitingTime(aUserInfo));
137            //TODO think of a max. connection time, because max. waiting time describes the waiting time max. time between events and another time is required to define the max. connection time to avoid client side timeout detection
138
139            //writing cycle command to the stream
140            printStatement(CYCLE_TAG, myOutputStream);
141        } catch(FlushException e) {
142            LOG.debug(e.getMessage());
143        } finally {
144            try {
145                close(myOutputStream);
146            } catch(CloseException e) {
147                LOG.debug(e.getMessage());
148            }
149        }
150        return theEvents;
151    }
152
153    /**
154     * A {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} has to be
155     * cloneable, because it isn't stateless caused by the necessary for a client dependent response
156     * (see {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector#prepare(javax.servlet.http.HttpServletResponse)}).
157     * @return the cloned {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector}
158     * @throws CloneNotSupportedException
159     */
160    public Object clone() throws CloneNotSupportedException {
161        return super.clone();
162    }
163
164    /**
165     * Transforms an event to a String to make it streamable.
166     * @param anEvent event to serialize
167     * @return serialized event (the event as a String)
168     * @throws EventServiceException
169     */
170    private String serialize(DomainEvent anEvent) throws EventServiceException {
171        try {
172            ServerSerializationStreamWriter theServerSerializationStreamWriter = new ServerSerializationStreamWriter(mySerializationPolicy);
173                theServerSerializationStreamWriter.setFlags(0);
174                theServerSerializationStreamWriter.prepareToWrite();
175
176                        theServerSerializationStreamWriter.serializeValue(anEvent, DomainEvent.class);
177
178                        return theServerSerializationStreamWriter.toString();
179        } catch(SerializationException e) {
180                        throw new EventServiceException("Error on serializing the event \"" + anEvent
181                    + "\" for domain \"" + anEvent.getDomain() + "\"!", e);
182                }
183    }
184
185    /**
186     * Escapes the serialized data.
187     * @param aSerializedData serialized data to escape
188     * @return escaped serialized data
189     */
190    private String escapeSerializedData(String aSerializedData) {
191        String theEscapedData = aSerializedData;
192        theEscapedData = theEscapedData.replaceAll("\\\\", "\\\\\\\\");
193        theEscapedData = theEscapedData.replaceAll("\\\'", "\\\\\'");
194        return theEscapedData;
195    }
196
197    /**
198     * Prints a statement to a stream.
199     * @param aStatement statement to print
200     * @param anOutputStream stream
201     * @throws EventServiceException
202     */
203    private void printStatement(byte[] aStatement, OutputStream anOutputStream) throws EventServiceException, FlushException {
204        try {
205            anOutputStream.write(SCRIPT_TAG_PREFIX);
206            anOutputStream.write(aStatement);
207            anOutputStream.write(SCRIPT_TAG_SUFFIX);
208        } catch(IOException e) {
209            throw new EventServiceException("Error on printing statement \"" + new String(aStatement) + "\"!", e);
210        } finally {
211            flush(aStatement, anOutputStream);
212        }
213    }
214
215    private void flush(byte[] aStatement, OutputStream anOutputStream) throws FlushException {
216        try {
217            anOutputStream.flush();
218            myResponse.flushBuffer();
219        } catch(IOException e) {
220            throw new FlushException(aStatement, e);
221        }
222    }
223
224    private void close(OutputStream anOutputStream) throws CloseException {
225        try {
226            anOutputStream.close();
227        } catch(IOException e) {
228            throw new CloseException(e);
229        }
230    }
231
232    private static class CloseException extends Exception
233    {
234        private CloseException(Throwable aThrowable) {
235            super("Error on closing output stream!", aThrowable);
236        }
237    }
238
239    private static class FlushException extends Exception
240    {
241        private FlushException(byte[] aFlushingStatement, Throwable aThrowable) {
242            super(createMessage(aFlushingStatement), aThrowable);
243        }
244
245        private static String createMessage(byte[] aFlushingStatement) {
246            return "Flushing wasn't successful (\"" + new String(aFlushingStatement) + "\")!";
247        }
248    }
249}