001 package org.otfeed.j2ee.ra;
002
003 import java.util.LinkedList;
004 import java.util.List;
005 import java.util.Set;
006
007 import javax.resource.ResourceException;
008 import javax.resource.spi.endpoint.MessageEndpointFactory;
009
010 import org.otfeed.IConnection;
011 import org.otfeed.IRequest;
012 import org.otfeed.command.BookStreamCommand;
013 import org.otfeed.command.TickStreamCommand;
014 import org.otfeed.command.VolumeStyleEnum;
015 import org.otfeed.event.ICompletionDelegate;
016 import org.otfeed.event.IDataDelegate;
017 import org.otfeed.event.OTBBO;
018 import org.otfeed.event.OTBookCancel;
019 import org.otfeed.event.OTBookChange;
020 import org.otfeed.event.OTBookDelete;
021 import org.otfeed.event.OTBookExecute;
022 import org.otfeed.event.OTBookOrder;
023 import org.otfeed.event.OTBookPriceLevel;
024 import org.otfeed.event.OTBookPurge;
025 import org.otfeed.event.OTBookReplace;
026 import org.otfeed.event.OTError;
027 import org.otfeed.event.OTMMQuote;
028 import org.otfeed.event.OTQuote;
029 import org.otfeed.event.OTTrade;
030 import org.otfeed.protocol.DataEnum;
031 import org.otfeed.support.alt.IMessageListener;
032 import org.slf4j.Logger;
033 import org.slf4j.LoggerFactory;
034
035 public class OtfeedInboundConnection {
036
037 private static final Logger LOG = LoggerFactory.getLogger(OtfeedInboundConnection.class);
038
039 private final IConnection engine;
040 private final List<IMessageListener> listeners = new LinkedList<IMessageListener>();
041
042 public OtfeedInboundConnection(IConnection engine) {
043 this.engine = engine;
044 }
045
046 public void shutdown() {
047 engine.shutdown();
048 }
049
050 private final ICompletionDelegate tracingCompletionDelegate = new ICompletionDelegate() {
051 public void onDataEnd(OTError error) {
052 LOG.info("data stream completed: {}", error);
053 }
054 };
055
056
057 private void initEndpoint(MessageEndpointFactory epFactory) throws ResourceException {
058
059 final IMessageListener listener = (IMessageListener) epFactory.createEndpoint(null);
060
061 synchronized(listeners) {
062 listeners.add(listener);
063 }
064
065 listener.onInit(new IMessageListener.ISubscriber() {
066
067 public void subscribeToBookStream(String exchangeCode,
068 String symbolCode, Set<DataEnum> dataTypes) {
069
070 LOG.trace("subscribing to book stream {}/{}", exchangeCode, symbolCode);
071
072 if(dataTypes.isEmpty()) {
073 LOG.trace("empty data type set: nothing to do");
074 return;
075 }
076
077 BookStreamCommand command = new BookStreamCommand();
078 command.setExchangeCode(exchangeCode);
079 command.setSymbolCode(symbolCode);
080
081 for(DataEnum dataType : dataTypes) {
082 switch(dataType) {
083 case BOOK_CANCEL:
084 command.setCancelDelegate(new IDataDelegate<OTBookCancel>() {
085 public void onData(OTBookCancel event) {
086 listener.onData(event);
087 }
088 });
089 break;
090 case BOOK_CHANGE:
091 command.setChangeDelegate(new IDataDelegate<OTBookChange>() {
092 public void onData(OTBookChange event) {
093 listener.onData(event);
094 }
095 });
096 break;
097
098 case BOOK_DELETE:
099 command.setDeleteDelegate(new IDataDelegate<OTBookDelete>() {
100 public void onData(OTBookDelete event) {
101 listener.onData(event);
102 }
103 });
104 break;
105
106 case BOOK_EXECUTE:
107 command.setExecuteDelegate(new IDataDelegate<OTBookExecute>() {
108 public void onData(OTBookExecute event) {
109 listener.onData(event);
110 }
111 });
112 break;
113
114 case BOOK_ORDER:
115 command.setOrderDelegate(new IDataDelegate<OTBookOrder>() {
116 public void onData(OTBookOrder event) {
117 listener.onData(event);
118 }
119 });
120 break;
121
122 case BOOK_PRICE_LEVEL:
123 command.setPriceLevelDelegate(new IDataDelegate<OTBookPriceLevel>() {
124 public void onData(OTBookPriceLevel event) {
125 listener.onData(event);
126 }
127 });
128 break;
129
130 case BOOK_PURGE:
131 command.setPurgeDelegate(new IDataDelegate<OTBookPurge>() {
132 public void onData(OTBookPurge event) {
133 listener.onData(event);
134 }
135 });
136 break;
137
138 case BOOK_REPLACE:
139 command.setReplaceDelegate(new IDataDelegate<OTBookReplace>() {
140 public void onData(OTBookReplace event) {
141 listener.onData(event);
142 }
143 });
144 break;
145
146 default: throw new IllegalArgumentException("unexpected type: " + dataType + " (only book data types are allowed here).");
147 }
148 }
149
150 command.setCompletionDelegate(tracingCompletionDelegate);
151
152 IRequest request = engine.prepareRequest(command);
153 request.submit();
154 }
155
156 public void subscribeToTickStream(String exchangeCode,
157 String symbolCode, VolumeStyleEnum volumeStyle,
158 Set<DataEnum> dataTypes) {
159 LOG.trace("subscribing to tick stream {}/{}", exchangeCode, symbolCode);
160
161 if(dataTypes.isEmpty()) {
162 LOG.trace("empty data type set: nothing to do");
163 return;
164 }
165
166 TickStreamCommand command = new TickStreamCommand();
167 command.setExchangeCode(exchangeCode);
168 command.setSymbolCode(symbolCode);
169 command.setVolumeStyle(volumeStyle);
170
171 for(DataEnum dataType : dataTypes) {
172 switch(dataType) {
173 case TRADE:
174 command.setTradeDelegate(new IDataDelegate<OTTrade>() {
175 public void onData(OTTrade event) {
176 listener.onData(event);
177 }
178 });
179 break;
180
181 case QUOTE:
182 command.setQuoteDelegate(new IDataDelegate<OTQuote>() {
183 public void onData(OTQuote event) {
184 listener.onData(event);
185 }
186 });
187 break;
188
189 case BBO:
190 command.setBboDelegate(new IDataDelegate<OTBBO>() {
191 public void onData(OTBBO event) {
192 listener.onData(event);
193 }
194 });
195 break;
196
197 case MMQUOTE:
198 command.setMmQuoteDelegate(new IDataDelegate<OTMMQuote>() {
199 public void onData(OTMMQuote event) {
200 listener.onData(event);
201 }
202 });
203 break;
204
205 default:
206 throw new IllegalArgumentException("unexpected type: " + dataType + " (only quote/trade data types are allowed here).");
207 }
208 }
209
210 command.setCompletionDelegate(tracingCompletionDelegate);
211
212 IRequest request = engine.prepareRequest(command);
213 request.submit();
214 }
215 });
216 }
217
218 public void deployEndpoint(final MessageEndpointFactory epFactory) {
219
220 engine.runInEventThread(new Runnable() {
221 public void run() {
222 try {
223 initEndpoint(epFactory);
224 } catch (ResourceException e) {
225 LOG.warn("endpoint initialization failed", e);
226 }
227 }
228 });
229 }
230 }