001 package org.otfeed;
002
003 import java.util.HashMap;
004 import java.util.LinkedList;
005 import java.util.List;
006 import java.util.Map;
007 import java.util.concurrent.atomic.AtomicBoolean;
008
009 import org.otfeed.IConnection;
010 import org.otfeed.IConnectionFactory;
011 import org.otfeed.IRequest;
012 import org.otfeed.event.IConnectionStateListener;
013 import org.otfeed.event.OTError;
014 import org.otfeed.event.OTHost;
015 import org.otfeed.protocol.ICommand;
016
017 /**
018 * Class that facilitates connection pooling (sharing).
019 * This class maintains one physical connection per specification
020 * and routes all {@link #connect(OTConnectionSpec, IConnectionStateListener)}
021 * requests to the connection that matches the specification.
022 * This is convenient as all clients
023 * can view their copy of connection as an independent one.
024 * This is efficient, because only minimal number of system resources
025 * is consumed.
026 * It is recommended that this class should be used whenever there is
027 * a chance that factory's {@link #connect(OTConnectionSpec,IConnectionStateListener)} method can be called more
028 * than once.
029 */
030 public class OTPooledConnectionFactory implements IConnectionFactory {
031
032 private IConnectionFactory engine;
033
034 /**
035 * The factory of shared connection (typically an instance of {@link org.otfeed.OTConnectionFactory}).
036 * @param engine
037 */
038 public void setConnectionFactory(IConnectionFactory engine) {
039 this.engine = engine;
040 }
041 public IConnectionFactory getConnectionFactory() {
042 return engine;
043 }
044
045 private Map<OTConnectionSpec,PooledConnection> pooledConnections = new HashMap<OTConnectionSpec,PooledConnection>();
046 private boolean isGlobalShutdown = false; // global shutdown flag, object no longer usable if "on"
047
048 private static final OTConnectionSpec NULL_SPEC = new OTConnectionSpec();
049
050 public synchronized IConnection connect(IConnectionStateListener listener) {
051 return connect(NULL_SPEC, listener);
052 }
053
054 public synchronized IConnection connect(final OTConnectionSpec spec, IConnectionStateListener listener) {
055
056 if(isGlobalShutdown) {
057 throw new IllegalStateException("trying to connect after global shutdown");
058 }
059
060 if(spec == null) {
061 throw new IllegalArgumentException("'spec' parameter can not be null");
062 }
063
064 PooledConnection pooledConnection = pooledConnections.get(spec);
065 if(pooledConnection == null) {
066 pooledConnection = new PooledConnection(
067 new OTConnectionSpec(spec) // clone content
068 );
069 }
070
071 pooledConnection.addConnectionStateListener(listener);
072 pooledConnection.refCount++;
073 final PooledConnection connection = pooledConnection;
074
075 // this wrapper ensures that shutdown is seen only once per connection
076 // otherwise refcounting could be confused
077 return new IConnection () {
078
079 private final AtomicBoolean isShutdown = new AtomicBoolean();
080 private final AtomicBoolean isLastToShutdown = new AtomicBoolean(false);
081
082 public IRequest prepareRequest(ICommand arg0) {
083 return connection.prepareRequest(arg0);
084 }
085
086 public void runInEventThread(Runnable arg0) {
087 connection.runInEventThread(arg0);
088 }
089
090 public void shutdown() {
091 if(isShutdown.getAndSet(true) == false) {
092 System.out.println("shutting down " + this);
093 synchronized(OTPooledConnectionFactory.this) {
094 if(isGlobalShutdown) return;
095 System.out.println("refcount=" + connection.refCount);
096 if(--connection.refCount == 0) {
097 connection.shutdown();
098 isLastToShutdown.set(true);
099 pooledConnections.remove(spec);
100 }
101 }
102
103 synchronized(isShutdown) {
104 isShutdown.notifyAll();
105 }
106 }
107 }
108
109 public void waitForCompletion() {
110 waitForCompletion(Integer.MAX_VALUE);
111 }
112
113 public boolean waitForCompletion(long millis) {
114 long target = System.currentTimeMillis() + millis;
115
116 while(isShutdown.get() == false) {
117 long toWait = target - System.currentTimeMillis();
118
119 if(toWait <= 0) return false;
120
121 synchronized(isShutdown) {
122 try {
123 isShutdown.wait(toWait);
124 } catch(InterruptedException ex) {
125 return false; // not sure
126 }
127 }
128 }
129
130 if(isLastToShutdown.get()) {
131 return connection.waitForCompletion(target - System.currentTimeMillis());
132 }
133
134 return true;
135 }
136
137 };
138 }
139
140 /**
141 * Forces the shutdown of the pooled connection. Useful for hooking to the
142 * Spring's "deinit" bean lifecycle to make sure that even if users of this bean
143 * did not shutdown their clones of the connection, this method will actually
144 * disconnect from Opentick and release all resources.
145 */
146 public synchronized void shutdownAll() {
147 if(isGlobalShutdown) return; // ignore repeated shutdowns
148
149 for(PooledConnection pc : pooledConnections.values()) {
150 pc.shutdown();
151 }
152
153 pooledConnections.clear();
154 }
155
156 private interface IStateTransitionEvent {
157 public void applyEvent(IConnectionStateListener l);
158 }
159
160 // since new connection state listener can be added at any time
161 // during the life cycle of pooled connection, this class will keep
162 // the history of all connection state events seen so far, and replay them
163 // for the incoming listener.
164 private class PooledConnection implements IConnection {
165
166 int refCount = 0;
167
168 private final OTConnectionSpec spec;
169 private final IConnection connection;
170 private final List<IConnectionStateListener> listeners = new LinkedList<IConnectionStateListener>();
171 private final List<IStateTransitionEvent> events = new LinkedList<IStateTransitionEvent>();
172
173 private final IConnectionStateListener stateListener = new IConnectionStateListener() {
174
175 public void onConnected() {
176 for(IConnectionStateListener l : listeners) {
177 l.onConnected();
178 }
179 }
180
181 public void onConnecting(final OTHost arg0) {
182 for(IConnectionStateListener l : listeners) {
183 l.onConnecting(arg0);
184 }
185 }
186
187 public void onError(OTError error) {
188 for(IConnectionStateListener l : listeners) {
189 l.onError(error);
190 }
191 }
192
193 public void onLogin() {
194 for(IConnectionStateListener l : listeners) {
195 l.onLogin();
196 }
197 }
198
199 public void onRedirect(OTHost arg0) {
200 for(IConnectionStateListener l : listeners) {
201 l.onRedirect(arg0);
202 }
203 }
204 };
205
206 public PooledConnection(OTConnectionSpec s) {
207 spec = s;
208 listeners.add(new IConnectionStateListener() {
209
210 public void onConnected() {
211 events.add(new IStateTransitionEvent() {
212 public void applyEvent(IConnectionStateListener l) {
213 l.onConnected();
214 }
215 });
216 }
217
218 public void onConnecting(final OTHost arg0) {
219 events.add(new IStateTransitionEvent() {
220 public void applyEvent(IConnectionStateListener l) {
221 l.onConnecting(arg0);
222 }
223 });
224 }
225
226 public void onError(final OTError arg0) {
227 events.add(new IStateTransitionEvent() {
228 public void applyEvent(IConnectionStateListener l) {
229 l.onError(arg0);
230 }
231 });
232 }
233
234 public void onLogin() {
235 events.add(new IStateTransitionEvent() {
236 public void applyEvent(IConnectionStateListener l) {
237 l.onLogin();
238 }
239 });
240 }
241
242 public void onRedirect(final OTHost arg0) {
243 events.add(new IStateTransitionEvent() {
244 public void applyEvent(IConnectionStateListener l) {
245 l.onRedirect(arg0);
246 }
247 });
248 }
249 });
250
251 if(NULL_SPEC.equals(spec)) {
252 connection = engine.connect(null, stateListener);
253 } else {
254 connection = engine.connect(spec, stateListener);
255 }
256 }
257
258 public void addConnectionStateListener(final IConnectionStateListener l) {
259
260 connection.runInEventThread(new Runnable() { public void run() {
261 // replay events that we missed
262 for(IStateTransitionEvent event : events) {
263 event.applyEvent(l);
264 }
265 // add me to the listeners
266 listeners.add(l);
267 }});
268 }
269
270 public IRequest prepareRequest(ICommand arg0) {
271 return connection.prepareRequest(arg0);
272 }
273
274 public void runInEventThread(Runnable arg0) {
275 connection.runInEventThread(arg0);
276 }
277
278 public void shutdown() {
279 connection.shutdown();
280 }
281
282 public void waitForCompletion() {
283 connection.waitForCompletion();
284 }
285
286 public boolean waitForCompletion(long arg0) {
287 return connection.waitForCompletion(arg0);
288 }
289 }
290 }
291