001/* 002 * GWTEventService 003 * Copyright (c) 2011 and beyond, strawbill UG (haftungsbeschr?nkt) 004 * 005 * This is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Lesser General Public License as 007 * published by the Free Software Foundation; either version 3 of 008 * the License, or (at your option) any later version. 009 * Other licensing for GWTEventService may also be possible on request. 010 * Please view the license.txt of the project for more information. 011 * 012 * This software is distributed in the hope that it will be useful, 013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 015 * Lesser General Public License for more details. 016 * 017 * You should have received a copy of the GNU Lesser General Public 018 * License along with this software; if not, write to the Free 019 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 020 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 021 */ 022package de.novanic.eventservice.service.connection.strategy.connector.streaming; 023 024import com.google.gwt.user.client.rpc.SerializationException; 025import com.google.gwt.user.server.rpc.SerializationPolicy; 026import com.google.gwt.user.server.rpc.impl.ServerSerializationStreamWriter; 027import de.novanic.eventservice.client.event.DomainEvent; 028import de.novanic.eventservice.config.EventServiceConfiguration; 029import de.novanic.eventservice.logger.ServerLogger; 030import de.novanic.eventservice.logger.ServerLoggerFactory; 031import de.novanic.eventservice.service.EventServiceException; 032import de.novanic.eventservice.service.connection.strategy.connector.ConnectionStrategyServerConnectorAdapter; 033import de.novanic.eventservice.service.registry.user.UserInfo; 034 035import javax.servlet.http.HttpServletResponse; 036import java.io.IOException; 037import java.io.OutputStream; 038import java.util.ArrayList; 039import java.util.List; 040 041/** 042 * The {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} implements 043 * the streaming event listen method. Streaming means that the connection is hold open for a specified time and when an event 044 * occurs, the answer / event is streamed directly to the client without closing and re-open the connection. The connection is 045 * closed and re-opened (by the client) when the configured max. waiting time is reached. 046 * 047 * @author sstrohschein 048 * <br>Date: 15.03.2010 049 * <br>Time: 23:00:34 050 */ 051public class StreamingServerConnector extends ConnectionStrategyServerConnectorAdapter implements Cloneable 052{ 053 private static byte[] SCRIPT_TAG_PREFIX; 054 private static byte[] SCRIPT_TAG_SUFFIX; 055 private static byte[] CYCLE_TAG; 056 057 private static final ServerLogger LOG = ServerLoggerFactory.getServerLogger(StreamingServerConnector.class.getName()); 058 059 private HttpServletResponse myResponse; 060 private OutputStream myOutputStream; 061 private SerializationPolicy mySerializationPolicy; 062 063 /** 064 * Creates a new {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector}. 065 * The {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} implements 066 * the streaming event listen method. 067 * @param aConfiguration configuration 068 */ 069 public StreamingServerConnector(EventServiceConfiguration aConfiguration) throws EventServiceException { 070 this(aConfiguration, new EventSerializationPolicy()); 071 SCRIPT_TAG_PREFIX = encode("<script type='text/javascript'>window.parent.receiveEvent('"); 072 SCRIPT_TAG_SUFFIX = encode("');</script>"); 073 CYCLE_TAG = encode("cycle"); 074 } 075 076 /** 077 * Creates a new {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector}. 078 * The {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} implements 079 * the streaming event listen method. 080 * @param aConfiguration configuration 081 * @param aSerializationPolicy serialization policy to define the serialization of event (preparation for the transfer of events) 082 */ 083 protected StreamingServerConnector(EventServiceConfiguration aConfiguration, SerializationPolicy aSerializationPolicy) { 084 super(aConfiguration); 085 mySerializationPolicy = aSerializationPolicy; 086 } 087 088 /** 089 * Prepares the {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} with a response. 090 * The response is required to stream the events to the client. Therefore that method must be called before the listening for events starts. 091 * @param aResponse response 092 * @throws EventServiceException 093 */ 094 public void prepare(HttpServletResponse aResponse) throws EventServiceException { 095 myResponse = aResponse; 096 try { 097 myOutputStream = aResponse.getOutputStream(); 098 } catch(IOException e) { 099 throw new EventServiceException("Error on using output stream of the response!", e); 100 } 101 myResponse.setContentType("text/html;charset=" + getEncoding()); 102 myResponse.setHeader("expires", "0"); 103 myResponse.setHeader("cache-control", "no-cache"); 104 myResponse.setHeader("transfer-encoding", "chunked"); 105 } 106 107 /** 108 * Listens for occurring events (can be retrieved from the {@link de.novanic.eventservice.service.registry.user.UserInfo} with 109 * {@link de.novanic.eventservice.service.registry.user.UserInfo#retrieveEvents(int)}) and should prepare or transfer the retrieved events 110 * directly. The reason for the listen and transfer preparation within one single method is, that the {@link de.novanic.eventservice.service.connection.strategy.connector.ConnectionStrategyServerConnector} 111 * should have the control about listening and transfer of the occurred events. 112 * The streaming implementation needs a response to stream the events to the clients. That can be prepared with 113 * {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector#prepare(javax.servlet.http.HttpServletResponse)}. 114 * @param aUserInfo {@link de.novanic.eventservice.service.registry.user.UserInfo} which holds new occurred events 115 * @return occurred events 116 * @throws EventServiceException 117 */ 118 public List<DomainEvent> listen(UserInfo aUserInfo) throws EventServiceException { 119 List<DomainEvent> theEvents = new ArrayList<DomainEvent>(); 120 try { 121 //loops until the max. waiting time is exceed 122 do { 123 List<DomainEvent> theCurrentEvents = aUserInfo.retrieveEvents(getConfiguration().getMaxEvents()); 124 if(!theCurrentEvents.isEmpty()) { 125 aUserInfo.reportUserActivity(); 126 theEvents.addAll(theCurrentEvents); 127 for(DomainEvent theEvent: theCurrentEvents) { 128 //serialization and escaping 129 String theSerializedEvent = serialize(theEvent); 130 theSerializedEvent = escapeSerializedData(theSerializedEvent); 131 //writing to the stream 132 printStatement(encode(theSerializedEvent), myOutputStream); 133 } 134 aUserInfo.reportUserActivity(); 135 } 136 } while(!waitMaxWaitingTime(aUserInfo)); 137 //TODO think of a max. connection time, because max. waiting time describes the waiting time max. time between events and another time is required to define the max. connection time to avoid client side timeout detection 138 139 //writing cycle command to the stream 140 printStatement(CYCLE_TAG, myOutputStream); 141 } catch(FlushException e) { 142 LOG.debug(e.getMessage()); 143 } finally { 144 try { 145 close(myOutputStream); 146 } catch(CloseException e) { 147 LOG.debug(e.getMessage()); 148 } 149 } 150 return theEvents; 151 } 152 153 /** 154 * A {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} has to be 155 * cloneable, because it isn't stateless caused by the necessary for a client dependent response 156 * (see {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector#prepare(javax.servlet.http.HttpServletResponse)}). 157 * @return the cloned {@link de.novanic.eventservice.service.connection.strategy.connector.streaming.StreamingServerConnector} 158 * @throws CloneNotSupportedException 159 */ 160 public Object clone() throws CloneNotSupportedException { 161 return super.clone(); 162 } 163 164 /** 165 * Transforms an event to a String to make it streamable. 166 * @param anEvent event to serialize 167 * @return serialized event (the event as a String) 168 * @throws EventServiceException 169 */ 170 private String serialize(DomainEvent anEvent) throws EventServiceException { 171 try { 172 ServerSerializationStreamWriter theServerSerializationStreamWriter = new ServerSerializationStreamWriter(mySerializationPolicy); 173 theServerSerializationStreamWriter.setFlags(0); 174 theServerSerializationStreamWriter.prepareToWrite(); 175 176 theServerSerializationStreamWriter.serializeValue(anEvent, DomainEvent.class); 177 178 return theServerSerializationStreamWriter.toString(); 179 } catch(SerializationException e) { 180 throw new EventServiceException("Error on serializing the event \"" + anEvent 181 + "\" for domain \"" + anEvent.getDomain() + "\"!", e); 182 } 183 } 184 185 /** 186 * Escapes the serialized data. 187 * @param aSerializedData serialized data to escape 188 * @return escaped serialized data 189 */ 190 private String escapeSerializedData(String aSerializedData) { 191 String theEscapedData = aSerializedData; 192 theEscapedData = theEscapedData.replaceAll("\\\\", "\\\\\\\\"); 193 theEscapedData = theEscapedData.replaceAll("\\\'", "\\\\\'"); 194 return theEscapedData; 195 } 196 197 /** 198 * Prints a statement to a stream. 199 * @param aStatement statement to print 200 * @param anOutputStream stream 201 * @throws EventServiceException 202 */ 203 private void printStatement(byte[] aStatement, OutputStream anOutputStream) throws EventServiceException, FlushException { 204 try { 205 anOutputStream.write(SCRIPT_TAG_PREFIX); 206 anOutputStream.write(aStatement); 207 anOutputStream.write(SCRIPT_TAG_SUFFIX); 208 } catch(IOException e) { 209 throw new EventServiceException("Error on printing statement \"" + new String(aStatement) + "\"!", e); 210 } finally { 211 flush(aStatement, anOutputStream); 212 } 213 } 214 215 private void flush(byte[] aStatement, OutputStream anOutputStream) throws FlushException { 216 try { 217 anOutputStream.flush(); 218 myResponse.flushBuffer(); 219 } catch(IOException e) { 220 throw new FlushException(aStatement, e); 221 } 222 } 223 224 private void close(OutputStream anOutputStream) throws CloseException { 225 try { 226 anOutputStream.close(); 227 } catch(IOException e) { 228 throw new CloseException(e); 229 } 230 } 231 232 private static class CloseException extends Exception 233 { 234 private CloseException(Throwable aThrowable) { 235 super("Error on closing output stream!", aThrowable); 236 } 237 } 238 239 private static class FlushException extends Exception 240 { 241 private FlushException(byte[] aFlushingStatement, Throwable aThrowable) { 242 super(createMessage(aFlushingStatement), aThrowable); 243 } 244 245 private static String createMessage(byte[] aFlushingStatement) { 246 return "Flushing wasn't successful (\"" + new String(aFlushingStatement) + "\")!"; 247 } 248 } 249}