001    package org.otfeed;
002    
003    import java.util.HashMap;
004    import java.util.LinkedList;
005    import java.util.List;
006    import java.util.Map;
007    import java.util.concurrent.atomic.AtomicBoolean;
008    
009    import org.otfeed.IConnection;
010    import org.otfeed.IConnectionFactory;
011    import org.otfeed.IRequest;
012    import org.otfeed.event.IConnectionStateListener;
013    import org.otfeed.event.OTError;
014    import org.otfeed.event.OTHost;
015    import org.otfeed.protocol.ICommand;
016    
017    /**
018     * Class that facilitates connection pooling (sharing).
019     * This class maintains one physical connection per specification
020     * and routes all {@link #connect(OTConnectionSpec, IConnectionStateListener)} 
021     * requests to the connection that matches the specification. 
022     * This is convenient as all clients
023     * can view their copy of connection as an independent one. 
024     * This is efficient, because only minimal number of system resources
025     * is consumed.
026     * It is recommended that this class should be used whenever there is
027     * a chance that factory's {@link #connect(OTConnectionSpec,IConnectionStateListener)} method can be called more
028     * than once. 
029     */
030    public class OTPooledConnectionFactory implements IConnectionFactory {
031            
032            private IConnectionFactory engine;
033            
034            /**
035             * The factory of shared connection (typically an instance of {@link org.otfeed.OTConnectionFactory}).
036             * @param engine
037             */
038            public void setConnectionFactory(IConnectionFactory engine) {
039                    this.engine = engine;
040            }
041            public IConnectionFactory getConnectionFactory() {
042                    return engine;
043            }
044            
045            private Map<OTConnectionSpec,PooledConnection> pooledConnections = new HashMap<OTConnectionSpec,PooledConnection>();
046            private boolean isGlobalShutdown = false; // global shutdown flag, object no longer usable if "on"
047            
048            private static final OTConnectionSpec NULL_SPEC = new OTConnectionSpec();
049            
050            public synchronized IConnection connect(IConnectionStateListener listener) {
051                    return connect(NULL_SPEC, listener);
052            }
053    
054            public synchronized IConnection connect(final OTConnectionSpec spec, IConnectionStateListener listener) {
055                    
056                    if(isGlobalShutdown) {
057                            throw new IllegalStateException("trying to connect after global shutdown");
058                    }
059                    
060                    if(spec == null) {
061                            throw new IllegalArgumentException("'spec' parameter can not be null");
062                    }
063                    
064                    PooledConnection pooledConnection = pooledConnections.get(spec);
065                    if(pooledConnection == null) {
066                            pooledConnection = new PooledConnection(
067                                            new OTConnectionSpec(spec) // clone content
068                                    );
069                    }
070                    
071                    pooledConnection.addConnectionStateListener(listener);
072                    pooledConnection.refCount++;
073                    final PooledConnection connection = pooledConnection;
074    
075                    // this wrapper ensures that shutdown is seen only once per connection
076                    // otherwise refcounting could be confused
077                    return new IConnection () {
078                            
079                            private final AtomicBoolean isShutdown = new AtomicBoolean();
080                            private final AtomicBoolean isLastToShutdown = new AtomicBoolean(false);
081    
082                            public IRequest prepareRequest(ICommand arg0) {
083                                    return connection.prepareRequest(arg0);
084                            }
085    
086                            public void runInEventThread(Runnable arg0) {
087                                    connection.runInEventThread(arg0);
088                            }
089    
090                            public void shutdown() {
091                                    if(isShutdown.getAndSet(true) == false) {
092                                            System.out.println("shutting down " + this);
093                                            synchronized(OTPooledConnectionFactory.this) {
094                                                    if(isGlobalShutdown) return;
095                                                    System.out.println("refcount=" + connection.refCount);
096                                                    if(--connection.refCount == 0) {
097                                                            connection.shutdown();
098                                                            isLastToShutdown.set(true);
099                                                            pooledConnections.remove(spec);
100                                                    }
101                                            }
102                                            
103                                            synchronized(isShutdown) {
104                                                    isShutdown.notifyAll();
105                                            }
106                                    }
107                            }
108    
109                            public void waitForCompletion() {
110                                    waitForCompletion(Integer.MAX_VALUE);
111                            }
112    
113                            public boolean waitForCompletion(long millis) {
114                                    long target = System.currentTimeMillis() + millis;
115    
116                                    while(isShutdown.get() == false) {
117                                            long toWait = target - System.currentTimeMillis();
118                                            
119                                            if(toWait <= 0) return false;
120                                            
121                                            synchronized(isShutdown) {
122                                                    try {
123                                                            isShutdown.wait(toWait);
124                                                    } catch(InterruptedException ex) {
125                                                            return false; // not sure
126                                                    }
127                                            }
128                                    }
129                                    
130                                    if(isLastToShutdown.get()) {
131                                            return connection.waitForCompletion(target - System.currentTimeMillis());
132                                    }
133                                    
134                                    return true;
135                            }
136    
137                    };
138            }
139            
140            /**
141             * Forces the shutdown of the pooled connection. Useful for hooking to the
142             * Spring's "deinit" bean lifecycle to make sure that even if users of this bean 
143             * did not shutdown their clones of the connection, this method will actually
144             * disconnect from Opentick and release all resources.
145             */
146            public synchronized void shutdownAll() {
147                    if(isGlobalShutdown) return; // ignore repeated shutdowns
148                    
149                    for(PooledConnection pc : pooledConnections.values()) {
150                            pc.shutdown();
151                    }
152                    
153                    pooledConnections.clear();
154            }
155            
156            private interface IStateTransitionEvent {
157                    public void applyEvent(IConnectionStateListener l);
158            }
159    
160            // since new connection state listener can be added at any time
161            // during the life cycle of pooled connection, this class will keep
162            // the history of all connection state events seen so far, and replay them
163            // for the incoming listener.
164            private class PooledConnection implements IConnection {
165                    
166                    int refCount = 0;
167                    
168                    private final OTConnectionSpec spec;
169                    private final IConnection connection;
170                    private final List<IConnectionStateListener> listeners = new LinkedList<IConnectionStateListener>();
171                    private final List<IStateTransitionEvent> events = new LinkedList<IStateTransitionEvent>();
172                    
173                    private final IConnectionStateListener stateListener = new IConnectionStateListener() {
174    
175                            public void onConnected() {
176                                    for(IConnectionStateListener l : listeners) {
177                                            l.onConnected();
178                                    }
179                            }
180    
181                            public void onConnecting(final OTHost arg0) {
182                                    for(IConnectionStateListener l : listeners) {
183                                            l.onConnecting(arg0);
184                                    }
185                            }
186    
187                            public void onError(OTError error) {
188                                    for(IConnectionStateListener l : listeners) {
189                                            l.onError(error);
190                                    }
191                            }
192    
193                            public void onLogin() {
194                                    for(IConnectionStateListener l : listeners) {
195                                            l.onLogin();
196                                    }
197                            }
198    
199                            public void onRedirect(OTHost arg0) {
200                                    for(IConnectionStateListener l : listeners) {
201                                            l.onRedirect(arg0);
202                                    }
203                            }
204                    };
205                    
206                    public PooledConnection(OTConnectionSpec s) {
207                            spec = s;
208                            listeners.add(new IConnectionStateListener() {
209    
210                                    public void onConnected() {
211                                            events.add(new IStateTransitionEvent() {
212                                                    public void applyEvent(IConnectionStateListener l) {
213                                                            l.onConnected();
214                                                    }
215                                            });
216                                    }
217    
218                                    public void onConnecting(final OTHost arg0) {
219                                            events.add(new IStateTransitionEvent() {
220                                                    public void applyEvent(IConnectionStateListener l) {
221                                                            l.onConnecting(arg0);
222                                                    }
223                                            });
224                                    }
225    
226                                    public void onError(final OTError arg0) {
227                                            events.add(new IStateTransitionEvent() {
228                                                    public void applyEvent(IConnectionStateListener l) {
229                                                            l.onError(arg0);
230                                                    }
231                                            });
232                                    }
233    
234                                    public void onLogin() {
235                                            events.add(new IStateTransitionEvent() {
236                                                    public void applyEvent(IConnectionStateListener l) {
237                                                            l.onLogin();
238                                                    }
239                                            });
240                                    }
241    
242                                    public void onRedirect(final OTHost arg0) {
243                                            events.add(new IStateTransitionEvent() {
244                                                    public void applyEvent(IConnectionStateListener l) {
245                                                            l.onRedirect(arg0);
246                                                    }
247                                            });
248                                    }
249                            });
250                            
251                            if(NULL_SPEC.equals(spec)) {
252                                    connection = engine.connect(null, stateListener);
253                            } else {
254                                    connection = engine.connect(spec, stateListener);
255                            }
256                    }
257                    
258                    public void addConnectionStateListener(final IConnectionStateListener l) {
259                            
260                            connection.runInEventThread(new Runnable() { public void run() {
261                                    // replay events that we missed
262                                    for(IStateTransitionEvent event : events) {
263                                            event.applyEvent(l);
264                                    }
265                                    // add me to the listeners
266                                    listeners.add(l);
267                            }});
268                    }
269    
270                    public IRequest prepareRequest(ICommand arg0) {
271                            return connection.prepareRequest(arg0);
272                    }
273    
274                    public void runInEventThread(Runnable arg0) {
275                            connection.runInEventThread(arg0);
276                    }
277    
278                    public void shutdown() {
279                            connection.shutdown();
280                    }
281    
282                    public void waitForCompletion() {
283                            connection.waitForCompletion();
284                    }
285    
286                    public boolean waitForCompletion(long arg0) {
287                            return connection.waitForCompletion(arg0);
288                    }
289            }
290    }
291