001 /**
002 * Copyright 2007 Mike Kroutikov.
003 *
004 * This program is free software; you can redistribute it and/or modify
005 * it under the terms of the Lesser GNU General Public License as
006 * published by the Free Software Foundation; either version 3 of
007 * the License, or (at your option) any later version.
008 *
009 * This program is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012 * Lesser GNU General Public License for more details.
013 *
014 * You should have received a copy of the Lesser GNU General Public License
015 * along with this program. If not, see <http://www.gnu.org/licenses/>.
016 */
017
018 package org.otfeed.protocol.connector;
019
020 import java.util.HashMap;
021 import java.util.Map;
022 import java.util.concurrent.Executor;
023 import java.util.concurrent.atomic.AtomicBoolean;
024 import java.util.concurrent.atomic.AtomicInteger;
025
026 import org.otfeed.IConnection;
027 import org.otfeed.IRequest;
028 import org.otfeed.OTConnectionSpec;
029 import org.otfeed.command.*;
030 import org.otfeed.event.IConnectionStateListener;
031 import org.otfeed.event.OTError;
032 import org.otfeed.protocol.ICommand;
033 import org.otfeed.protocol.request.AbstractSessionRequest;
034 import org.otfeed.protocol.request.BookStreamExRequest;
035 import org.otfeed.protocol.request.DividendRequest;
036 import org.otfeed.protocol.request.EquityInitRequest;
037 import org.otfeed.protocol.request.HistBookRequest;
038 import org.otfeed.protocol.request.HistDataRequest;
039 import org.otfeed.protocol.request.HistTicksRequest;
040 import org.otfeed.protocol.request.ListExchangesRequest;
041 import org.otfeed.protocol.request.ListSymbolsExRequest;
042 import org.otfeed.protocol.request.OptionChainRequest;
043 import org.otfeed.protocol.request.OptionChainSnapshotRequest;
044 import org.otfeed.protocol.request.OptionChainWithSnapshotRequest;
045 import org.otfeed.protocol.request.OptionInitRequest;
046 import org.otfeed.protocol.request.SnapshotRequest;
047 import org.otfeed.protocol.request.SplitRequest;
048 import org.otfeed.protocol.request.TickStreamExRequest;
049 import org.otfeed.protocol.request.TickStreamWithSnapshotRequest;
050 import org.otfeed.protocol.request.TodaysOHLRequest;
051 import org.otfeed.support.IBufferAllocator;
052
053 public class OTEngine extends OTThreadingEngine implements IConnection {
054
055 private interface IRequestPreparer {
056 IRequest prepareRequest(ICommand r);
057 }
058
059 private final Map<Class<?>,IRequestPreparer> commandMap
060 = new HashMap<Class<?>,IRequestPreparer>();
061
062 private class RequestHandler implements IRequest {
063
064 private final AbstractSessionRequest request;
065
066 private final AtomicBoolean isActiveFlag
067 = new AtomicBoolean(false);
068
069 private RequestHandler(AbstractSessionRequest r) {
070 request = r;
071 }
072
073 public boolean isCompleted() {
074 return request.isCompleted();
075 }
076
077 public OTError getError() {
078 return request.getError();
079 }
080
081 public void submit() {
082 if(isActiveFlag.getAndSet(true)) {
083 return; // already started
084 }
085
086 OTEngine.this.submit(request);
087 }
088
089 public void cancel() {
090
091 if(!isActiveFlag.getAndSet(false)) {
092 return; // not running
093 }
094
095 OTEngine.this.cancel(
096 generateRequestId(),
097 request);
098 }
099
100 public void waitForCompletion() {
101 request.waitForCompletion();
102 }
103
104 public boolean waitForCompletion(long millis) {
105 return request.waitForCompletion(millis);
106 }
107 }
108
109 private final AtomicInteger requestCounter = new AtomicInteger(0);
110
111 private int generateRequestId() {
112 return requestCounter.incrementAndGet();
113 }
114
115 private IRequest newRequest(AbstractSessionRequest request) {
116
117 return new RequestHandler(request);
118 }
119
120 public OTEngine(ISessionStreamerFactory factory,
121 OTConnectionSpec spec,
122 IBufferAllocator allocator,
123 Executor executor,
124 IConnectionStateListener listener) {
125 super(factory, spec, allocator, executor, listener);
126
127 // configure request maps
128
129 commandMap.put(BookStreamCommand.class, new IRequestPreparer() {
130
131 public IRequest prepareRequest(ICommand command) {
132 BookStreamCommand c = (BookStreamCommand) command;
133 int reqId = generateRequestId();
134
135 AbstractSessionRequest request = new BookStreamExRequest(reqId,
136 c.getExchangeCode(),
137 c.getSymbolCode(),
138 c.getOrderDelegate(),
139 c.getChangeDelegate(),
140 c.getReplaceDelegate(),
141 c.getCancelDelegate(),
142 c.getPurgeDelegate(),
143 c.getExecuteDelegate(),
144 c.getDeleteDelegate(),
145 c.getPriceLevelDelegate(),
146 c.getCompletionDelegate());
147
148 return newRequest(request);
149 }
150 });
151
152 commandMap.put(DividendsCommand.class, new IRequestPreparer() {
153
154 public IRequest prepareRequest(ICommand command) {
155 DividendsCommand c = (DividendsCommand) command;
156 int reqId = generateRequestId();
157
158 AbstractSessionRequest request = new DividendRequest(reqId,
159 c.getExchangeCode(),
160 c.getSymbolCode(),
161 c.getStartDate(),
162 c.getEndDate(),
163 c.getDataDelegate(),
164 c.getCompletionDelegate());
165 return newRequest(request);
166 }
167 });
168
169 commandMap.put(EquityInitCommand.class, new IRequestPreparer() {
170
171 public IRequest prepareRequest(ICommand command) {
172 EquityInitCommand c = (EquityInitCommand) command;
173 int reqId = generateRequestId();
174
175 AbstractSessionRequest request = new EquityInitRequest(reqId,
176 c.getExchangeCode(),
177 c.getSymbolCode(),
178 c.getDataDelegate(),
179 c.getCompletionDelegate());
180
181 return newRequest(request);
182 }
183 });
184
185 commandMap.put(HistBooksCommand.class, new IRequestPreparer() {
186
187 public IRequest prepareRequest(ICommand command) {
188 HistBooksCommand c = (HistBooksCommand) command;
189 int reqId = generateRequestId();
190
191 AbstractSessionRequest request = new HistBookRequest(reqId,
192 c.getExchangeCode(),
193 c.getSymbolCode(),
194 c.getStartDate(),
195 c.getEndDate(),
196 c.getOrderDelegate(),
197 c.getChangeDelegate(),
198 c.getReplaceDelegate(),
199 c.getCancelDelegate(),
200 c.getPurgeDelegate(),
201 c.getExecuteDelegate(),
202 c.getDeleteDelegate(),
203 c.getPriceLevelDelegate(),
204 c.getCompletionDelegate());
205
206 return newRequest(request);
207 }
208 });
209
210 commandMap.put(HistDataCommand.class, new IRequestPreparer() {
211
212 public IRequest prepareRequest(ICommand command) {
213 HistDataCommand c = (HistDataCommand) command;
214 int reqId = generateRequestId();
215
216 AbstractSessionRequest request = new HistDataRequest(reqId,
217 c.getExchangeCode(),
218 c.getSymbolCode(),
219 c.getStartDate(),
220 c.getEndDate(),
221 c.getAggregationSpan(),
222 c.getDataDelegate(),
223 c.getCompletionDelegate());
224
225 return newRequest(request);
226 }
227 });
228
229 commandMap.put(HistTicksCommand.class, new IRequestPreparer() {
230
231 public IRequest prepareRequest(ICommand command) {
232 HistTicksCommand c = (HistTicksCommand) command;
233 int reqId = generateRequestId();
234
235 AbstractSessionRequest request = new HistTicksRequest(reqId,
236 c.getExchangeCode(),
237 c.getSymbolCode(),
238 c.getStartDate(),
239 c.getEndDate(),
240 c.getVolumeStyle(),
241 c.getQuoteDelegate(),
242 c.getTradeDelegate(),
243 c.getMmQuoteDelegate(),
244 c.getBboDelegate(),
245 c.getCompletionDelegate());
246
247 return newRequest(request);
248 }
249 });
250
251 commandMap.put(ListExchangesCommand.class, new IRequestPreparer() {
252
253 public IRequest prepareRequest(ICommand command) {
254 ListExchangesCommand c = (ListExchangesCommand) command;
255 int reqId = generateRequestId();
256
257 AbstractSessionRequest request = new ListExchangesRequest(reqId,
258 c.getDataDelegate(),
259 c.getCompletionDelegate());
260 return newRequest(request);
261 }
262 });
263
264 commandMap.put(ListSymbolsCommand.class, new IRequestPreparer() {
265
266 public IRequest prepareRequest(ICommand command) {
267 ListSymbolsCommand c = (ListSymbolsCommand) command;
268 int reqId = generateRequestId();
269
270 AbstractSessionRequest request = new ListSymbolsExRequest(reqId,
271 c.getExchangeCode(),
272 c.getSymbolCodePattern(),
273 c.getTypes(),
274 c.getMatchStyle(),
275 c.getDataDelegate(),
276 c.getCompletionDelegate());
277
278 return newRequest(request);
279 }
280 });
281
282 commandMap.put(OptionChainCommand.class, new IRequestPreparer() {
283
284 public IRequest prepareRequest(ICommand command) {
285 OptionChainCommand c = (OptionChainCommand) command;
286 int reqId = generateRequestId();
287
288 AbstractSessionRequest request = new OptionChainRequest(reqId,
289 c.getExchangeCode(),
290 c.getSymbolCode(),
291 c.getExpiration(),
292 c.getStrike(),
293 c.getVolumeStyle(),
294 c.getQuoteDelegate(),
295 c.getTradeDelegate(),
296 c.getMmQuoteDelegate(),
297 c.getBboDelegate(),
298 c.getCompletionDelegate());
299
300 return newRequest(request);
301 }
302 });
303
304 commandMap.put(OptionChainSnapshotCommand.class, new IRequestPreparer() {
305
306 public IRequest prepareRequest(ICommand command) {
307 OptionChainSnapshotCommand c = (OptionChainSnapshotCommand) command;
308 int reqId = generateRequestId();
309
310 AbstractSessionRequest request = new OptionChainSnapshotRequest(reqId,
311 c.getExchangeCode(),
312 c.getSymbolCode(),
313 c.getExpiration(),
314 c.getStrike(),
315 c.getVolumeStyle(),
316 c.getQuoteDelegate(),
317 c.getTradeDelegate(),
318 c.getMmQuoteDelegate(),
319 c.getBboDelegate(),
320 c.getCompletionDelegate());
321
322 return newRequest(request);
323 }
324 });
325
326 commandMap.put(OptionInitCommand.class, new IRequestPreparer() {
327
328 public IRequest prepareRequest(ICommand command) {
329 OptionInitCommand c = (OptionInitCommand) command;
330 int reqId = generateRequestId();
331
332 AbstractSessionRequest request = new OptionInitRequest(reqId,
333 c.getExchangeCode(),
334 c.getSymbolCode(),
335 c.getExpiration(),
336 c.getStrike(),
337 c.getDataDelegate(),
338 c.getCompletionDelegate());
339
340 return newRequest(request);
341 }
342 });
343
344 commandMap.put(SnapshotCommand.class, new IRequestPreparer() {
345
346 public IRequest prepareRequest(ICommand command) {
347 SnapshotCommand c = (SnapshotCommand) command;
348 int reqId = generateRequestId();
349
350 AbstractSessionRequest request = new SnapshotRequest(reqId,
351 c.getExchangeCode(),
352 c.getSymbolCode(),
353 c.getVolumeStyle(),
354 c.getQuoteDelegate(),
355 c.getTradeDelegate(),
356 c.getMmQuoteDelegate(),
357 c.getBboDelegate(),
358 c.getCompletionDelegate());
359
360 return newRequest(request);
361 }
362 });
363
364 commandMap.put(SplitsCommand.class, new IRequestPreparer() {
365
366 public IRequest prepareRequest(ICommand command) {
367 SplitsCommand c = (SplitsCommand) command;
368 int reqId = generateRequestId();
369
370 AbstractSessionRequest request = new SplitRequest(reqId,
371 c.getExchangeCode(),
372 c.getSymbolCode(),
373 c.getStartDate(),
374 c.getEndDate(),
375 c.getDataDelegate(),
376 c.getCompletionDelegate());
377
378 return newRequest(request);
379 }
380 });
381
382 commandMap.put(TickStreamCommand.class, new IRequestPreparer() {
383
384 public IRequest prepareRequest(ICommand command) {
385 TickStreamCommand c = (TickStreamCommand) command;
386 int reqId = generateRequestId();
387
388 AbstractSessionRequest request = new TickStreamExRequest(reqId,
389 c.getExchangeCode(),
390 c.getSymbolCode(),
391 c.getVolumeStyle(),
392 c.getQuoteDelegate(),
393 c.getTradeDelegate(),
394 c.getMmQuoteDelegate(),
395 c.getBboDelegate(),
396 c.getCompletionDelegate());
397
398 return newRequest(request);
399 }
400 });
401
402 commandMap.put(TodaysOHLCommand.class, new IRequestPreparer() {
403
404 public IRequest prepareRequest(ICommand command) {
405 TodaysOHLCommand c = (TodaysOHLCommand) command;
406 int reqId = generateRequestId();
407
408 AbstractSessionRequest request = new TodaysOHLRequest(reqId,
409 c.getExchangeCode(),
410 c.getSymbolCode(),
411 c.getDataDelegate(),
412 c.getCompletionDelegate());
413
414 return newRequest(request);
415 }
416 });
417
418 commandMap.put(TickStreamWithSnapshotCommand.class, new IRequestPreparer() {
419
420 public IRequest prepareRequest(ICommand command) {
421 TickStreamWithSnapshotCommand c = (TickStreamWithSnapshotCommand) command;
422
423 TickStreamWithSnapshotRequest request = new TickStreamWithSnapshotRequest(OTEngine.this,
424 c.getExchangeCode(),
425 c.getSymbolCode(),
426 c.getVolumeStyle(),
427 c.getQuoteDelegate(),
428 c.getTradeDelegate(),
429 c.getMmQuoteDelegate(),
430 c.getBboDelegate(),
431 c.getCompletionDelegate());
432 request.prepareRequest();
433 return request;
434 }
435 });
436
437 commandMap.put(OptionChainWithSnapshotCommand.class, new IRequestPreparer() {
438
439 public IRequest prepareRequest(ICommand command) {
440 OptionChainWithSnapshotCommand c = (OptionChainWithSnapshotCommand) command;
441
442 OptionChainWithSnapshotRequest request = new OptionChainWithSnapshotRequest(OTEngine.this,
443 c.getExchangeCode(),
444 c.getSymbolCode(),
445 c.getExpiration(),
446 c.getStrike(),
447 c.getVolumeStyle(),
448 c.getQuoteDelegate(),
449 c.getTradeDelegate(),
450 c.getMmQuoteDelegate(),
451 c.getBboDelegate(),
452 c.getCompletionDelegate());
453 request.prepareRequest();
454 return request;
455 }
456 });
457 }
458
459 public IRequest prepareRequest(ICommand command) {
460
461 IRequestPreparer p = commandMap.get(command.getClass());
462 if(p == null) {
463 throw new IllegalArgumentException("Unknown (or unregistered) command: " + command.getClass());
464 }
465
466 return p.prepareRequest(command);
467 }
468 }