001    /**
002     * Copyright 2007 Mike Kroutikov.
003     *
004     * This program is free software; you can redistribute it and/or modify
005     *   it under the terms of the Lesser GNU General Public License as 
006     *   published by the Free Software Foundation; either version 3 of
007     *   the License, or (at your option) any later version.
008     *
009     *   This program is distributed in the hope that it will be useful,
010     *   but WITHOUT ANY WARRANTY; without even the implied warranty of
011     *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012     *   Lesser GNU General Public License for more details.
013     *
014     *   You should have received a copy of the Lesser GNU General Public License
015     *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
016     */
017    
018    package org.otfeed.protocol.request;
019    
020    import org.otfeed.event.ICompletionDelegate;
021    import org.otfeed.event.IDataDelegate;
022    import org.otfeed.event.OTBookCancel;
023    import org.otfeed.event.OTBookChange;
024    import org.otfeed.event.OTBookDelete;
025    import org.otfeed.event.OTBookExecute;
026    import org.otfeed.event.OTBookOrder;
027    import org.otfeed.event.OTBookPriceLevel;
028    import org.otfeed.event.OTBookPurge;
029    import org.otfeed.event.OTBookReplace;
030    import org.otfeed.protocol.CommandEnum;
031    import org.otfeed.protocol.DataEnum;
032    import org.otfeed.protocol.ProtocolException;
033    
034    import org.otfeed.protocol.request.book.BookReader;
035    
036    import java.nio.ByteBuffer;
037    
038    import java.util.Map;
039    import java.util.HashMap;
040    
041    /**
042     * Request to receive real time book event stream.
043     */
044    public final class BookStreamExRequest extends AbstractSessionRequest {
045    
046            private final String exchangeCode;
047            private final String symbolCode;
048            private final int mask;
049    
050            private final Map<Integer,BookReader> map = new HashMap<Integer,BookReader>();
051    
052            public String getExchangeCode()  { return exchangeCode; }
053            public String getSymbolCode()    { return symbolCode; }
054    
055            public BookStreamExRequest(int requestId, 
056                            String exchangeCode,
057                            String symbolCode,
058                            IDataDelegate<OTBookOrder>   orderDelegate,
059                            IDataDelegate<OTBookChange>  changeDelegate,
060                            IDataDelegate<OTBookReplace> replaceDelegate,
061                            IDataDelegate<OTBookCancel>  cancelDelegate,
062                            IDataDelegate<OTBookPurge>   purgeDelegate,
063                            IDataDelegate<OTBookExecute> executeDelegate,
064                            IDataDelegate<OTBookDelete>  deleteDelegate,
065                            IDataDelegate<OTBookPriceLevel> priceLevelDelegate,
066                            ICompletionDelegate completionDelegate) {
067    
068                    super(CommandEnum.REQUEST_BOOK_STREAM_EX, 
069                                    requestId,
070                                    completionDelegate);
071    
072                    Check.notNull(exchangeCode,  "exchangeCode");
073                    Check.notNull(symbolCode,  "symbolCode");
074    
075                    this.exchangeCode  = exchangeCode;
076                    this.symbolCode    = symbolCode;
077    
078                    int mask = 0;
079                    if(orderDelegate != null) {
080                            BookReader rdr = BookReader.orderReader(orderDelegate);
081                            map.put(rdr.type.code, rdr);
082                            mask |= rdr.mask;
083                    }
084                    if(changeDelegate != null) {
085                            BookReader rdr = BookReader.changeReader(changeDelegate);
086                            map.put(rdr.type.code, rdr);
087                            mask |= rdr.mask;
088                    }
089                    if(replaceDelegate != null) {
090                            BookReader rdr = BookReader.replaceReader(replaceDelegate);
091                            map.put(rdr.type.code, rdr);
092                            mask |= rdr.mask;
093                    }
094                    if(cancelDelegate != null) {
095                            BookReader rdr = BookReader.cancelReader(cancelDelegate);
096                            map.put(rdr.type.code, rdr);
097                            mask |= rdr.mask;
098                    }
099                    if(purgeDelegate != null) {
100                            BookReader rdr = BookReader.purgeReader(purgeDelegate);
101                            map.put(rdr.type.code, rdr);
102                            mask |= rdr.mask;
103                    }
104                    if(executeDelegate != null) {
105                            BookReader rdr = BookReader.executeReader(executeDelegate);
106                            map.put(rdr.type.code, rdr);
107                            mask |= rdr.mask;
108                    }
109                    if(deleteDelegate != null) {
110                            BookReader rdr = BookReader.deleteReader(deleteDelegate);
111                            map.put(rdr.type.code, rdr);
112                            mask |= rdr.mask;
113                    }
114                    if(priceLevelDelegate != null) {
115                            BookReader rdr = BookReader.priceLevelReader(priceLevelDelegate);
116                            map.put(rdr.type.code, rdr);
117                            mask |= rdr.mask;
118                    }
119                    
120                    if(mask == 0) {
121                            throw new IllegalArgumentException("you must set one of the book event delegates");
122                    }
123                    
124                    this.mask = mask;
125            }
126    
127            @Override
128            public void writeRequest(ByteBuffer out) {
129                    super.writeRequest(out);
130    
131    
132                    Util.writeString(out, exchangeCode, 15);
133                    Util.writeString(out, symbolCode, 15);
134                    out.put((byte) 0);
135                    out.put((byte) 0);
136                    out.putInt(mask);
137            }
138    
139            @Override
140            public JobStatus handleMessage(Header header, ByteBuffer in) {
141                    if(header.getCommand() != CommandEnum.REQUEST_BOOK_STREAM) {
142                            throw new ProtocolException("unexpected command: "
143                                    + header.getCommand(), in);
144                    }
145    
146                    int typeCode = in.get();
147                    if(typeCode == DataEnum.EOD.code) {
148                            return JobStatus.FINISHED;
149                    }
150    
151                    BookReader reader = map.get(typeCode);
152                    if(reader == null) {
153                            throw new ProtocolException("unrecognized type: " + typeCode, in);
154                    }
155    
156                    reader.read(header, in);
157    
158                    return JobStatus.ACTIVE;
159            }
160    
161            @Override
162            public final CommandEnum getCancelCommand() {
163                    return CommandEnum.CANCEL_BOOK_STREAM;
164            }
165    }