001 /**
002 * Copyright 2007 Mike Kroutikov.
003 *
004 * This program is free software; you can redistribute it and/or modify
005 * it under the terms of the Lesser GNU General Public License as
006 * published by the Free Software Foundation; either version 3 of
007 * the License, or (at your option) any later version.
008 *
009 * This program is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012 * Lesser GNU General Public License for more details.
013 *
014 * You should have received a copy of the Lesser GNU General Public License
015 * along with this program. If not, see <http://www.gnu.org/licenses/>.
016 */
017
018 package org.otfeed;
019
020 import org.otfeed.event.IConnectionStateListener;
021 import org.otfeed.event.OTHost;
022 import org.otfeed.protocol.connector.IStreamerFactory;
023 import org.otfeed.protocol.connector.LoginStreamerFactory;
024 import org.otfeed.protocol.connector.OTEngine;
025 import org.otfeed.support.ByteReverseBufferAllocator;
026 import org.otfeed.support.ConnectionStateListener;
027 import org.otfeed.support.IBufferAllocator;
028
029 import java.util.Collection;
030 import java.util.Iterator;
031 import java.util.List;
032 import java.util.ListIterator;
033 import java.util.Set;
034 import java.util.concurrent.Executor;
035
036 /**
037 * Factory of connections to the OpenTick service.
038 * This is the root object of the OpenTick client API. It is used to
039 * establish a connection to the OpenTick service.
040 *
041 * <p>
042 * Before calling the {@link #connect} method, one have to set
043 * {@link #setUsername username}/{@link #setPassword password}
044 * and at least one host in the {@link #setHosts(Set) hosts}.
045 *
046 * <h3>Sample usage</h3>
047 * Sample usage can be like this:
048 * <pre>
049 * OTConnectionFactory factory = new OTConnectionFactory();
050 *
051 * factory.setUsername("super-trooper");
052 * factory.setPassword("kick-me");
053 * factory.getHostList().add(new OTHost("feed1.opentick.com", 10015));
054 *
055 * IConnection connection = factory.connect(null);
056 *
057 * ListExchangesCommand command = new ListExchangesCommand(new IDataDelegate<OTExchange> {
058 * public void onData(OTExchange exchange) {
059 * System.out.println(exchange);
060 * }
061 * });
062 *
063 * IRequest request = connection.prepareRequest(command);
064 * request.submit();
065 * request.waitForCompletion();
066 *
067 * connection.shutdown();
068 * connection.waitForCompletion();
069 * </pre>
070 *
071 * <h3>Avoid starting more than one simultaneous connection</h3>
072 * Note that one can establish as many connection as she wishes, by calling
073 * {@link #connect} method multiple times. However, <b>this is not
074 * recommended</b>. The reasons are: the avoidance of synchronization issues, and
075 * conservation of resources.
076 * <p>
077 * Synchronization is a concern, because listener methods are called
078 * by a separate <i>event-dispatching</i> thread. Every connection starts its
079 * own event-displatching thread.
080 * <p>
081 * Resources are a concern, because each connection takes a significant
082 * amount of resources. For example, current implementation
083 * creates three threads and one client socket per connection.
084 * <p>
085 * Therefore, the recommentded practice is to limit number of connections
086 * (e.g. work with a single connection per whole application). Each connection
087 * can handle unlimited number of simultaneous requests very efficiently.
088 * Another benefit of having just a single connection per application is
089 * that one can avoid need for synchronization by assuming that all business
090 * logic is done by the event-dispatching thread. When needed, calls from
091 * outside can be translated to the event-dispatching thread by calling
092 * {@link IConnection#runInEventThread} method on the connection object.
093 * <p>
094 * {@link OTPooledConnectionFactory} class can be used to simulate "simultaneous"
095 * connections while keeping only a single actual connection to the Opentick server.
096 *
097 * <h3>No automatic reconnect after connection was lost</h3>
098 * This API does not provide the feature of automatic re-connect on connection
099 * loss. This is because the underlying protocol does not allow for reliable
100 * re-start of ongoing requests (there is no notion of request "progress", that
101 * can be used to re-submit the request). Original driver provided by
102 * OpenTick (see http://www.opentick.com) does attempt to do such a reconnect,
103 * with the risk of loosing some data, and getting duplicate historic data.
104 * <p/>
105 * We feel that since re-connect can easily lead to surprising and
106 * often not obvious data problems, its much safer to terminate all pending
107 * requests, and let the application to decide what to do next (maybe
108 * terminate, or reconnect).
109 * <p/>
110 * Following outlines the steps that application have to take in order
111 * to recover gracefully from lost connection connection. We assume that
112 * it was a historical data request.
113 * <ol>
114 * <li>Remember the latest data timestamp.
115 * <li>Discard all the data that has this timestamp (because
116 * it may be incomplete).
117 * <li>Re-connect to the server.
118 * <li>Re-issue the request using the remembered timestamp as
119 * the starting date.
120 * </ol>
121 * Apparently, this is better be done by the application level,
122 * not the driver.
123 */
124 public class OTConnectionFactory implements IConnectionFactory {
125
126 private final OTConnectionSpec spec = new OTConnectionSpec();
127
128 // default executor just creates threads as needed
129 private Executor executor = new Executor() {
130 public void execute(Runnable runnable) {
131 (new Thread(runnable)).start();
132 }
133 };
134
135 private final LoginStreamerFactory factory
136 = new LoginStreamerFactory(allocator);
137
138 /**
139 * Creates new OTConnectionFactory.
140 */
141 public OTConnectionFactory() { }
142
143 /**
144 * Creates new OTConnectionFactory and initializes
145 * all its properties.
146 *
147 * @param username username.
148 * @param password password.
149 * @param hostList list of hosts.
150 */
151 public OTConnectionFactory(String username, String password,
152 Set<OTHost> hostList) {
153 setUsername(username);
154 setPassword(password);
155 setHosts(hostList);
156 }
157
158 /**
159 * Sets login name.
160 * Username must be set before calling {@link #connect}
161 *
162 * @param val login name.
163 */
164 public void setUsername(String val) {
165 spec.setUsername(val);
166 }
167
168 /**
169 * Gets login name.
170 *
171 * @return username login name.
172 */
173 public String getUsername() {
174 return spec.getUsername();
175 }
176
177 /**
178 * Sets login password.
179 * Password must be set before calling {@link #connect}
180 *
181 * @param val login password.
182 */
183 public void setPassword(String val) {
184 spec.setPassword(val);
185 }
186
187 /**
188 * Gets login password.
189 *
190 * @return login password.
191 */
192 public String getPassword() {
193 return spec.getPassword();
194 }
195
196 /**
197 * Please, use {@link #getHosts()} instead.
198 *
199 * @return a wrapper around hosts set - for backward compatibility.
200 */
201 @Deprecated
202 public List<OTHost> getHostList() {
203 return new List<OTHost>() {
204 public boolean add(OTHost host) {
205 return getHosts().add(host);
206 }
207
208 public void add(int arg0, OTHost arg1) {
209 throw new UnsupportedOperationException();
210 }
211
212 public boolean addAll(Collection<? extends OTHost> arg0) {
213 return getHosts().addAll(arg0);
214 }
215
216 public boolean addAll(int arg0, Collection<? extends OTHost> arg1) {
217 throw new UnsupportedOperationException();
218 }
219
220 public void clear() {
221 getHosts().clear();
222 }
223
224 public boolean contains(Object arg0) {
225 return getHosts().contains(arg0);
226 }
227
228 public boolean containsAll(Collection<?> arg0) {
229 return getHosts().containsAll(arg0);
230 }
231
232 public OTHost get(int arg0) {
233 throw new UnsupportedOperationException();
234 }
235
236 public int indexOf(Object arg0) {
237 throw new UnsupportedOperationException();
238 }
239
240 public boolean isEmpty() {
241 return getHosts().isEmpty();
242 }
243
244 public Iterator<OTHost> iterator() {
245 return getHosts().iterator();
246 }
247
248 public int lastIndexOf(Object arg0) {
249 throw new UnsupportedOperationException();
250 }
251
252 public ListIterator<OTHost> listIterator() {
253 throw new UnsupportedOperationException();
254 }
255
256 public ListIterator<OTHost> listIterator(int arg0) {
257 throw new UnsupportedOperationException();
258 }
259
260 public boolean remove(Object arg0) {
261 return getHosts().remove(arg0);
262 }
263
264 public OTHost remove(int arg0) {
265 throw new UnsupportedOperationException();
266 }
267
268 public boolean removeAll(Collection<?> arg0) {
269 return getHosts().removeAll(arg0);
270 }
271
272 public boolean retainAll(Collection<?> arg0) {
273 return getHosts().retainAll(arg0);
274 }
275
276 public OTHost set(int arg0, OTHost arg1) {
277 throw new UnsupportedOperationException();
278 }
279
280 public int size() {
281 return getHosts().size();
282 }
283
284 public List<OTHost> subList(int arg0, int arg1) {
285 throw new UnsupportedOperationException();
286 }
287
288 public Object[] toArray() {
289 return getHosts().toArray();
290 }
291
292 public <T> T[] toArray(T[] arg0) {
293 return getHosts().toArray(arg0);
294 }
295
296 };
297 }
298
299 /**
300 * Please use {@link #setHosts} instead.
301 *
302 * @param val collection of addresses.
303 */
304 @Deprecated
305 public void setHostList(List<OTHost> val) {
306 getHosts().clear();
307 getHosts().addAll(val);
308 }
309
310 /**
311 * Returns set of addresses.
312 * Allows client to control collection list by doing the following:
313 *
314 * <pre>
315 * session.getHosts().add(new OTHost("feed1.opentick.com", 10015));
316 * session.getHosts().add(new OTHost("feed2.opentick.com", 10015));
317 * </pre>
318 *
319 * Client must configure at least one server address before calling
320 * {@link #connect} method. Initially, this list is empty (but not null).
321 *
322 * @return modifiable set of server addresses.
323 */
324 public Set<OTHost> getHosts() {
325 return spec.getHosts();
326 }
327
328 /**
329 * Sets list of servers.
330 * Allows client to control collection of hosts by doing the following:
331 *
332 * <pre>
333 * Set<OTHost> hosts = new HashSet<OTHost>();
334 * hosts.add(new OTHost("feed1.opentick.com", 10015));
335 * hosts.add(new OTHost("feed2.opentick.com", 10015));
336 * session.setHosts(hosts);
337 * </pre>
338 *
339 * Client must configure at least one server address before calling
340 * {@link #connect} method.
341 *
342 * @param val list of server addresses.
343 */
344 public void setHosts(Set<OTHost> val) {
345 spec.setHosts(val);
346 }
347
348 /**
349 * Sets connection timeout (in milliseconds).
350 *
351 * Default value is 20 000, which is 20secs. This value can be
352 * useful only if there are many "bad" servers in the hosts list,
353 * so that it takes significant time to reach the one that answers
354 * the connection request. There is rarely any need to change this
355 * value.
356 *
357 * @param val connection timeout value.
358 */
359 public void setConnectTimeoutMillis(long val) {
360 factory.setConnectTimeoutMillis(val);
361 }
362
363 /**
364 * Gets connection timeout (in milliseconds).
365 *
366 * @return connection timeout value.
367 */
368 public long getConnectTimeoutMillis() {
369 return factory.getConnectTimeoutMillis();
370 }
371
372 /**
373 * Sets heartbeat interval value (in milliseconds).
374 * When connected, client periodically
375 * sends heartbeat messages to the server. If server does not receive
376 * any message from client for some time, it will close the connection.
377 *
378 * OpenTick recommends the heartbeat to be between 1sec and 10sec.
379 * Default value is 10 000, which is 10secs.
380 *
381 * @param val heartbeat interval value.
382 */
383 public void setHeartbeatIntervalMillis(long val) {
384 spec.setHeartbeatInterval(val);
385 }
386
387 /**
388 * Gets heartbeat interval value (in milliseconds).
389 *
390 * @return heartbeat interval value.
391 */
392 public long getHeartbeatIntervalMillis() {
393 return spec.getHeartbeatInterval();
394 }
395
396 private static final IBufferAllocator allocator = new ByteReverseBufferAllocator();
397
398 /**
399 * Low-level IO object, responsible for connecting and
400 * delivering/receiving raw frames. Normally, you will not
401 * need to change this.
402 * <p/>
403 * The default value is the "correct" real-life
404 * streamer implementation, see
405 * {@link LoginStreamerFactory#getStreamerFactory()}.
406 * <p/>
407 * You may want to change this only for the puproses of
408 * testing, debugging, or mocking.
409 *
410 * @return streamer factory.
411 */
412 public IStreamerFactory getStreamerFactory() {
413 return factory.getStreamerFactory();
414 }
415
416 /**
417 * Sets streamer factory.
418 *
419 * @param val streamer factory.
420 */
421 public void setStreamerFactory(IStreamerFactory val) {
422 factory.setStreamerFactory(val);
423 }
424
425 /**
426 * Executor service (factory of threads). Note that implementation must
427 * support at least three concurrent threads. For example, SingleThreadedExecutor
428 * will not work!
429 *
430 * @return executor service.
431 */
432 public Executor getExecutor() {
433 return executor;
434 }
435
436 /**
437 * Sets executor service.
438 *
439 * @param val executor service to use. Default is Executors.newCachedThreadPool().
440 */
441 public void setExecutor(Executor val) {
442 executor = val;
443 }
444
445 /**
446 * Starts asynchronous connection process. This method
447 * does not block, it returns {@link IConnection} object
448 * immediately.
449 *
450 * Pass {@link OTConnectionSpec} instance as first parameter to
451 * specify connection destination, username/password, and other
452 * connection properties. This parameter is optional. If not used
453 * (meaning user passes <code>null</code>),
454 * values set on the factory instance are used instead.
455 * <p>
456 * Note that <code>username</code>, <code>password</code>, and
457 * <code>hosts</code> properties are required.
458 *
459 * Use <code>list</code> parameter to monitor the connection progress.
460 * For more details, see {@link IConnectionStateListener}.
461 *
462 * If caller is not interested in monitoring connection progress,
463 * it can pass null as <code>list</code> paramater.
464 *
465 * @param spec connection specification. Optional.
466 * @param list connection state listener. Optional.
467 * @return connection object.
468 * @throws IllegalStateException if username/password pair is not set,
469 * or if hostList is empty.
470 */
471 public IConnection connect(OTConnectionSpec spec, IConnectionStateListener list) {
472
473 if(list == null) list = new ConnectionStateListener();
474
475 return new OTEngine(factory,
476 spec == null ? this.spec : spec,
477 allocator, executor, list);
478 }
479
480 /**
481 * {@inheritDoc}
482 */
483 @Deprecated
484 public IConnection connect(IConnectionStateListener listener) {
485 return connect(this.spec, listener);
486 }
487 }
488