001    /**
002     * Copyright 2007 Mike Kroutikov.
003     *
004     * This program is free software; you can redistribute it and/or modify
005     *   it under the terms of the Lesser GNU General Public License as 
006     *   published by the Free Software Foundation; either version 3 of
007     *   the License, or (at your option) any later version.
008     *
009     *   This program is distributed in the hope that it will be useful,
010     *   but WITHOUT ANY WARRANTY; without even the implied warranty of
011     *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012     *   Lesser GNU General Public License for more details.
013     *
014     *   You should have received a copy of the Lesser GNU General Public License
015     *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
016     */
017    
018    package org.otfeed.protocol.connector;
019    
020    import java.util.HashMap;
021    import java.util.Map;
022    import java.util.concurrent.Executor;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    import org.otfeed.IConnection;
027    import org.otfeed.IRequest;
028    import org.otfeed.OTConnectionSpec;
029    import org.otfeed.command.*;
030    import org.otfeed.event.IConnectionStateListener;
031    import org.otfeed.event.OTError;
032    import org.otfeed.protocol.ICommand;
033    import org.otfeed.protocol.request.AbstractSessionRequest;
034    import org.otfeed.protocol.request.BookStreamExRequest;
035    import org.otfeed.protocol.request.DividendRequest;
036    import org.otfeed.protocol.request.EquityInitRequest;
037    import org.otfeed.protocol.request.HistBookRequest;
038    import org.otfeed.protocol.request.HistDataRequest;
039    import org.otfeed.protocol.request.HistTicksRequest;
040    import org.otfeed.protocol.request.ListExchangesRequest;
041    import org.otfeed.protocol.request.ListSymbolsExRequest;
042    import org.otfeed.protocol.request.OptionChainRequest;
043    import org.otfeed.protocol.request.OptionChainSnapshotRequest;
044    import org.otfeed.protocol.request.OptionChainWithSnapshotRequest;
045    import org.otfeed.protocol.request.OptionInitRequest;
046    import org.otfeed.protocol.request.SnapshotRequest;
047    import org.otfeed.protocol.request.SplitRequest;
048    import org.otfeed.protocol.request.TickStreamExRequest;
049    import org.otfeed.protocol.request.TickStreamWithSnapshotRequest;
050    import org.otfeed.protocol.request.TodaysOHLRequest;
051    import org.otfeed.support.IBufferAllocator;
052    
053    public class OTEngine extends OTThreadingEngine implements IConnection {
054    
055            private interface IRequestPreparer {
056                    IRequest prepareRequest(ICommand r);
057            }
058            
059            private final Map<Class<?>,IRequestPreparer> commandMap
060                    = new HashMap<Class<?>,IRequestPreparer>();
061    
062            private class RequestHandler implements IRequest {
063    
064                    private final AbstractSessionRequest request;
065    
066                    private final AtomicBoolean isActiveFlag
067                                             = new AtomicBoolean(false);
068    
069                    private RequestHandler(AbstractSessionRequest r) { 
070                            request = r;
071                    }
072    
073                    public boolean isCompleted() { 
074                            return request.isCompleted();
075                    }
076                    
077                    public OTError getError() {
078                            return request.getError();
079                    }
080    
081                    public void submit() {
082                            if(isActiveFlag.getAndSet(true)) {
083                                    return; // already started
084                            }
085    
086                            OTEngine.this.submit(request);
087                    }
088    
089                    public void cancel() {
090    
091                            if(!isActiveFlag.getAndSet(false)) {
092                                    return; // not running
093                            }
094    
095                            OTEngine.this.cancel(
096                                    generateRequestId(),
097                                    request);
098                    }
099    
100                    public void waitForCompletion() {
101                            request.waitForCompletion();
102                    }
103    
104                    public boolean waitForCompletion(long millis) {
105                            return request.waitForCompletion(millis);
106                    }
107            }
108    
109            private final AtomicInteger requestCounter = new AtomicInteger(0);
110    
111            private int generateRequestId() {
112                    return requestCounter.incrementAndGet();
113            }
114    
115            private IRequest newRequest(AbstractSessionRequest request) {
116    
117                    return new RequestHandler(request);
118            }
119    
120            public OTEngine(ISessionStreamerFactory factory, 
121                            OTConnectionSpec spec,
122                            IBufferAllocator allocator, 
123                            Executor executor,
124                            IConnectionStateListener listener) {
125                    super(factory, spec, allocator, executor, listener);
126    
127                    // configure request maps
128                    
129                    commandMap.put(BookStreamCommand.class, new IRequestPreparer() {
130    
131                            public IRequest prepareRequest(ICommand command) {
132                                    BookStreamCommand c = (BookStreamCommand) command;
133                                    int reqId = generateRequestId();
134    
135                                    AbstractSessionRequest request = new BookStreamExRequest(reqId, 
136                                            c.getExchangeCode(),
137                                            c.getSymbolCode(),
138                                            c.getOrderDelegate(), 
139                                            c.getChangeDelegate(), 
140                                            c.getReplaceDelegate(),
141                                            c.getCancelDelegate(),
142                                            c.getPurgeDelegate(),
143                                            c.getExecuteDelegate(),
144                                            c.getDeleteDelegate(),
145                                            c.getPriceLevelDelegate(),
146                                            c.getCompletionDelegate());
147    
148                                    return newRequest(request);
149                            }
150                    });
151                    
152                    commandMap.put(DividendsCommand.class, new IRequestPreparer() {
153    
154                            public IRequest prepareRequest(ICommand command) {
155                                    DividendsCommand c = (DividendsCommand) command;
156                                    int reqId = generateRequestId();
157    
158                                    AbstractSessionRequest request = new DividendRequest(reqId, 
159                                            c.getExchangeCode(),
160                                            c.getSymbolCode(),
161                                            c.getStartDate(),
162                                            c.getEndDate(),
163                                            c.getDataDelegate(),
164                                            c.getCompletionDelegate());
165                                    return newRequest(request);
166                            }
167                    });
168    
169                    commandMap.put(EquityInitCommand.class, new IRequestPreparer() {
170    
171                            public IRequest prepareRequest(ICommand command) {
172                                    EquityInitCommand c = (EquityInitCommand) command;
173                                    int reqId = generateRequestId();
174    
175                                    AbstractSessionRequest request = new EquityInitRequest(reqId, 
176                                            c.getExchangeCode(),
177                                            c.getSymbolCode(),
178                                            c.getDataDelegate(),
179                                            c.getCompletionDelegate());
180    
181                                    return newRequest(request);
182                            }
183                    });
184                    
185                    commandMap.put(HistBooksCommand.class, new IRequestPreparer() {
186    
187                            public IRequest prepareRequest(ICommand command) {
188                                    HistBooksCommand c = (HistBooksCommand) command;
189                                    int reqId = generateRequestId();
190    
191                                    AbstractSessionRequest request = new HistBookRequest(reqId, 
192                                            c.getExchangeCode(),
193                                            c.getSymbolCode(),
194                                            c.getStartDate(),
195                                            c.getEndDate(),
196                                            c.getOrderDelegate(), 
197                                            c.getChangeDelegate(), 
198                                            c.getReplaceDelegate(),
199                                            c.getCancelDelegate(),
200                                            c.getPurgeDelegate(),
201                                            c.getExecuteDelegate(),
202                                            c.getDeleteDelegate(),
203                                            c.getPriceLevelDelegate(),
204                                            c.getCompletionDelegate());
205    
206                                    return newRequest(request);
207                            }
208                    });
209    
210                    commandMap.put(HistDataCommand.class, new IRequestPreparer() {
211    
212                            public IRequest prepareRequest(ICommand command) {
213                                    HistDataCommand c = (HistDataCommand) command;
214                                    int reqId = generateRequestId();
215    
216                                    AbstractSessionRequest request = new HistDataRequest(reqId, 
217                                            c.getExchangeCode(),
218                                            c.getSymbolCode(),
219                                            c.getStartDate(),
220                                            c.getEndDate(),
221                                            c.getAggregationSpan(),
222                                            c.getDataDelegate(),
223                                            c.getCompletionDelegate());
224    
225                                    return newRequest(request);
226                            }
227                    });
228    
229                    commandMap.put(HistTicksCommand.class, new IRequestPreparer() {
230    
231                            public IRequest prepareRequest(ICommand command) {
232                                    HistTicksCommand c = (HistTicksCommand) command;
233                                    int reqId = generateRequestId();
234    
235                                    AbstractSessionRequest request = new HistTicksRequest(reqId, 
236                                            c.getExchangeCode(),
237                                            c.getSymbolCode(),
238                                            c.getStartDate(),
239                                            c.getEndDate(),
240                                            c.getVolumeStyle(),
241                                            c.getQuoteDelegate(),
242                                            c.getTradeDelegate(),
243                                            c.getMmQuoteDelegate(),
244                                            c.getBboDelegate(),
245                                            c.getCompletionDelegate());
246    
247                                    return newRequest(request);
248                            }
249                    });
250    
251                    commandMap.put(ListExchangesCommand.class, new IRequestPreparer() {
252    
253                            public IRequest prepareRequest(ICommand command) {
254                                    ListExchangesCommand c = (ListExchangesCommand) command;
255                                    int reqId = generateRequestId();
256    
257                                    AbstractSessionRequest request = new ListExchangesRequest(reqId, 
258                                                    c.getDataDelegate(), 
259                                                    c.getCompletionDelegate());
260                                    return newRequest(request);
261                            }
262                    });
263                    
264                    commandMap.put(ListSymbolsCommand.class, new IRequestPreparer() {
265    
266                            public IRequest prepareRequest(ICommand command) {
267                                    ListSymbolsCommand c = (ListSymbolsCommand) command;
268                                    int reqId = generateRequestId();
269    
270                                    AbstractSessionRequest request = new ListSymbolsExRequest(reqId, 
271                                            c.getExchangeCode(),
272                                            c.getSymbolCodePattern(),
273                                            c.getTypes(),
274                                            c.getMatchStyle(),
275                                            c.getDataDelegate(),
276                                            c.getCompletionDelegate());
277    
278                                    return newRequest(request);
279                            }
280                    });
281                    
282                    commandMap.put(OptionChainCommand.class, new IRequestPreparer() {
283    
284                            public IRequest prepareRequest(ICommand command) {
285                                    OptionChainCommand c = (OptionChainCommand) command;
286                                    int reqId = generateRequestId();
287    
288                                    AbstractSessionRequest request = new OptionChainRequest(reqId, 
289                                            c.getExchangeCode(),
290                                            c.getSymbolCode(),
291                                            c.getExpiration(),
292                                            c.getStrike(),
293                                            c.getVolumeStyle(),
294                                            c.getQuoteDelegate(),
295                                            c.getTradeDelegate(),
296                                            c.getMmQuoteDelegate(),
297                                            c.getBboDelegate(),
298                                            c.getCompletionDelegate());
299    
300                                    return newRequest(request);
301                            }
302                    });
303    
304                    commandMap.put(OptionChainSnapshotCommand.class, new IRequestPreparer() {
305    
306                            public IRequest prepareRequest(ICommand command) {
307                                    OptionChainSnapshotCommand c = (OptionChainSnapshotCommand) command;
308                                    int reqId = generateRequestId();
309    
310                                    AbstractSessionRequest request = new OptionChainSnapshotRequest(reqId, 
311                                            c.getExchangeCode(),
312                                            c.getSymbolCode(),
313                                            c.getExpiration(),
314                                            c.getStrike(),
315                                            c.getVolumeStyle(),
316                                            c.getQuoteDelegate(),
317                                            c.getTradeDelegate(),
318                                            c.getMmQuoteDelegate(),
319                                            c.getBboDelegate(),
320                                            c.getCompletionDelegate());
321    
322                                    return newRequest(request);
323                            }
324                    });
325                    
326                    commandMap.put(OptionInitCommand.class, new IRequestPreparer() {
327    
328                            public IRequest prepareRequest(ICommand command) {
329                                    OptionInitCommand c = (OptionInitCommand) command;
330                                    int reqId = generateRequestId();
331    
332                                    AbstractSessionRequest request = new OptionInitRequest(reqId, 
333                                            c.getExchangeCode(),
334                                            c.getSymbolCode(),
335                                            c.getExpiration(),
336                                            c.getStrike(),
337                                            c.getDataDelegate(),
338                                            c.getCompletionDelegate());
339    
340                                    return newRequest(request);
341                            }
342                    });
343    
344                    commandMap.put(SnapshotCommand.class, new IRequestPreparer() {
345    
346                            public IRequest prepareRequest(ICommand command) {
347                                    SnapshotCommand c = (SnapshotCommand) command;
348                                    int reqId = generateRequestId();
349    
350                                    AbstractSessionRequest request = new SnapshotRequest(reqId, 
351                                            c.getExchangeCode(),
352                                            c.getSymbolCode(),
353                                            c.getVolumeStyle(),
354                                            c.getQuoteDelegate(),
355                                            c.getTradeDelegate(),
356                                            c.getMmQuoteDelegate(),
357                                            c.getBboDelegate(),
358                                            c.getCompletionDelegate());
359    
360                                    return newRequest(request);
361                            }
362                    });
363                    
364                    commandMap.put(SplitsCommand.class, new IRequestPreparer() {
365    
366                            public IRequest prepareRequest(ICommand command) {
367                                    SplitsCommand c = (SplitsCommand) command;
368                                    int reqId = generateRequestId();
369    
370                                    AbstractSessionRequest request = new SplitRequest(reqId, 
371                                                    c.getExchangeCode(),
372                                                    c.getSymbolCode(),
373                                                    c.getStartDate(),
374                                                    c.getEndDate(),
375                                                    c.getDataDelegate(),
376                                                    c.getCompletionDelegate());
377    
378                                    return newRequest(request);
379                            }
380                    });
381                    
382                    commandMap.put(TickStreamCommand.class, new IRequestPreparer() {
383    
384                            public IRequest prepareRequest(ICommand command) {
385                                    TickStreamCommand c = (TickStreamCommand) command;
386                                    int reqId = generateRequestId();
387    
388                                    AbstractSessionRequest request = new TickStreamExRequest(reqId, 
389                                            c.getExchangeCode(),
390                                            c.getSymbolCode(),
391                                            c.getVolumeStyle(),
392                                            c.getQuoteDelegate(),
393                                            c.getTradeDelegate(),
394                                            c.getMmQuoteDelegate(),
395                                            c.getBboDelegate(),
396                                            c.getCompletionDelegate());
397    
398                                    return newRequest(request);
399                            }
400                    });
401    
402                    commandMap.put(TodaysOHLCommand.class, new IRequestPreparer() {
403    
404                            public IRequest prepareRequest(ICommand command) {
405                                    TodaysOHLCommand c = (TodaysOHLCommand) command;
406                                    int reqId = generateRequestId();
407    
408                                    AbstractSessionRequest request = new TodaysOHLRequest(reqId, 
409                                            c.getExchangeCode(),
410                                            c.getSymbolCode(),
411                                            c.getDataDelegate(),
412                                            c.getCompletionDelegate());
413    
414                                    return newRequest(request);
415                            }
416                    });
417    
418                    commandMap.put(TickStreamWithSnapshotCommand.class, new IRequestPreparer() {
419    
420                            public IRequest prepareRequest(ICommand command) {
421                                    TickStreamWithSnapshotCommand c = (TickStreamWithSnapshotCommand) command;
422    
423                                    TickStreamWithSnapshotRequest request = new TickStreamWithSnapshotRequest(OTEngine.this, 
424                                            c.getExchangeCode(),
425                                            c.getSymbolCode(),
426                                            c.getVolumeStyle(),
427                                            c.getQuoteDelegate(),
428                                            c.getTradeDelegate(),
429                                            c.getMmQuoteDelegate(),
430                                            c.getBboDelegate(),
431                                            c.getCompletionDelegate());
432                                    request.prepareRequest();
433                                    return request;
434                            }
435                    });
436    
437                    commandMap.put(OptionChainWithSnapshotCommand.class, new IRequestPreparer() {
438    
439                            public IRequest prepareRequest(ICommand command) {
440                                    OptionChainWithSnapshotCommand c = (OptionChainWithSnapshotCommand) command;
441    
442                                    OptionChainWithSnapshotRequest request = new OptionChainWithSnapshotRequest(OTEngine.this,
443                                            c.getExchangeCode(),
444                                            c.getSymbolCode(),
445                                            c.getExpiration(),
446                                            c.getStrike(),
447                                            c.getVolumeStyle(),
448                                            c.getQuoteDelegate(),
449                                            c.getTradeDelegate(),
450                                            c.getMmQuoteDelegate(),
451                                            c.getBboDelegate(),
452                                            c.getCompletionDelegate());
453                                    request.prepareRequest();
454                                    return request;
455                            }
456                    });
457            }
458            
459            public IRequest prepareRequest(ICommand command) {
460    
461                    IRequestPreparer p = commandMap.get(command.getClass());
462                    if(p == null) {
463                            throw new IllegalArgumentException("Unknown (or unregistered) command: " + command.getClass());
464                    }
465                    
466                    return p.prepareRequest(command);
467            }
468    }