001    /**
002     * Copyright 2007 Mike Kroutikov.
003     *
004     * This program is free software; you can redistribute it and/or modify
005     *   it under the terms of the Lesser GNU General Public License as 
006     *   published by the Free Software Foundation; either version 3 of
007     *   the License, or (at your option) any later version.
008     *
009     *   This program is distributed in the hope that it will be useful,
010     *   but WITHOUT ANY WARRANTY; without even the implied warranty of
011     *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012     *   Lesser GNU General Public License for more details.
013     *
014     *   You should have received a copy of the Lesser GNU General Public License
015     *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
016     */
017    
018    package org.otfeed;
019    
020    import org.otfeed.event.IConnectionStateListener;
021    import org.otfeed.event.OTHost;
022    import org.otfeed.protocol.connector.IStreamerFactory;
023    import org.otfeed.protocol.connector.LoginStreamerFactory;
024    import org.otfeed.protocol.connector.OTEngine;
025    import org.otfeed.support.ByteReverseBufferAllocator;
026    import org.otfeed.support.ConnectionStateListener;
027    import org.otfeed.support.IBufferAllocator;
028    
029    import java.util.Collection;
030    import java.util.Iterator;
031    import java.util.List;
032    import java.util.ListIterator;
033    import java.util.Set;
034    import java.util.concurrent.Executor;
035    
036    /**
037     * Factory of connections to the OpenTick service.
038     * This is the root object of the OpenTick client API. It is used to
039     * establish a connection to the OpenTick service. 
040     *
041     * <p>
042     * Before calling the {@link #connect} method, one have to set 
043     * {@link #setUsername username}/{@link #setPassword password}
044     * and at least one host in the {@link #setHosts(Set) hosts}.
045     *
046     * <h3>Sample usage</h3>
047     * Sample usage can be like this:
048     * <pre>
049     * OTConnectionFactory factory = new OTConnectionFactory();
050     *
051     * factory.setUsername("super-trooper");
052     * factory.setPassword("kick-me");
053     * factory.getHostList().add(new OTHost("feed1.opentick.com", 10015));
054     *
055     * IConnection connection = factory.connect(null);
056     *
057     * ListExchangesCommand command = new ListExchangesCommand(new IDataDelegate<OTExchange> {
058     *         public void onData(OTExchange exchange) {
059     *                 System.out.println(exchange);
060     *         }
061     * });
062     * 
063     * IRequest request = connection.prepareRequest(command);
064     * request.submit();
065     * request.waitForCompletion();
066     *
067     * connection.shutdown();
068     * connection.waitForCompletion();
069     * </pre>
070     *
071     * <h3>Avoid starting more than one simultaneous connection</h3>
072     * Note that one can establish as many connection as she wishes, by calling
073     * {@link #connect} method multiple times. However, <b>this is not 
074     * recommended</b>. The reasons are: the avoidance of synchronization issues, and
075     * conservation of resources.
076     * <p>
077     * Synchronization is a concern, because listener methods are called
078     * by a separate <i>event-dispatching</i> thread. Every connection starts its
079     * own event-displatching thread. 
080     * <p>
081     * Resources are a concern, because each connection takes a significant 
082     * amount of resources. For example, current implementation
083     * creates three threads and one client socket per connection.
084     * <p>
085     * Therefore, the recommentded practice is to limit number of connections
086     * (e.g. work with a single connection per whole application). Each connection
087     * can handle unlimited number of simultaneous requests very efficiently.
088     * Another benefit of having just a single connection per application is
089     * that one can avoid need for synchronization by assuming that all business
090     * logic is done by the event-dispatching thread. When needed, calls from
091     * outside can be translated to the event-dispatching thread by calling
092     * {@link IConnection#runInEventThread} method on the connection object.
093     * <p>
094     * {@link OTPooledConnectionFactory} class can be used to simulate "simultaneous"
095     * connections while keeping only a single actual connection to the Opentick server.
096     *
097     * <h3>No automatic reconnect after connection was lost</h3>
098     * This API does not provide the feature of automatic re-connect on connection
099     * loss. This is because the underlying protocol does not allow for reliable
100     * re-start of ongoing requests (there is no notion of request "progress", that
101     * can be used to re-submit the request). Original driver provided by
102     * OpenTick (see http://www.opentick.com) does attempt to do such a reconnect,
103     * with the risk of loosing some data, and getting duplicate historic data.
104     * <p/>
105     * We feel that since re-connect can easily lead to surprising and 
106     * often not obvious data problems, its much safer to terminate all pending
107     * requests, and let the application to decide what to do next (maybe 
108     * terminate, or reconnect).
109     * <p/>
110     * Following outlines the steps that application have to take in order
111     * to recover gracefully from lost connection connection. We assume that
112     * it was a historical data request.
113     * <ol>
114     *      <li>Remember the latest data timestamp.
115     *  <li>Discard all the data that has this timestamp (because
116     *      it may be incomplete).
117     *  <li>Re-connect to the server.
118     *  <li>Re-issue the request using the remembered timestamp as 
119     *  the starting date.
120     * </ol>
121     * Apparently, this is better be done by the application level, 
122     * not the driver.
123     */
124    public class OTConnectionFactory implements IConnectionFactory {
125            
126            private final OTConnectionSpec spec = new OTConnectionSpec();
127            
128            // default executor just creates threads as needed
129            private Executor executor = new Executor() {
130                    public void execute(Runnable runnable) {
131                            (new Thread(runnable)).start();
132                    }
133            };
134            
135            private final LoginStreamerFactory factory
136                                    = new LoginStreamerFactory(allocator);
137    
138            /**
139             * Creates new OTConnectionFactory.
140             */
141            public OTConnectionFactory() { }
142            
143            /**
144             * Creates new OTConnectionFactory and initializes 
145             * all its properties.
146             * 
147             * @param username username.
148             * @param password password.
149             * @param hostList list of hosts.
150             */
151            public OTConnectionFactory(String username, String password,
152                            Set<OTHost> hostList) {
153                    setUsername(username);
154                    setPassword(password);
155                    setHosts(hostList);
156            }
157    
158            /**
159             * Sets login name.
160             * Username must be set before calling {@link #connect}
161             *
162             * @param val login name.
163             */
164            public void setUsername(String val) {
165                    spec.setUsername(val);
166            }
167    
168            /**
169             * Gets login name.
170             *
171             * @return username login name.
172             */
173            public String getUsername() {
174                    return spec.getUsername();
175            }
176    
177            /**
178             * Sets login password.
179             * Password must be set before calling {@link #connect}
180             *
181             * @param val login password.
182             */
183            public void setPassword(String val) {
184                    spec.setPassword(val);
185            }
186    
187            /**
188             * Gets login password.
189             *
190             * @return login password.
191             */
192            public String getPassword() {
193                    return spec.getPassword();
194            }
195            
196            /**
197             * Please, use {@link #getHosts()} instead.
198             * 
199             * @return a wrapper around hosts set - for backward compatibility.
200             */
201            @Deprecated
202            public List<OTHost> getHostList() {
203                    return new List<OTHost>() {
204                            public boolean add(OTHost host) {
205                                    return getHosts().add(host);
206                            }
207    
208                            public void add(int arg0, OTHost arg1) {
209                                    throw new UnsupportedOperationException();
210                            }
211    
212                            public boolean addAll(Collection<? extends OTHost> arg0) {
213                                    return getHosts().addAll(arg0);
214                            }
215    
216                            public boolean addAll(int arg0, Collection<? extends OTHost> arg1) {
217                                    throw new UnsupportedOperationException();
218                            }
219    
220                            public void clear() {
221                                    getHosts().clear();
222                            }
223    
224                            public boolean contains(Object arg0) {
225                                    return getHosts().contains(arg0);
226                            }
227    
228                            public boolean containsAll(Collection<?> arg0) {
229                                    return getHosts().containsAll(arg0);
230                            }
231    
232                            public OTHost get(int arg0) {
233                                    throw new UnsupportedOperationException();
234                            }
235    
236                            public int indexOf(Object arg0) {
237                                    throw new UnsupportedOperationException();
238                            }
239    
240                            public boolean isEmpty() {
241                                    return getHosts().isEmpty();
242                            }
243    
244                            public Iterator<OTHost> iterator() {
245                                    return getHosts().iterator();
246                            }
247    
248                            public int lastIndexOf(Object arg0) {
249                                    throw new UnsupportedOperationException();
250                            }
251    
252                            public ListIterator<OTHost> listIterator() {
253                                    throw new UnsupportedOperationException();
254                            }
255    
256                            public ListIterator<OTHost> listIterator(int arg0) {
257                                    throw new UnsupportedOperationException();
258                            }
259    
260                            public boolean remove(Object arg0) {
261                                    return getHosts().remove(arg0);
262                            }
263    
264                            public OTHost remove(int arg0) {
265                                    throw new UnsupportedOperationException();
266                            }
267    
268                            public boolean removeAll(Collection<?> arg0) {
269                                    return getHosts().removeAll(arg0);
270                            }
271    
272                            public boolean retainAll(Collection<?> arg0) {
273                                    return getHosts().retainAll(arg0);
274                            }
275    
276                            public OTHost set(int arg0, OTHost arg1) {
277                                    throw new UnsupportedOperationException();
278                            }
279    
280                            public int size() {
281                                    return getHosts().size();
282                            }
283    
284                            public List<OTHost> subList(int arg0, int arg1) {
285                                    throw new UnsupportedOperationException();
286                            }
287    
288                            public Object[] toArray() {
289                                    return getHosts().toArray();
290                            }
291    
292                            public <T> T[] toArray(T[] arg0) {
293                                    return getHosts().toArray(arg0);
294                            }
295                            
296                    };
297            }
298            
299            /**
300             * Please use {@link #setHosts} instead.
301             * 
302             * @param val collection of addresses.
303             */
304            @Deprecated
305            public void setHostList(List<OTHost> val) {
306                    getHosts().clear();
307                    getHosts().addAll(val);
308            }
309    
310            /**
311             * Returns set of addresses.
312             * Allows client to control collection list by doing the following:
313             *
314             * <pre>
315             *    session.getHosts().add(new OTHost("feed1.opentick.com", 10015));
316             *    session.getHosts().add(new OTHost("feed2.opentick.com", 10015));
317             * </pre>
318             *
319             * Client must configure at least one server address before calling
320             * {@link #connect} method. Initially, this list is empty (but not null).
321             *
322             * @return modifiable set of server addresses.
323             */
324            public Set<OTHost> getHosts() {
325                    return spec.getHosts();
326            }
327    
328            /**
329             * Sets list of servers.
330             * Allows client to control collection of hosts by doing the following:
331             *
332             * <pre>
333             *    Set<OTHost> hosts = new HashSet<OTHost>();
334             *    hosts.add(new OTHost("feed1.opentick.com", 10015));
335             *    hosts.add(new OTHost("feed2.opentick.com", 10015));
336             *    session.setHosts(hosts);
337             * </pre>
338             *
339             * Client must configure at least one server address before calling
340             * {@link #connect} method.
341             *
342             * @param val list of server addresses.
343             */
344            public void setHosts(Set<OTHost> val) {
345                    spec.setHosts(val);
346            }
347    
348            /**
349             * Sets connection timeout (in milliseconds).
350             *
351             * Default value is 20 000, which is 20secs. This value can be
352             * useful only if there are many "bad" servers in the hosts list,
353             * so that it takes significant time to reach the one that answers
354             * the connection request. There is rarely any need to change this
355             * value.
356             *
357             * @param val connection timeout value.
358             */
359            public void setConnectTimeoutMillis(long val) {
360                    factory.setConnectTimeoutMillis(val);
361            }
362    
363            /**
364             * Gets connection timeout (in milliseconds).
365             *
366             * @return connection timeout value.
367             */
368            public long getConnectTimeoutMillis() {
369                    return factory.getConnectTimeoutMillis();
370            }
371            
372            /**
373             * Sets heartbeat interval value (in milliseconds).
374             * When connected, client periodically
375             * sends heartbeat messages to the server. If server does not receive
376             * any message from client for some time, it will close the connection.
377             *
378             * OpenTick recommends the heartbeat to be between 1sec and 10sec.
379             * Default value is 10 000, which is 10secs.
380             *
381             * @param val heartbeat interval value.
382             */
383            public void setHeartbeatIntervalMillis(long val) {
384                    spec.setHeartbeatInterval(val);
385            }
386    
387            /**
388             * Gets heartbeat interval value (in milliseconds).
389             *
390             * @return heartbeat interval value.
391             */
392            public long getHeartbeatIntervalMillis() {
393                    return spec.getHeartbeatInterval();
394            }
395    
396            private static final IBufferAllocator allocator = new ByteReverseBufferAllocator();
397    
398            /**
399             * Low-level IO object, responsible for connecting and 
400             * delivering/receiving raw frames. Normally, you will not
401             * need to change this.
402             * <p/>
403             * The default value is the "correct" real-life
404             * streamer implementation, see
405             * {@link LoginStreamerFactory#getStreamerFactory()}.
406             * <p/>
407             * You may want to change this only for the puproses of
408             * testing, debugging, or mocking.
409             * 
410             * @return streamer factory.
411             */
412            public IStreamerFactory getStreamerFactory() {
413                    return factory.getStreamerFactory();
414            }
415            
416            /**
417             * Sets streamer factory.
418             * 
419             * @param val streamer factory.
420             */
421            public void setStreamerFactory(IStreamerFactory val) {
422                    factory.setStreamerFactory(val);
423            }
424            
425            /**
426             * Executor service (factory of threads). Note that implementation must
427             * support at least three concurrent threads. For example, SingleThreadedExecutor
428             * will not work!
429             * 
430             * @return executor service.
431             */
432            public Executor getExecutor() {
433                    return executor;
434            }
435            
436            /**
437             * Sets executor service.
438             * 
439             * @param val executor service to use. Default is Executors.newCachedThreadPool().
440             */
441            public void setExecutor(Executor val) {
442                    executor = val;
443            }
444    
445            /**
446             * Starts asynchronous connection process. This method
447             * does not block, it returns {@link IConnection} object
448             * immediately.
449             * 
450             * Pass {@link OTConnectionSpec} instance as first parameter to
451             * specify connection destination, username/password, and other
452             * connection properties. This parameter is optional. If not used
453             * (meaning user passes <code>null</code>),
454             * values set on the factory instance are used instead.
455             * <p>
456             * Note that <code>username</code>, <code>password</code>, and
457             * <code>hosts</code> properties are required.  
458             *
459             * Use <code>list</code> parameter to monitor the connection progress.
460             * For more details, see {@link IConnectionStateListener}.
461             *
462             * If caller is not interested in monitoring connection progress,
463             * it can pass null as <code>list</code> paramater.
464             *
465             * @param spec connection specification. Optional.
466             * @param list connection state listener. Optional.
467             * @return connection object.
468             * @throws IllegalStateException if username/password pair is not set, 
469             *         or if hostList is empty.
470             */
471            public IConnection connect(OTConnectionSpec spec, IConnectionStateListener list) {
472    
473                    if(list == null) list = new ConnectionStateListener();
474    
475                    return new OTEngine(factory, 
476                                    spec == null ? this.spec : spec, 
477                                    allocator, executor, list);
478            }
479    
480            /**
481             * {@inheritDoc}
482             */
483            @Deprecated
484            public IConnection connect(IConnectionStateListener listener) {
485                    return connect(this.spec, listener);
486            }
487    }
488