001    package org.otfeed.j2ee.ra;
002    
003    import java.util.LinkedList;
004    import java.util.List;
005    import java.util.Set;
006    
007    import javax.resource.ResourceException;
008    import javax.resource.spi.endpoint.MessageEndpointFactory;
009    
010    import org.otfeed.IConnection;
011    import org.otfeed.IRequest;
012    import org.otfeed.command.BookStreamCommand;
013    import org.otfeed.command.TickStreamCommand;
014    import org.otfeed.command.VolumeStyleEnum;
015    import org.otfeed.event.ICompletionDelegate;
016    import org.otfeed.event.IDataDelegate;
017    import org.otfeed.event.OTBBO;
018    import org.otfeed.event.OTBookCancel;
019    import org.otfeed.event.OTBookChange;
020    import org.otfeed.event.OTBookDelete;
021    import org.otfeed.event.OTBookExecute;
022    import org.otfeed.event.OTBookOrder;
023    import org.otfeed.event.OTBookPriceLevel;
024    import org.otfeed.event.OTBookPurge;
025    import org.otfeed.event.OTBookReplace;
026    import org.otfeed.event.OTError;
027    import org.otfeed.event.OTMMQuote;
028    import org.otfeed.event.OTQuote;
029    import org.otfeed.event.OTTrade;
030    import org.otfeed.protocol.DataEnum;
031    import org.otfeed.support.alt.IMessageListener;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    public class OtfeedInboundConnection {
036            
037            private static final Logger LOG = LoggerFactory.getLogger(OtfeedInboundConnection.class);
038            
039            private final IConnection engine;
040            private final List<IMessageListener> listeners = new LinkedList<IMessageListener>();
041    
042            public OtfeedInboundConnection(IConnection engine) {
043                    this.engine = engine;
044            }
045            
046            public void shutdown() {
047                    engine.shutdown();
048            }
049            
050            private final ICompletionDelegate tracingCompletionDelegate = new ICompletionDelegate() {
051                    public void onDataEnd(OTError error) {
052                            LOG.info("data stream completed: {}", error);
053                    }
054            };
055    
056    
057            private void initEndpoint(MessageEndpointFactory epFactory) throws ResourceException {
058                    
059                    final IMessageListener listener = (IMessageListener) epFactory.createEndpoint(null);
060                    
061                    synchronized(listeners) {
062                            listeners.add(listener);
063                    }
064                    
065                    listener.onInit(new IMessageListener.ISubscriber() {
066    
067                            public void subscribeToBookStream(String exchangeCode,
068                                            String symbolCode, Set<DataEnum> dataTypes) {
069                                    
070                                    LOG.trace("subscribing to book stream {}/{}", exchangeCode, symbolCode);
071                                    
072                                    if(dataTypes.isEmpty()) {
073                                            LOG.trace("empty data type set: nothing to do");
074                                            return; 
075                                    }
076                                    
077                                    BookStreamCommand command = new BookStreamCommand();
078                                    command.setExchangeCode(exchangeCode);
079                                    command.setSymbolCode(symbolCode);
080                                    
081                                    for(DataEnum dataType : dataTypes) {
082                                            switch(dataType) {
083                                            case BOOK_CANCEL:
084                                                    command.setCancelDelegate(new IDataDelegate<OTBookCancel>() {
085                                                            public void onData(OTBookCancel event) {
086                                                                    listener.onData(event);
087                                                            }
088                                                    });
089                                                    break;
090                                            case BOOK_CHANGE:
091                                                    command.setChangeDelegate(new IDataDelegate<OTBookChange>() {
092                                                            public void onData(OTBookChange event) {
093                                                                    listener.onData(event);
094                                                            }
095                                                    });
096                                                    break;
097                                                                    
098                                            case BOOK_DELETE:
099                                                    command.setDeleteDelegate(new IDataDelegate<OTBookDelete>() {
100                                                            public void onData(OTBookDelete event) {
101                                                                    listener.onData(event);
102                                                            }
103                                                    });
104                                                    break;
105                                                            
106                                            case BOOK_EXECUTE:
107                                                    command.setExecuteDelegate(new IDataDelegate<OTBookExecute>() {
108                                                            public void onData(OTBookExecute event) {
109                                                                    listener.onData(event);
110                                                            }
111                                                    });
112                                                    break;
113                                                            
114                                            case BOOK_ORDER:
115                                                    command.setOrderDelegate(new IDataDelegate<OTBookOrder>() {
116                                                            public void onData(OTBookOrder event) {
117                                                                    listener.onData(event);
118                                                            }
119                                                    });
120                                                    break;
121                                                            
122                                            case BOOK_PRICE_LEVEL:
123                                                    command.setPriceLevelDelegate(new IDataDelegate<OTBookPriceLevel>() {
124                                                            public void onData(OTBookPriceLevel event) {
125                                                                    listener.onData(event);
126                                                            }
127                                                    });
128                                                    break;
129                                                            
130                                            case BOOK_PURGE:
131                                                    command.setPurgeDelegate(new IDataDelegate<OTBookPurge>() {
132                                                            public void onData(OTBookPurge event) {
133                                                                    listener.onData(event);
134                                                            }
135                                                    });
136                                                    break;
137    
138                                            case BOOK_REPLACE:
139                                                    command.setReplaceDelegate(new IDataDelegate<OTBookReplace>() {
140                                                            public void onData(OTBookReplace event) {
141                                                                    listener.onData(event);
142                                                            }
143                                                    });
144                                                    break;
145                                                    
146                                            default: throw new IllegalArgumentException("unexpected type: " + dataType + " (only book data types are allowed here).");
147                                            }
148                                    }
149    
150                                    command.setCompletionDelegate(tracingCompletionDelegate);
151                                    
152                                    IRequest request = engine.prepareRequest(command);
153                                    request.submit();
154                            }
155    
156                            public void subscribeToTickStream(String exchangeCode,
157                                            String symbolCode, VolumeStyleEnum volumeStyle,
158                                            Set<DataEnum> dataTypes) {
159                                    LOG.trace("subscribing to tick stream {}/{}", exchangeCode, symbolCode);
160    
161                                    if(dataTypes.isEmpty()) {
162                                            LOG.trace("empty data type set: nothing to do");
163                                            return; 
164                                    }
165                                    
166                                    TickStreamCommand command = new TickStreamCommand();
167                                    command.setExchangeCode(exchangeCode);
168                                    command.setSymbolCode(symbolCode);
169                                    command.setVolumeStyle(volumeStyle);
170                                    
171                                    for(DataEnum dataType : dataTypes) {
172                                            switch(dataType) {
173                                            case TRADE:
174                                                    command.setTradeDelegate(new IDataDelegate<OTTrade>() {
175                                                            public void onData(OTTrade event) {
176                                                                    listener.onData(event);
177                                                            }
178                                                    });
179                                                    break;
180                                                    
181                                            case QUOTE:
182                                                    command.setQuoteDelegate(new IDataDelegate<OTQuote>() {
183                                                            public void onData(OTQuote event) {
184                                                                    listener.onData(event);
185                                                            }
186                                                    });
187                                                    break;
188                                                    
189                                            case BBO:
190                                                    command.setBboDelegate(new IDataDelegate<OTBBO>() {
191                                                            public void onData(OTBBO event) {
192                                                                    listener.onData(event);
193                                                            }
194                                                    });
195                                                    break;
196                                                    
197                                            case MMQUOTE:
198                                                    command.setMmQuoteDelegate(new IDataDelegate<OTMMQuote>() {
199                                                            public void onData(OTMMQuote event) {
200                                                                    listener.onData(event);
201                                                            }
202                                                    });
203                                                    break;
204    
205                                            default: 
206                                                    throw new IllegalArgumentException("unexpected type: " + dataType + " (only quote/trade data types are allowed here).");
207                                            }
208                                    }
209                                    
210                                    command.setCompletionDelegate(tracingCompletionDelegate);
211                                    
212                                    IRequest request = engine.prepareRequest(command);
213                                    request.submit();
214                            }
215                    });
216            }
217            
218            public void deployEndpoint(final MessageEndpointFactory epFactory) {
219                    
220                    engine.runInEventThread(new Runnable() {
221                            public void run() {
222                                    try {
223                                            initEndpoint(epFactory);
224                                    } catch (ResourceException e) {
225                                            LOG.warn("endpoint initialization failed", e);
226                                    }
227                            }
228                    });
229            }
230    }