001 /**
002 * Copyright 2008 Mike Kroutikov.
003 *
004 * This program is free software; you can redistribute it and/or modify
005 * it under the terms of the Lesser GNU General Public License as
006 * published by the Free Software Foundation; either version 3 of
007 * the License, or (at your option) any later version.
008 *
009 * This program is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012 * Lesser GNU General Public License for more details.
013 *
014 * You should have received a copy of the Lesser GNU General Public License
015 * along with this program. If not, see <http://www.gnu.org/licenses/>.
016 */
017 package org.otfeed.support.alt;
018
019 import java.util.Collections;
020 import java.util.Date;
021 import java.util.EnumSet;
022 import java.util.Iterator;
023 import java.util.Queue;
024 import java.util.Set;
025 import java.util.concurrent.ConcurrentLinkedQueue;
026 import java.util.concurrent.atomic.AtomicBoolean;
027 import java.util.concurrent.atomic.AtomicReference;
028
029 import org.otfeed.IConnection;
030 import org.otfeed.IRequest;
031 import org.otfeed.OTException;
032 import org.otfeed.command.*;
033 import org.otfeed.command.ListSymbolsCommand.MatchStyleEnum;
034 import org.otfeed.event.*;
035 import org.otfeed.protocol.DataEnum;
036 import org.otfeed.protocol.ICommand;
037 import org.otfeed.support.alt.IOutboundConnection;
038
039 public class OutboundConnectionAdapter implements IOutboundConnection {
040
041 private static final IDataReader<Object> EMPTY_DATA_READER = new IDataReader<Object>() {
042 public void cancel() {}
043
044 @SuppressWarnings("unchecked")
045 public Iterator<Object> iterator() {
046 return Collections.EMPTY_LIST.iterator();
047 }
048
049 };
050
/** Connection on which every request is prepared; shut down by {@code close()}. */
private final IConnection connection;

/**
 * Creates an adapter on top of the given connection.
 *
 * @param c connection to wrap; it is stored as-is, without validation.
 */
public OutboundConnectionAdapter(IConnection c) {
    this.connection = c;
}
056
057 private static class SimpleDataReader<T> implements IDataReader<T> {
058
059 private final Object lock = new Object();
060 private final AtomicBoolean isCompleted = new AtomicBoolean();
061 private final AtomicReference<OTError> error = new AtomicReference<OTError>();
062 private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
063
064 final IDataDelegate<T> dataEndpoint = new IDataDelegate<T>() {
065 public void onData(T event) {
066 queue.offer(event);
067 synchronized(lock) {
068 lock.notify();
069 }
070 }
071 };
072
073 final ICompletionDelegate completionEndpoint = new ICompletionDelegate() {
074 public void onDataEnd(OTError e) {
075 error.set(e);
076 isCompleted.set(true);
077 synchronized(lock) {
078 lock.notifyAll();
079 }
080 }
081 };
082
083 private IRequest request;
084 void associateAndSubmitRequest(IRequest r) {
085 request = r;
086 request.submit();
087 }
088
089 public void cancel() {
090 try {
091 request.cancel();
092 } catch(Exception ex) {
093 // LOG.warn("request.cancel() throwed exception (ignoring)", ex);
094 }
095 }
096
097 private final AtomicReference<T> last = new AtomicReference<T>();
098 private final Iterator<T> iterator = new Iterator<T> () {
099 public boolean hasNext() throws OTException {
100
101 if(last.get() != null) return true;
102
103 while(true) {
104 T event;
105 if((event = queue.poll()) != null) {
106 last.set(event);
107 return true;
108 }
109
110 if(isCompleted.get()) {
111 if(error.get() != null) {
112 throw new OTException(error.get());
113 }
114 return false;
115 }
116
117 try {
118 synchronized(lock) {
119 lock.wait();
120 }
121 } catch(InterruptedException ex) {
122 throw new OTException(ex);
123 }
124 }
125 }
126
127 public T next() throws OTException {
128 if(hasNext()) {
129 return last.getAndSet(null);
130 } else {
131 return null;
132 }
133 }
134
135 public void remove() {
136 throw new UnsupportedOperationException();
137 }
138 };
139
140 public Iterator<T> iterator() {
141 return iterator;
142 }
143 }
144
145 private <T> IDataReader<T> prepareAndSubmitRequest(ICommand command, SimpleDataReader<T> reader) {
146 IRequest request = connection.prepareRequest(command);
147 reader.associateAndSubmitRequest(request);
148
149 return reader;
150 }
151
152 public IDataReader<OTDividend> requestDivident(String exchangeCode,
153 String symbolCode, Date startDate, Date endDate) {
154 SimpleDataReader<OTDividend> dataReader = new SimpleDataReader<OTDividend>();
155
156 DividendsCommand command = new DividendsCommand();
157 command.setExchangeCode(exchangeCode);
158 command.setSymbolCode(symbolCode);
159 command.setStartDate(startDate);
160 command.setEndDate(endDate);
161 command.setDataDelegate(dataReader.dataEndpoint);
162 command.setCompletionDelegate(dataReader.completionEndpoint);
163
164 return prepareAndSubmitRequest(command, dataReader);
165 }
166
167 public IDataReader<OTEquityInit> requestEquityInit(String exchangeCode,
168 String symbolCode) {
169 SimpleDataReader<OTEquityInit> dataReader = new SimpleDataReader<OTEquityInit>();
170
171 EquityInitCommand command = new EquityInitCommand();
172 command.setExchangeCode(exchangeCode);
173 command.setSymbolCode(symbolCode);
174 command.setDataDelegate(dataReader.dataEndpoint);
175 command.setCompletionDelegate(dataReader.completionEndpoint);
176
177 return prepareAndSubmitRequest(command, dataReader);
178 }
179
180 public IDataReader<Object> requestHistBook(String exchangeCode,
181 String symbolCode, Date startDate, Date endDate,
182 EnumSet<DataEnum> dataTypes) {
183
184 if(dataTypes.isEmpty()) {
185 return EMPTY_DATA_READER;
186 }
187
188 HistBooksCommand command = new HistBooksCommand();
189 command.setExchangeCode(exchangeCode);
190 command.setSymbolCode(symbolCode);
191 command.setStartDate(startDate);
192 command.setEndDate(endDate);
193
194 final SimpleDataReader<Object> dataReader = new SimpleDataReader<Object>();
195
196 for(DataEnum dataType : dataTypes) {
197 switch(dataType) {
198 case BOOK_CANCEL:
199 command.setCancelDelegate(new IDataDelegate<OTBookCancel>() {
200 public void onData(OTBookCancel event) {
201 dataReader.dataEndpoint.onData(event);
202 }
203 });
204 break;
205
206 case BOOK_CHANGE:
207 command.setChangeDelegate(new IDataDelegate<OTBookChange>() {
208 public void onData(OTBookChange event) {
209 dataReader.dataEndpoint.onData(event);
210 }
211 });
212 break;
213
214 case BOOK_DELETE:
215 command.setDeleteDelegate(new IDataDelegate<OTBookDelete>() {
216 public void onData(OTBookDelete event) {
217 dataReader.dataEndpoint.onData(event);
218 }
219 });
220 break;
221
222 case BOOK_EXECUTE:
223 command.setExecuteDelegate(new IDataDelegate<OTBookExecute>() {
224 public void onData(OTBookExecute event) {
225 dataReader.dataEndpoint.onData(event);
226 }
227 });
228 break;
229
230 case BOOK_ORDER:
231 command.setOrderDelegate(new IDataDelegate<OTBookOrder>() {
232 public void onData(OTBookOrder event) {
233 dataReader.dataEndpoint.onData(event);
234 }
235 });
236 break;
237
238 case BOOK_PRICE_LEVEL:
239 command.setPriceLevelDelegate(new IDataDelegate<OTBookPriceLevel>() {
240 public void onData(OTBookPriceLevel event) {
241 dataReader.dataEndpoint.onData(event);
242 }
243 });
244 break;
245
246 case BOOK_PURGE:
247 command.setPurgeDelegate(new IDataDelegate<OTBookPurge>() {
248 public void onData(OTBookPurge event) {
249 dataReader.dataEndpoint.onData(event);
250 }
251 });
252 break;
253
254 case BOOK_REPLACE:
255 command.setReplaceDelegate(new IDataDelegate<OTBookReplace>() {
256 public void onData(OTBookReplace event) {
257 dataReader.dataEndpoint.onData(event);
258 }
259 });
260 break;
261
262 default:
263 throw new IllegalArgumentException("Illegal type requested: " + dataType + " (only book event types allowed here");
264
265 }
266 }
267 command.setCompletionDelegate(dataReader.completionEndpoint);
268
269 return prepareAndSubmitRequest(command, dataReader);
270 }
271
272 public IDataReader<OTOHLC> requestHistData(String exchangeCode,
273 String symbolCode, Date startDate, Date endDate,
274 AggregationSpan aggregationSpan) {
275
276 SimpleDataReader<OTOHLC> dataReader = new SimpleDataReader<OTOHLC>();
277
278 HistDataCommand command = new HistDataCommand();
279 command.setExchangeCode(exchangeCode);
280 command.setSymbolCode(symbolCode);
281 command.setStartDate(startDate);
282 command.setEndDate(endDate);
283 command.setDataDelegate(dataReader.dataEndpoint);
284 command.setCompletionDelegate(dataReader.completionEndpoint);
285
286 return prepareAndSubmitRequest(command, dataReader);
287 }
288
289 public IDataReader<Object> requestHistTicks(String exchangeCode,
290 String symbolCode, Date startDate, Date endDate,
291 VolumeStyleEnum volumeStyle, EnumSet<DataEnum> dataTypes) {
292
293 if(dataTypes.isEmpty()) {
294 return EMPTY_DATA_READER;
295 }
296
297 HistTicksCommand command = new HistTicksCommand();
298 command.setExchangeCode(exchangeCode);
299 command.setSymbolCode(symbolCode);
300 command.setStartDate(startDate);
301 command.setEndDate(endDate);
302 command.setVolumeStyle(volumeStyle);
303
304 final SimpleDataReader<Object> dataReader = new SimpleDataReader<Object>();
305
306 for(DataEnum dataType : dataTypes) {
307 switch(dataType) {
308
309 case TRADE:
310 command.setTradeDelegate(new IDataDelegate<OTTrade>() {
311 public void onData(OTTrade event) {
312 dataReader.dataEndpoint.onData(event);
313 }
314 });
315 break;
316
317 case QUOTE:
318 command.setQuoteDelegate(new IDataDelegate<OTQuote>() {
319 public void onData(OTQuote event) {
320 dataReader.dataEndpoint.onData(event);
321 }
322 });
323 break;
324
325 case BBO:
326 command.setBboDelegate(new IDataDelegate<OTBBO>() {
327 public void onData(OTBBO event) {
328 dataReader.dataEndpoint.onData(event);
329 }
330 });
331 break;
332
333 case MMQUOTE:
334 command.setMmQuoteDelegate(new IDataDelegate<OTMMQuote>() {
335 public void onData(OTMMQuote event) {
336 dataReader.dataEndpoint.onData(event);
337 }
338 });
339 break;
340
341 default:
342 throw new IllegalArgumentException("Illegal type requested: " + dataType + " (only quote event types allowed here");
343
344 }
345 }
346 command.setCompletionDelegate(dataReader.completionEndpoint);
347
348 return prepareAndSubmitRequest(command, dataReader);
349 }
350
351 public IDataReader<OTExchange> requestListExchanges() {
352 SimpleDataReader<OTExchange> dataReader = new SimpleDataReader<OTExchange>();
353
354 ListExchangesCommand command = new ListExchangesCommand();
355 command.setDataDelegate(dataReader.dataEndpoint);
356 command.setCompletionDelegate(dataReader.completionEndpoint);
357
358 return prepareAndSubmitRequest(command, dataReader);
359 }
360
361 public IDataReader<OTSymbol> requestListSymbols(String exchangeCode,
362 String symbolCodePattern, Set<ListSymbolEnum> types,
363 MatchStyleEnum matchStyle) {
364 SimpleDataReader<OTSymbol> dataReader = new SimpleDataReader<OTSymbol>();
365
366 ListSymbolsCommand command = new ListSymbolsCommand();
367 command.setExchangeCode(exchangeCode);
368 command.setSymbolCodePattern(symbolCodePattern);
369 command.setTypes(types);
370 command.setMatchStyle(matchStyle);
371
372 command.setDataDelegate(dataReader.dataEndpoint);
373 command.setCompletionDelegate(dataReader.completionEndpoint);
374
375 return prepareAndSubmitRequest(command, dataReader);
376 }
377
378 public IDataReader<Object> requestOptionChainSnapshot(String exchangeCode,
379 String symbolCode, MonthAndYear expiration, PriceRange strikeRange,
380 VolumeStyleEnum volumeStyle, EnumSet<DataEnum> dataTypes) {
381
382 if(dataTypes.isEmpty()) {
383 return EMPTY_DATA_READER;
384 }
385
386 OptionChainSnapshotCommand command = new OptionChainSnapshotCommand();
387 command.setExchangeCode(exchangeCode);
388 command.setSymbolCode(symbolCode);
389 command.setExpiration(expiration);
390 command.setStrike(strikeRange);
391 command.setVolumeStyle(volumeStyle);
392
393 final SimpleDataReader<Object> dataReader = new SimpleDataReader<Object>();
394
395 for(DataEnum dataType : dataTypes) {
396 switch(dataType) {
397
398 case TRADE:
399 command.setTradeDelegate(new IDataDelegate<OTTrade>() {
400 public void onData(OTTrade event) {
401 dataReader.dataEndpoint.onData(event);
402 }
403 });
404 break;
405
406 case QUOTE:
407 command.setQuoteDelegate(new IDataDelegate<OTQuote>() {
408 public void onData(OTQuote event) {
409 dataReader.dataEndpoint.onData(event);
410 }
411 });
412 break;
413
414 case BBO:
415 command.setBboDelegate(new IDataDelegate<OTBBO>() {
416 public void onData(OTBBO event) {
417 dataReader.dataEndpoint.onData(event);
418 }
419 });
420 break;
421
422 case MMQUOTE:
423 command.setMmQuoteDelegate(new IDataDelegate<OTMMQuote>() {
424 public void onData(OTMMQuote event) {
425 dataReader.dataEndpoint.onData(event);
426 }
427 });
428 break;
429
430 default:
431 throw new IllegalArgumentException("Illegal type requested: " + dataType + " (only quote event types allowed here");
432
433 }
434 }
435 command.setCompletionDelegate(dataReader.completionEndpoint);
436
437 return prepareAndSubmitRequest(command, dataReader);
438 }
439
440 public IDataReader<OTOptionInit> requestOptionInit(String exchangeCode,
441 String symbolCode, MonthAndYear expiration, PriceRange strikeRange) {
442
443 OptionInitCommand command = new OptionInitCommand();
444 command.setExchangeCode(exchangeCode);
445 command.setSymbolCode(symbolCode);
446 command.setExpiration(expiration);
447 command.setStrike(strikeRange);
448
449 SimpleDataReader<OTOptionInit> dataReader = new SimpleDataReader<OTOptionInit>();
450 command.setDataDelegate(dataReader.dataEndpoint);
451 command.setCompletionDelegate(dataReader.completionEndpoint);
452
453 return prepareAndSubmitRequest(command, dataReader);
454 }
455
456 public IDataReader<Object> requestSnapshot(String exchangeCode,
457 String symbolCode, VolumeStyleEnum volumeStyle,
458 EnumSet<DataEnum> dataTypes) {
459
460 if(dataTypes.isEmpty()) {
461 return EMPTY_DATA_READER;
462 }
463
464 SnapshotCommand command = new SnapshotCommand();
465 command.setExchangeCode(exchangeCode);
466 command.setSymbolCode(symbolCode);
467 command.setVolumeStyle(volumeStyle);
468
469 final SimpleDataReader<Object> dataReader = new SimpleDataReader<Object>();
470
471 for(DataEnum dataType : dataTypes) {
472 switch(dataType) {
473
474 case TRADE:
475 command.setTradeDelegate(new IDataDelegate<OTTrade>() {
476 public void onData(OTTrade event) {
477 dataReader.dataEndpoint.onData(event);
478 }
479 });
480 break;
481
482 case QUOTE:
483 command.setQuoteDelegate(new IDataDelegate<OTQuote>() {
484 public void onData(OTQuote event) {
485 dataReader.dataEndpoint.onData(event);
486 }
487 });
488 break;
489
490 case BBO:
491 command.setBboDelegate(new IDataDelegate<OTBBO>() {
492 public void onData(OTBBO event) {
493 dataReader.dataEndpoint.onData(event);
494 }
495 });
496 break;
497
498 case MMQUOTE:
499 command.setMmQuoteDelegate(new IDataDelegate<OTMMQuote>() {
500 public void onData(OTMMQuote event) {
501 dataReader.dataEndpoint.onData(event);
502 }
503 });
504 break;
505
506 default:
507 throw new IllegalArgumentException("Illegal type requested: " + dataType + " (only quote event types allowed here");
508
509 }
510 }
511 command.setCompletionDelegate(dataReader.completionEndpoint);
512
513 return prepareAndSubmitRequest(command, dataReader);
514 }
515
516 public IDataReader<OTSplit> requestSplit(String exchangeCode,
517 String symbolCode, Date startDate, Date endDate) {
518
519 SplitsCommand command = new SplitsCommand();
520 command.setExchangeCode(exchangeCode);
521 command.setSymbolCode(symbolCode);
522 command.setStartDate(startDate);
523 command.setEndDate(endDate);
524
525 SimpleDataReader<OTSplit> dataReader = new SimpleDataReader<OTSplit>();
526 command.setDataDelegate(dataReader.dataEndpoint);
527 command.setCompletionDelegate(dataReader.completionEndpoint);
528
529 return prepareAndSubmitRequest(command, dataReader);
530 }
531
532 public IDataReader<OTTodaysOHL> requestTodaysOHL(String exchangeCode,
533 String symbolCode) {
534
535 TodaysOHLCommand command = new TodaysOHLCommand();
536 command.setExchangeCode(exchangeCode);
537 command.setSymbolCode(symbolCode);
538
539 SimpleDataReader<OTTodaysOHL> dataReader = new SimpleDataReader<OTTodaysOHL>();
540 command.setDataDelegate(dataReader.dataEndpoint);
541 command.setCompletionDelegate(dataReader.completionEndpoint);
542
543 return prepareAndSubmitRequest(command, dataReader);
544 }
545
546 public void close() {
547 connection.shutdown();
548 }
549 }