001    /**
002     * Copyright 2007 Mike Kroutikov.
003     *
004     * This program is free software; you can redistribute it and/or modify
005     *   it under the terms of the Lesser GNU General Public License as 
006     *   published by the Free Software Foundation; either version 3 of
007     *   the License, or (at your option) any later version.
008     *
009     *   This program is distributed in the hope that it will be useful,
010     *   but WITHOUT ANY WARRANTY; without even the implied warranty of
011     *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012     *   Lesser GNU General Public License for more details.
013     *
014     *   You should have received a copy of the Lesser GNU General Public License
015     *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
016     */
017    
018    package org.otfeed.support.mock;
019    
020    import java.io.BufferedReader;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.InputStreamReader;
024    import java.io.StringWriter;
025    import java.nio.ByteBuffer;
026    import java.util.Iterator;
027    import java.util.LinkedList;
028    import java.util.List;
029    import java.util.Queue;
030    import java.util.concurrent.ConcurrentLinkedQueue;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    
033    import org.otfeed.protocol.connector.IStreamer;
034    import org.otfeed.protocol.connector.IStreamerFactory;
035    import org.otfeed.support.BufferFormat;
036    import org.otfeed.support.IFormat;
037    
038    /**
039     * Mock streaming layer for offline testing.
040     * <p/>
041     * This object facilitates offline testing of object 
042     * marshalling code
043     * by returning a pre-recorded sequence of frames.
044     * <p/>
045     * Use this object to substitute {@link org.otfeed.OTConnectionFactory#getStreamerFactory() streamerFactory}
046     * of {@link org.otfeed.OTConnectionFactory OTConnectionFactory}.
047     * <p/>
048     * <em>IMPORTANT</em>: set the {@link org.otfeed.OTConnectionFactory#setHeartbeatIntervalMillis(long) heartbeatInterval}
049     * property of <code>OTConnectionFactory</code> to a reasonably
050     * large value to make sure that heartbeat frames are not
051     * sent to the streamer layer. Otherwise, programs that
052     * <code>MockStreamerFactory</code> executes may be confused by
053     * receiving a heartbeat packet.
054     * <p/>
055     * <em>IMPORTANT</em>: recorded packets must contain the correct requestId. Otherwise
056     * they will be silently dropped. Request ids start at zero (the login request)
057     * and are incremented for every command prepared (even if not submitted).
058     * Thus, the first request will have an id of one, the second an id of two, etc.
059     * In practice, it is simpler to restrict testing to just a single command
060     * and then destroy the connection. This way, all frames for the command
061     * to be tested must have a request id of 1.
062     * <p/>
063     * Next, build a list of operations that will be the "program"
064     * for the streamer. Assign this list to the {@link #setOpList(List) opList}
065     * property of <code>MockStreamerFactory</code>.
066     * <p/>
067     * Example program may look like this:
068     * <pre>
069     * // EXPECT ANY (will be login request frame)
070     * mockStreamerFactory.getOpList().add(MockStreamerFactory.expectAny());
071     * // SEND "login OK" response
072     * mockStreamerFactory.getOpList().add(MockStreamerFactory.sendLoginOK());
073     * // WAIT for any buffer
074     * mockStreamerFactory.getOpList().add(MockStreamerFactory.expectAny());
075     * // No more elements in the list: streamer will emulate
076     * // disconnect from the server (read error).  
077     * </pre>
078     */
079    public class MockStreamerFactory implements IStreamerFactory {
080    
081            public interface Op { } // marker interface
082            
083            private static final IFormat<ByteBuffer> FORMAT = new BufferFormat();
084    
085            private static final String FRAME_LOGIN_OK =
086                    "% reply from server 'successfull login'\n"
087                    + "02010000 01000000 00000000 61663765  %............af7e\n"
088                    + "61326137 38643533 35333934 64343839  %a2a78d535394d489\n"
089                    + "62306135 32633464 30643631 34653834  %b0a52c4d0d614e84\n"
090                    + "36326132 37313064 61386534 66646261  %62a2710da8e4fdba\n"
091                    + "62643130 35353138 38613035 00000000  %bd1055188a05....\n"
092                    + "00000000 00000000 00000000 00000000  %................\n"
093                    + "00000000 00000000 00000000 00000000  %................\n"
094                    + "00000000 00000000 00000000 00000000  %................\n"
095                    + "00000000 00000000 00000000 000000    %...............\n";
096    
097            private static final String FRAME_LOGIN_FAILED =
098                    "% reply from server 'login failed'\n"
099                    + "0202f7da 01000000 00000000 e9031900  %................\n"
100                    + "42616420 75736572 6e616d65 206f7220  %Bad.username.or.\n"
101                    + "70617373 776f7264 00                 %password.\n";
102            
103            /**
104             * Helper: creates a <code>ByteBuffer</code> from
105             * its hex-dump String representation.
106             * 
107             * @param bufferString string in the hex format.
108             * @return buffer.
109             */
110            public static ByteBuffer parse(String bufferString) {
111                    return FORMAT.parse(bufferString);
112            }
113            
114            /**
115             * Helper: creates a <code>ByteBuffer</code> by parsing
116             * a resource file.
117             * 
118             * @param resourceName name of the resource.
119             * @return buffer.
120             * @throws IOException on IO error.
121             */
122            public static ByteBuffer parseResource(String resourceName) throws IOException {
123                    InputStream input = MockStreamerFactory.class.getClassLoader().getResourceAsStream(resourceName);
124                    if(input == null) {
125                            throw new IOException("resource named [" + resourceName + "] not found");
126                    }
127                    BufferedReader reader = new BufferedReader(
128                                    new InputStreamReader(
129                                                    input, "UTF-8"));
130                    StringWriter writer = new StringWriter();
131                    String line = null;
132                    while((line = reader.readLine()) != null) {
133                            writer.write(line + "\n");
134                    }
135    
136                    return parse(writer.toString());
137            }
138            
139            /**
140             * Helper: creates a "SEND" operation with the buffer
141             * telling "login was OK".
142             * 
143             * @return send operation.
144             */
145            public static Op sendLoginOK() {
146                    return send(parse(FRAME_LOGIN_OK));
147            }
148            
149            /**
150             * Helper: creates a "SEND" operation with the buffer
151             * telling that "login failed".
152             * 
153             * @return send operation.
154             */
155            public static Op sendLoginFailed() {
156                    return send(parse(FRAME_LOGIN_FAILED));
157            }
158            
159            /**
160             * Creates new mock streamer. 
161             * 
162             */
163            public MockStreamerFactory() { }
164            
165            private IFormat<ByteBuffer> format = new BufferFormat();
166            
167            /**
168             * Format to use when parsing ByteBuffer s.
169             * Defaults to {@link BufferFormat}.
170             *
171             * @return format.
172             */
173            public IFormat<ByteBuffer> getFormat() {
174                    return format;
175            }
176            
177            /**
178             * Sets format.
179             * 
180             * @param val format.
181             */
182            public void setFormat(IFormat<ByteBuffer> val) {
183                    format = val;
184            }
185            
186            private static class ExpectOp implements Op {
187                    ByteBuffer expect;
188                    private ExpectOp(ByteBuffer b) {
189                            expect = b;
190                    }
191                    private ExpectOp() {
192                            this(null);
193                    }
194            }
195            
196            private static class SendOp implements Op {
197                    ByteBuffer send;
198                    
199                    public SendOp(ByteBuffer s) {
200                            send = s;
201                    }
202            }
203            
204            /**
205             * Creates an operation of "EXPECT ANY BUFFER" type.
206             * This one will match any received buffer.
207             * 
208             * @return operation.
209             */
210            public static Op expectAny() {
211                    return new ExpectOp();
212            }
213            
214            /**
215             * Creates an operation of "EXPECT A BUFFER" type.
216             * This one will try to match the model buffer
217             * against the actual one received from the upper layer.
218             * 
219             * @param e - model buffer.
220             * @return operation.
221             */
222            public static Op expectBuffer(ByteBuffer e) {
223                    return new ExpectOp(e);
224            }
225            
226            /**
227             * Creates an operation of "SEND" type.
228             * This one sends the buffer back to the driver (and, ultimately,
229             * to the client application as a set of events).
230             * 
231             * @param s buffer to send.
232             * @return operation.
233             */
234            public static Op send(ByteBuffer s) {
235                    return new SendOp(s);
236            }
237    
238            private List<Op> opList = new LinkedList<Op>();
239    
240            /**
241             * List of operations that drive the mock streamer.
242             * <p/>
243             * List of operations specifies program that mock streamer will
244             * execute. Following operations are defined:
245             * <ul>
246             * <li>expect(ByteBuffer b): waits for the user action
247             * (i.e. a buffer that upper layer sends to the
248             * streamer. Compares that buffer sent matches exactly the 
249             * one that is supplied to expect operation. If matches,
250             * continues the execution. if does not match, throws an 
251             * exception. To create this kind of operation, use
252             * {@link #expectBuffer(ByteBuffer)} helper.
253             * <li>expect(): wait for a buffer and then continues
254             * (ignores the buffer content). To create this kind of operation
255             * use {@link #expectAny()} helper.
256             * <li>send(ByteBuffer b): sends the buffer back to the upper layer
257             * for decoding and presenting to the user as a an event of
258             * a set of events. To create this kind of operation
259             * user {@link #send(ByteBuffer)} helper.
260             * </ul>
261             * 
262             * @return op list.
263             */
264            public List<Op> getOpList() {
265                    return opList;
266            }
267            
268            /**
269             * Sets op list.
270             * 
271             * @param val op list.
272             */
273            public void setOpList(List<Op> val) {
274                    opList = val;
275            }
276            
277            private boolean trace = false;
278            
279            /**
280             * Trace flag makes streamer print all frames
281             * to screen.
282             * 
283             * @return trace flag.
284             */
285            public boolean getTrace() {
286                    return trace;
287            }
288            
289            /**
290             * Sets trace flag.
291             * 
292             * @param val trace value.
293             */
294            public void setTrace(boolean val) {
295                    trace = val;
296            }
297            
298            private void trace(String info) {
299                    if(trace) { 
300                            System.out.println("[" + id + "] " + info);
301                    }
302            }
303            
304            private String id;
305            
306            /**
307             * Identification string.
308             * 
309             * @return id string.
310             */
311            public String getId() {
312                    return id;
313            }
314    
315            /**
316             * Sets identification string.
317             * 
318             * @param val id string.
319             */
320            public void setId(String val) {
321                    id = val;
322            }
323            
324            private void trace(String info, ByteBuffer buffer) {
325                    if(trace) {
326                            System.out.println("[" + id + "] " + info + "\n" + FORMAT.format(buffer));
327                    }
328            }
329    
330            private static ByteBuffer copyBuffer(ByteBuffer in) {
331                    ByteBuffer copy = ByteBuffer.allocate(in.capacity());
332                    copy.put(in.duplicate());
333                    copy.flip();
334                    return copy;
335            }
336    
337            public IStreamer connect(String host, int port) throws IOException {
338                    
339                    trace("connected");
340                    
341                    final Iterator<Op> iterator = opList.iterator();
342                    
343                    return new IStreamer() {
344                            
345                            private final AtomicBoolean isClosed = new AtomicBoolean(false);
346                            private final Queue<ByteBuffer> sentQueue = new ConcurrentLinkedQueue<ByteBuffer>();
347    
348                            public void close() { 
349                                    isClosed.set(true);
350                                    synchronized(isClosed) {
351                                            isClosed.notifyAll();
352                                    }
353                                    
354                                    trace("close() called");
355                            }
356    
357                            public ByteBuffer read() throws IOException {
358                    
359                                    trace("read");
360    
361                                    while(iterator.hasNext()) {
362                                            Op op = iterator.next();
363                                            if(op instanceof ExpectOp) {
364                                                    ExpectOp eop = (ExpectOp) op;
365                                                    ByteBuffer buffer = null;
366    
367                                                    trace("read: EXPECT is waiting for buffer");
368                                                    while(!isClosed.get() 
369                                                                    && (buffer = sentQueue.poll()) == null) {
370                                                            synchronized(isClosed) {
371                                                                    try {
372                                                                            isClosed.wait();
373                                                                    } catch(InterruptedException ex) {
374                                                                            throw new IOException("interrupted?");
375                                                                    }
376                                                            }
377                                                    }
378                                            
379                                                    if(isClosed.get()) {
380                                                            trace("read: closed");
381                                                            throw new IOException("closed");
382                                                    }
383                                            
384                                                    trace("read: EXPECT received buffer:", buffer);
385    
386                                                    if(eop.expect == null) {
387                                                            trace("read: buffer matched (wildcard)");
388                                                    } else if(eop.expect.compareTo(buffer) != 0) {
389                                                            trace("read: model and actual buffer are not identical. Expected: ", eop.expect);
390                                                            throw new IOException("model and actual buffers are not identical");
391                                                    } else {
392                                                            trace("read: buffer matched");
393                                                    }
394                                            } else if(op instanceof SendOp) {
395                                                    SendOp sop = (SendOp) op;
396    
397                                                    trace("read: SEND operation processed:", sop.send);
398                                                    
399                                                    return sop.send;
400                                            } else {
401                                                    throw new AssertionError("unexpected op: " + op);
402                                            }
403                                    }
404                                    
405                                    trace("read: no more ops");
406                                    throw new IOException("eof: no more ops");
407                            }
408    
409                            public void write(ByteBuffer out) throws IOException {
410                                    // need to copy, because caller may re-use this buffer for the next frame
411                                    ByteBuffer copy = copyBuffer(out);
412                                    trace("write:\n" + FORMAT.format(copy));
413                                    
414                                    sentQueue.offer(copy);
415                                    synchronized(isClosed) {
416                                            isClosed.notifyAll();
417                                    }
418                            }
419                    };
420            }
421    }