001    /**
002     * Copyright 2008 Mike Kroutikov.
003     *
004     * This program is free software; you can redistribute it and/or modify
005     *   it under the terms of the Lesser GNU General Public License as 
006     *   published by the Free Software Foundation; either version 3 of
007     *   the License, or (at your option) any later version.
008     *
009     *   This program is distributed in the hope that it will be useful,
010     *   but WITHOUT ANY WARRANTY; without even the implied warranty of
011     *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012     *   Lesser GNU General Public License for more details.
013     *
014     *   You should have received a copy of the Lesser GNU General Public License
015     *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
016     */
017    package org.otfeed.support.alt;
018    
019    import java.util.Collections;
020    import java.util.Date;
021    import java.util.EnumSet;
022    import java.util.Iterator;
023    import java.util.Queue;
024    import java.util.Set;
025    import java.util.concurrent.ConcurrentLinkedQueue;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicReference;
028    
029    import org.otfeed.IConnection;
030    import org.otfeed.IRequest;
031    import org.otfeed.OTException;
032    import org.otfeed.command.*;
033    import org.otfeed.command.ListSymbolsCommand.MatchStyleEnum;
034    import org.otfeed.event.*;
035    import org.otfeed.protocol.DataEnum;
036    import org.otfeed.protocol.ICommand;
037    import org.otfeed.support.alt.IOutboundConnection;
038    
039    public class OutboundConnectionAdapter implements IOutboundConnection {
040    
041            private static final IDataReader<Object> EMPTY_DATA_READER = new IDataReader<Object>() {
042                    public void cancel() {}
043                    
044                    @SuppressWarnings("unchecked")
045                    public Iterator<Object> iterator() {
046                            return Collections.EMPTY_LIST.iterator();
047                    }
048                    
049            };
050    
051            private final IConnection connection;
052            
053            public OutboundConnectionAdapter(IConnection c) {
054                    connection = c;
055            }
056    
057            private static class SimpleDataReader<T> implements IDataReader<T> {
058    
059                    private final Object lock = new Object();
060                    private final AtomicBoolean isCompleted = new AtomicBoolean();
061                    private final AtomicReference<OTError> error = new AtomicReference<OTError>();
062                    private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
063    
064                    final IDataDelegate<T> dataEndpoint = new IDataDelegate<T>() {
065                            public void onData(T event) {
066                                    queue.offer(event);
067                                    synchronized(lock) {
068                                            lock.notify();
069                                    }
070                            }
071                    };
072                    
073                    final ICompletionDelegate completionEndpoint = new ICompletionDelegate() {
074                            public void onDataEnd(OTError e) {
075                                    error.set(e);
076                                    isCompleted.set(true);
077                                    synchronized(lock) {
078                                            lock.notifyAll();
079                                    }
080                            }
081                    };
082                    
083                    private IRequest request;
084                    void associateAndSubmitRequest(IRequest r) {
085                            request = r;
086                            request.submit();
087                    }
088                    
089                    public void cancel() {
090                            try {
091                                    request.cancel();
092                            } catch(Exception ex) {
093                                    // LOG.warn("request.cancel() throwed exception (ignoring)", ex);
094                            }
095                    }
096    
097                    private final AtomicReference<T> last = new AtomicReference<T>();
098                    private final Iterator<T> iterator = new Iterator<T> () {
099                            public boolean hasNext() throws OTException {
100                                    
101                                    if(last.get() != null) return true;
102            
103                                    while(true) {
104                                            T event;
105                                            if((event = queue.poll()) != null) {
106                                                    last.set(event);
107                                                    return true;
108                                            }
109                                            
110                                            if(isCompleted.get()) {
111                                                    if(error.get() != null) {
112                                                            throw new OTException(error.get());
113                                                    }
114                                                    return false;
115                                            }
116                                            
117                                            try {
118                                                    synchronized(lock) {
119                                                            lock.wait();
120                                                    }
121                                            } catch(InterruptedException ex) {
122                                                    throw new OTException(ex);
123                                            }
124                                    }
125                            }
126                            
127                            public T next() throws OTException {
128                                    if(hasNext()) {
129                                            return last.getAndSet(null);
130                                    } else {
131                                            return null;
132                                    }
133                            }
134                            
135                            public void remove() {
136                                    throw new UnsupportedOperationException();
137                            }
138                    };
139                    
140                    public Iterator<T> iterator() { 
141                            return iterator;
142                    }
143            }
144            
145            private <T> IDataReader<T> prepareAndSubmitRequest(ICommand command, SimpleDataReader<T> reader) {
146                    IRequest request = connection.prepareRequest(command);
147                    reader.associateAndSubmitRequest(request);
148                                    
149                    return reader;
150            }
151    
152            public IDataReader<OTDividend> requestDivident(String exchangeCode,
153                            String symbolCode, Date startDate, Date endDate) {
154                    SimpleDataReader<OTDividend> dataReader = new SimpleDataReader<OTDividend>();
155    
156                    DividendsCommand command = new DividendsCommand();
157                    command.setExchangeCode(exchangeCode);
158                    command.setSymbolCode(symbolCode);
159                    command.setStartDate(startDate);
160                    command.setEndDate(endDate);
161                    command.setDataDelegate(dataReader.dataEndpoint);
162                    command.setCompletionDelegate(dataReader.completionEndpoint);
163    
164                    return prepareAndSubmitRequest(command, dataReader);
165            }
166                            
167            public IDataReader<OTEquityInit> requestEquityInit(String exchangeCode,
168                            String symbolCode) {
169                    SimpleDataReader<OTEquityInit> dataReader = new SimpleDataReader<OTEquityInit>();
170    
171                    EquityInitCommand command = new EquityInitCommand();
172                    command.setExchangeCode(exchangeCode);
173                    command.setSymbolCode(symbolCode);
174                    command.setDataDelegate(dataReader.dataEndpoint);
175                    command.setCompletionDelegate(dataReader.completionEndpoint);
176    
177                    return prepareAndSubmitRequest(command, dataReader);
178            }
179    
180            public IDataReader<Object> requestHistBook(String exchangeCode,
181                            String symbolCode, Date startDate, Date endDate,
182                            EnumSet<DataEnum> dataTypes) {
183    
184                    if(dataTypes.isEmpty()) {
185                            return EMPTY_DATA_READER;
186                    }
187                                    
188                    HistBooksCommand command = new HistBooksCommand();
189                    command.setExchangeCode(exchangeCode);
190                    command.setSymbolCode(symbolCode);
191                    command.setStartDate(startDate);
192                    command.setEndDate(endDate);
193                                    
194                    final SimpleDataReader<Object> dataReader = new SimpleDataReader<Object>();
195    
196                    for(DataEnum dataType : dataTypes) {
197                            switch(dataType) {
198                            case BOOK_CANCEL:
199                                    command.setCancelDelegate(new IDataDelegate<OTBookCancel>() {
200                                            public void onData(OTBookCancel event) {
201                                                    dataReader.dataEndpoint.onData(event);
202                                            }
203                                    });
204                                    break;
205                                                    
206                            case BOOK_CHANGE:
207                                    command.setChangeDelegate(new IDataDelegate<OTBookChange>() {
208                                            public void onData(OTBookChange event) {
209                                                    dataReader.dataEndpoint.onData(event);
210                                            }
211                                    });
212                                    break;
213                                                    
214                            case BOOK_DELETE:
215                                    command.setDeleteDelegate(new IDataDelegate<OTBookDelete>() {
216                                            public void onData(OTBookDelete event) {
217                                                    dataReader.dataEndpoint.onData(event);
218                                            }
219                                    });
220                                    break;
221                                            
222                            case BOOK_EXECUTE:
223                                    command.setExecuteDelegate(new IDataDelegate<OTBookExecute>() {
224                                            public void onData(OTBookExecute event) {
225                                                    dataReader.dataEndpoint.onData(event);
226                                            }
227                                    });
228                                    break;
229                                            
230                            case BOOK_ORDER:
231                                    command.setOrderDelegate(new IDataDelegate<OTBookOrder>() {
232                                            public void onData(OTBookOrder event) {
233                                                    dataReader.dataEndpoint.onData(event);
234                                            }
235                                    });
236                                    break;
237                                            
238                            case BOOK_PRICE_LEVEL:
239                                    command.setPriceLevelDelegate(new IDataDelegate<OTBookPriceLevel>() {
240                                            public void onData(OTBookPriceLevel event) {
241                                                    dataReader.dataEndpoint.onData(event);
242                                            }
243                                    });
244                                    break;
245                                            
246                            case BOOK_PURGE:
247                                    command.setPurgeDelegate(new IDataDelegate<OTBookPurge>() {
248                                            public void onData(OTBookPurge event) {
249                                                    dataReader.dataEndpoint.onData(event);
250                                            }
251                                    });
252                                    break;
253    
254                            case BOOK_REPLACE:
255                                    command.setReplaceDelegate(new IDataDelegate<OTBookReplace>() {
256                                            public void onData(OTBookReplace event) {
257                                                    dataReader.dataEndpoint.onData(event);
258                                            }
259                                    });
260                                    break;
261                                            
262                            default:
263                                    throw new IllegalArgumentException("Illegal type requested: " + dataType + " (only book event types allowed here");
264                                                            
265                            }
266                    }
267                    command.setCompletionDelegate(dataReader.completionEndpoint);
268    
269                    return prepareAndSubmitRequest(command, dataReader);
270            }
271    
272            public IDataReader<OTOHLC> requestHistData(String exchangeCode,
273                            String symbolCode, Date startDate, Date endDate,
274                            AggregationSpan aggregationSpan) {
275    
276                    SimpleDataReader<OTOHLC> dataReader = new SimpleDataReader<OTOHLC>();
277    
278                    HistDataCommand command = new HistDataCommand();
279                    command.setExchangeCode(exchangeCode);
280                    command.setSymbolCode(symbolCode);
281                    command.setStartDate(startDate);
282                    command.setEndDate(endDate);
283                    command.setDataDelegate(dataReader.dataEndpoint);
284                    command.setCompletionDelegate(dataReader.completionEndpoint);
285    
286                    return prepareAndSubmitRequest(command, dataReader);
287            }
288    
289            public IDataReader<Object> requestHistTicks(String exchangeCode,
290                            String symbolCode, Date startDate, Date endDate,
291                            VolumeStyleEnum volumeStyle, EnumSet<DataEnum> dataTypes) {
292                    
293                    if(dataTypes.isEmpty()) {
294                            return EMPTY_DATA_READER;
295                    }
296                                    
297                    HistTicksCommand command = new HistTicksCommand();
298                    command.setExchangeCode(exchangeCode);
299                    command.setSymbolCode(symbolCode);
300                    command.setStartDate(startDate);
301                    command.setEndDate(endDate);
302                    command.setVolumeStyle(volumeStyle);
303                    
304                    final SimpleDataReader<Object> dataReader = new SimpleDataReader<Object>();
305                    
306                    for(DataEnum dataType : dataTypes) {
307                            switch(dataType) {
308    
309                            case TRADE:
310                                    command.setTradeDelegate(new IDataDelegate<OTTrade>() {
311                                            public void onData(OTTrade event) {
312                                                    dataReader.dataEndpoint.onData(event);
313                                            }
314                                    });
315                                    break;
316                                    
317                            case QUOTE:
318                                    command.setQuoteDelegate(new IDataDelegate<OTQuote>() {
319                                            public void onData(OTQuote event) {
320                                                    dataReader.dataEndpoint.onData(event);
321                                            }
322                                    });
323                                    break;
324                                    
325                            case BBO:
326                                    command.setBboDelegate(new IDataDelegate<OTBBO>() {
327                                            public void onData(OTBBO event) {
328                                                    dataReader.dataEndpoint.onData(event);
329                                            }
330                                    });
331                                    break;
332                                    
333                            case MMQUOTE:
334                                    command.setMmQuoteDelegate(new IDataDelegate<OTMMQuote>() {
335                                            public void onData(OTMMQuote event) {
336                                                    dataReader.dataEndpoint.onData(event);
337                                            }
338                                    });
339                                    break;
340                                    
341                            default:
342                                    throw new IllegalArgumentException("Illegal type requested: " + dataType + " (only quote event types allowed here");
343                                                            
344                            }
345                    }
346                    command.setCompletionDelegate(dataReader.completionEndpoint);
347    
348                    return prepareAndSubmitRequest(command, dataReader);
349            }
350    
351            public IDataReader<OTExchange> requestListExchanges() {
352                    SimpleDataReader<OTExchange> dataReader = new SimpleDataReader<OTExchange>();
353    
354                    ListExchangesCommand command = new ListExchangesCommand();
355                    command.setDataDelegate(dataReader.dataEndpoint);
356                    command.setCompletionDelegate(dataReader.completionEndpoint);
357                    
358                    return prepareAndSubmitRequest(command, dataReader);
359            }
360    
361            public IDataReader<OTSymbol> requestListSymbols(String exchangeCode,
362                            String symbolCodePattern, Set<ListSymbolEnum> types,
363                            MatchStyleEnum matchStyle) {
364                    SimpleDataReader<OTSymbol> dataReader = new SimpleDataReader<OTSymbol>();
365    
366                    ListSymbolsCommand command = new ListSymbolsCommand();
367                    command.setExchangeCode(exchangeCode);
368                    command.setSymbolCodePattern(symbolCodePattern);
369                    command.setTypes(types);
370                    command.setMatchStyle(matchStyle);
371                    
372                    command.setDataDelegate(dataReader.dataEndpoint);
373                    command.setCompletionDelegate(dataReader.completionEndpoint);
374                    
375                    return prepareAndSubmitRequest(command, dataReader);
376            }
377    
378            public IDataReader<Object> requestOptionChainSnapshot(String exchangeCode,
379                            String symbolCode, MonthAndYear expiration, PriceRange strikeRange,
380                            VolumeStyleEnum volumeStyle, EnumSet<DataEnum> dataTypes) {
381                    
382                    if(dataTypes.isEmpty()) {
383                            return EMPTY_DATA_READER;
384                    }
385                    
386                    OptionChainSnapshotCommand command = new OptionChainSnapshotCommand();
387                    command.setExchangeCode(exchangeCode);
388                    command.setSymbolCode(symbolCode);
389                    command.setExpiration(expiration);
390                    command.setStrike(strikeRange);
391                    command.setVolumeStyle(volumeStyle);
392                    
393                    final SimpleDataReader<Object> dataReader = new SimpleDataReader<Object>();
394                    
395                    for(DataEnum dataType : dataTypes) {
396                            switch(dataType) {
397    
398                            case TRADE:
399                                    command.setTradeDelegate(new IDataDelegate<OTTrade>() {
400                                            public void onData(OTTrade event) {
401                                                    dataReader.dataEndpoint.onData(event);
402                                            }
403                                    });
404                                    break;
405                                    
406                            case QUOTE:
407                                    command.setQuoteDelegate(new IDataDelegate<OTQuote>() {
408                                            public void onData(OTQuote event) {
409                                                    dataReader.dataEndpoint.onData(event);
410                                            }
411                                    });
412                                    break;
413                                    
414                            case BBO:
415                                    command.setBboDelegate(new IDataDelegate<OTBBO>() {
416                                            public void onData(OTBBO event) {
417                                                    dataReader.dataEndpoint.onData(event);
418                                            }
419                                    });
420                                    break;
421                                    
422                            case MMQUOTE:
423                                    command.setMmQuoteDelegate(new IDataDelegate<OTMMQuote>() {
424                                            public void onData(OTMMQuote event) {
425                                                    dataReader.dataEndpoint.onData(event);
426                                            }
427                                    });
428                                    break;
429                                    
430                            default:
431                                    throw new IllegalArgumentException("Illegal type requested: " + dataType + " (only quote event types allowed here");
432                                                            
433                            }
434                    }
435                    command.setCompletionDelegate(dataReader.completionEndpoint);
436    
437                    return prepareAndSubmitRequest(command, dataReader);
438            }
439    
440            public IDataReader<OTOptionInit> requestOptionInit(String exchangeCode,
441                            String symbolCode, MonthAndYear expiration, PriceRange strikeRange) {
442    
443                    OptionInitCommand command = new OptionInitCommand();
444                    command.setExchangeCode(exchangeCode);
445                    command.setSymbolCode(symbolCode);
446                    command.setExpiration(expiration);
447                    command.setStrike(strikeRange);
448                    
449                    SimpleDataReader<OTOptionInit> dataReader = new SimpleDataReader<OTOptionInit>();
450                    command.setDataDelegate(dataReader.dataEndpoint);
451                    command.setCompletionDelegate(dataReader.completionEndpoint);
452    
453                    return prepareAndSubmitRequest(command, dataReader);
454            }
455    
456            public IDataReader<Object> requestSnapshot(String exchangeCode,
457                            String symbolCode, VolumeStyleEnum volumeStyle,
458                            EnumSet<DataEnum> dataTypes) {
459    
460                    if(dataTypes.isEmpty()) {
461                            return EMPTY_DATA_READER;
462                    }
463                    
464                    SnapshotCommand command = new SnapshotCommand();
465                    command.setExchangeCode(exchangeCode);
466                    command.setSymbolCode(symbolCode);
467                    command.setVolumeStyle(volumeStyle);
468                    
469                    final SimpleDataReader<Object> dataReader = new SimpleDataReader<Object>();
470                    
471                    for(DataEnum dataType : dataTypes) {
472                            switch(dataType) {
473    
474                            case TRADE:
475                                    command.setTradeDelegate(new IDataDelegate<OTTrade>() {
476                                            public void onData(OTTrade event) {
477                                                    dataReader.dataEndpoint.onData(event);
478                                            }
479                                    });
480                                    break;
481                                    
482                            case QUOTE:
483                                    command.setQuoteDelegate(new IDataDelegate<OTQuote>() {
484                                            public void onData(OTQuote event) {
485                                                    dataReader.dataEndpoint.onData(event);
486                                            }
487                                    });
488                                    break;
489                                    
490                            case BBO:
491                                    command.setBboDelegate(new IDataDelegate<OTBBO>() {
492                                            public void onData(OTBBO event) {
493                                                    dataReader.dataEndpoint.onData(event);
494                                            }
495                                    });
496                                    break;
497                                    
498                            case MMQUOTE:
499                                    command.setMmQuoteDelegate(new IDataDelegate<OTMMQuote>() {
500                                            public void onData(OTMMQuote event) {
501                                                    dataReader.dataEndpoint.onData(event);
502                                            }
503                                    });
504                                    break;
505                                    
506                            default:
507                                    throw new IllegalArgumentException("Illegal type requested: " + dataType + " (only quote event types allowed here");
508                                                            
509                            }
510                    }
511                    command.setCompletionDelegate(dataReader.completionEndpoint);
512    
513                    return prepareAndSubmitRequest(command, dataReader);
514            }
515    
516            public IDataReader<OTSplit> requestSplit(String exchangeCode,
517                            String symbolCode, Date startDate, Date endDate) {
518    
519                    SplitsCommand command = new SplitsCommand();
520                    command.setExchangeCode(exchangeCode);
521                    command.setSymbolCode(symbolCode);
522                    command.setStartDate(startDate);
523                    command.setEndDate(endDate);
524                    
525                    SimpleDataReader<OTSplit> dataReader = new SimpleDataReader<OTSplit>();
526                    command.setDataDelegate(dataReader.dataEndpoint);
527                    command.setCompletionDelegate(dataReader.completionEndpoint);
528    
529                    return prepareAndSubmitRequest(command, dataReader);
530            }
531    
532            public IDataReader<OTTodaysOHL> requestTodaysOHL(String exchangeCode,
533                            String symbolCode) {
534    
535                    TodaysOHLCommand command = new TodaysOHLCommand();
536                    command.setExchangeCode(exchangeCode);
537                    command.setSymbolCode(symbolCode);
538                    
539                    SimpleDataReader<OTTodaysOHL> dataReader = new SimpleDataReader<OTTodaysOHL>();
540                    command.setDataDelegate(dataReader.dataEndpoint);
541                    command.setCompletionDelegate(dataReader.completionEndpoint);
542    
543                    return prepareAndSubmitRequest(command, dataReader);
544            }
545    
546            public void close() {
547                    connection.shutdown();
548            }
549    }