001 /**
002 * Copyright 2007 Mike Kroutikov.
003 *
004 * This program is free software; you can redistribute it and/or modify
005 * it under the terms of the Lesser GNU General Public License as
006 * published by the Free Software Foundation; either version 3 of
007 * the License, or (at your option) any later version.
008 *
009 * This program is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012 * Lesser GNU General Public License for more details.
013 *
014 * You should have received a copy of the Lesser GNU General Public License
015 * along with this program. If not, see <http://www.gnu.org/licenses/>.
016 */
017
018 package org.otfeed.support.mock;
019
020 import java.io.BufferedReader;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.io.InputStreamReader;
024 import java.io.StringWriter;
025 import java.nio.ByteBuffer;
026 import java.util.Iterator;
027 import java.util.LinkedList;
028 import java.util.List;
029 import java.util.Queue;
030 import java.util.concurrent.ConcurrentLinkedQueue;
031 import java.util.concurrent.atomic.AtomicBoolean;
032
033 import org.otfeed.protocol.connector.IStreamer;
034 import org.otfeed.protocol.connector.IStreamerFactory;
035 import org.otfeed.support.BufferFormat;
036 import org.otfeed.support.IFormat;
037
038 /**
039 * Mock streaming layer for offline testing.
040 * <p/>
041 * This object facilitates offline testing of object
042 * marshalling code
043 * by returning a pre-recorded sequence of frames.
044 * <p/>
045 * Use this object to substitute {@link org.otfeed.OTConnectionFactory#getStreamerFactory() streamerFactory}
046 * of {@link org.otfeed.OTConnectionFactory OTConnectionFactory}.
047 * <p/>
048 * <em>IMPORTANT</em>: set {@link org.otfeed.OTConnectionFactory#setHeartbeatIntervalMillis(long) heartbeatInterval}
049 * property of <code>OTConnectionListener</code> to reasonably
050 * large value to make sure that heartbeat frames are not
051 * sent to the streamer layer. Otherwise, programs that
052 * <code>MockStreamerFactory</code> executes may be confused by
053 * receiving a heartbeat packet.
054 * <p/>
055 * <em>IMPORTANT</em>: recorded packets must contain correct requestId. Otherwise
056 * they will be silently dropped. Request id starts with zero (login request)
057 * and is incremented for every command prepared (event if not submitted).
058 * Thus, first request will have id of one, second - id of two, etc.
059 * Practically, its simpler to restrict testing to just a single command
060 * and then destroy the connection. This way, all frames for a command
061 * to be tested must have request id of 1.
062 * <p/>
063 * Next, build a list of operations that will be the "program"
064 * for the streamer. Assign this list to the {@link #setOpList(List) opList}
065 * property of <code>MockStreamerFactory</code>.
066 * <p/>
067 * Example program may look like this:
068 * <pre>
069 * // EXPECT ANY (will be login request frame)
070 * mockStreamerFactory.getOpList().add(MockStreamerFactory.expectAny());
071 * // SEND "login OK" response
072 * mockStreamerFactory.getOpList().add(MockStreamerFactory.sendLoginOK());
073 * // WAIT for any buffer
074 * mockStreamerFactory.getOpList().add(MockStreamerFactory.expectAny());
075 * // No more elements in the list: streamer will emulate
076 * // disconnect from the server (read error).
077 * </pre>
078 */
079 public class MockStreamerFactory implements IStreamerFactory {
080
081 public interface Op { } // marker interface
082
083 private static final IFormat<ByteBuffer> FORMAT = new BufferFormat();
084
085 private static final String FRAME_LOGIN_OK =
086 "% reply from server 'successfull login'\n"
087 + "02010000 01000000 00000000 61663765 %............af7e\n"
088 + "61326137 38643533 35333934 64343839 %a2a78d535394d489\n"
089 + "62306135 32633464 30643631 34653834 %b0a52c4d0d614e84\n"
090 + "36326132 37313064 61386534 66646261 %62a2710da8e4fdba\n"
091 + "62643130 35353138 38613035 00000000 %bd1055188a05....\n"
092 + "00000000 00000000 00000000 00000000 %................\n"
093 + "00000000 00000000 00000000 00000000 %................\n"
094 + "00000000 00000000 00000000 00000000 %................\n"
095 + "00000000 00000000 00000000 000000 %...............\n";
096
097 private static final String FRAME_LOGIN_FAILED =
098 "% reply from server 'login failed'\n"
099 + "0202f7da 01000000 00000000 e9031900 %................\n"
100 + "42616420 75736572 6e616d65 206f7220 %Bad.username.or.\n"
101 + "70617373 776f7264 00 %password.\n";
102
103 /**
104 * Helper: creates a <code>ByteBuffer</code> from
105 * its hex-dump String representation.
106 *
107 * @param bufferString string in the hex format.
108 * @return buffer.
109 */
110 public static ByteBuffer parse(String bufferString) {
111 return FORMAT.parse(bufferString);
112 }
113
114 /**
115 * Helper: creates a <code>ByteBuffer</code> by parsing
116 * a resource file.
117 *
118 * @param resourceName name of the resource.
119 * @return buffer.
120 * @throws IOException on IO error.
121 */
122 public static ByteBuffer parseResource(String resourceName) throws IOException {
123 InputStream input = MockStreamerFactory.class.getClassLoader().getResourceAsStream(resourceName);
124 if(input == null) {
125 throw new IOException("resource named [" + resourceName + "] not found");
126 }
127 BufferedReader reader = new BufferedReader(
128 new InputStreamReader(
129 input, "UTF-8"));
130 StringWriter writer = new StringWriter();
131 String line = null;
132 while((line = reader.readLine()) != null) {
133 writer.write(line + "\n");
134 }
135
136 return parse(writer.toString());
137 }
138
139 /**
140 * Helper: creates a "SEND" operation with the buffer
141 * telling "login was OK".
142 *
143 * @return send operation.
144 */
145 public static Op sendLoginOK() {
146 return send(parse(FRAME_LOGIN_OK));
147 }
148
149 /**
150 * Helper: creates a "SEND" operation with the buffer
151 * telling that "login failed".
152 *
153 * @return send operation.
154 */
155 public static Op sendLoginFailed() {
156 return send(parse(FRAME_LOGIN_FAILED));
157 }
158
159 /**
160 * Creates new mock streamer.
161 *
162 */
163 public MockStreamerFactory() { }
164
165 private IFormat<ByteBuffer> format = new BufferFormat();
166
167 /**
168 * Format to use when parsing ByteBuffer s.
169 * Defaults to {@link BufferFormat}.
170 *
171 * @return format.
172 */
173 public IFormat<ByteBuffer> getFormat() {
174 return format;
175 }
176
177 /**
178 * Sets format.
179 *
180 * @param val format.
181 */
182 public void setFormat(IFormat<ByteBuffer> val) {
183 format = val;
184 }
185
186 private static class ExpectOp implements Op {
187 ByteBuffer expect;
188 private ExpectOp(ByteBuffer b) {
189 expect = b;
190 }
191 private ExpectOp() {
192 this(null);
193 }
194 }
195
196 private static class SendOp implements Op {
197 ByteBuffer send;
198
199 public SendOp(ByteBuffer s) {
200 send = s;
201 }
202 }
203
204 /**
205 * Creates an operation of "EXPECT ANY BUFFER" type.
206 * This one will match any received buffer.
207 *
208 * @return operation.
209 */
210 public static Op expectAny() {
211 return new ExpectOp();
212 }
213
214 /**
215 * Creates an operation of "EXPECT A BUFFER" type.
216 * This one will try to match the model buffer
217 * against the actual one received from the upper layer.
218 *
219 * @param e - model buffer.
220 * @return operation.
221 */
222 public static Op expectBuffer(ByteBuffer e) {
223 return new ExpectOp(e);
224 }
225
226 /**
227 * Creates an operation of "SEND" type.
228 * This one sends the buffer back to the driver (and, ultimately,
229 * to the client application as a set of events).
230 *
231 * @param s buffer to send.
232 * @return operation.
233 */
234 public static Op send(ByteBuffer s) {
235 return new SendOp(s);
236 }
237
238 private List<Op> opList = new LinkedList<Op>();
239
240 /**
241 * List of operations that drive the mock streamer.
242 * <p/>
243 * List of operations specifies program that mock streamer will
244 * execute. Following operations are defined:
245 * <ul>
246 * <li>expect(ByteBuffer b): waits for the user action
247 * (i.e. a buffer that upper layer sends to the
248 * streamer. Compares that buffer sent matches exactly the
249 * one that is supplied to expect operation. If matches,
250 * continues the execution. if does not match, throws an
251 * exception. To create this kind of operation, use
252 * {@link #expectBuffer(ByteBuffer)} helper.
253 * <li>expect(): wait for a buffer and then continues
254 * (ignores the buffer content). To create this kind of operation
255 * use {@link #expectAny()} helper.
256 * <li>send(ByteBuffer b): sends the buffer back to the upper layer
257 * for decoding and presenting to the user as a an event of
258 * a set of events. To create this kind of operation
259 * user {@link #send(ByteBuffer)} helper.
260 * </ul>
261 *
262 * @return op list.
263 */
264 public List<Op> getOpList() {
265 return opList;
266 }
267
268 /**
269 * Sets op list.
270 *
271 * @param val op list.
272 */
273 public void setOpList(List<Op> val) {
274 opList = val;
275 }
276
277 private boolean trace = false;
278
279 /**
280 * Trace flag makes streamer print all frames
281 * to screen.
282 *
283 * @return trace flag.
284 */
285 public boolean getTrace() {
286 return trace;
287 }
288
289 /**
290 * Sets trace flag.
291 *
292 * @param val trace value.
293 */
294 public void setTrace(boolean val) {
295 trace = val;
296 }
297
298 private void trace(String info) {
299 if(trace) {
300 System.out.println("[" + id + "] " + info);
301 }
302 }
303
304 private String id;
305
306 /**
307 * Identification string.
308 *
309 * @return id string.
310 */
311 public String getId() {
312 return id;
313 }
314
315 /**
316 * Sets identification string.
317 *
318 * @param val id string.
319 */
320 public void setId(String val) {
321 id = val;
322 }
323
324 private void trace(String info, ByteBuffer buffer) {
325 if(trace) {
326 System.out.println("[" + id + "] " + info + "\n" + FORMAT.format(buffer));
327 }
328 }
329
330 private static ByteBuffer copyBuffer(ByteBuffer in) {
331 ByteBuffer copy = ByteBuffer.allocate(in.capacity());
332 copy.put(in.duplicate());
333 copy.flip();
334 return copy;
335 }
336
337 public IStreamer connect(String host, int port) throws IOException {
338
339 trace("connected");
340
341 final Iterator<Op> iterator = opList.iterator();
342
343 return new IStreamer() {
344
345 private final AtomicBoolean isClosed = new AtomicBoolean(false);
346 private final Queue<ByteBuffer> sentQueue = new ConcurrentLinkedQueue<ByteBuffer>();
347
348 public void close() {
349 isClosed.set(true);
350 synchronized(isClosed) {
351 isClosed.notifyAll();
352 }
353
354 trace("close() called");
355 }
356
357 public ByteBuffer read() throws IOException {
358
359 trace("read");
360
361 while(iterator.hasNext()) {
362 Op op = iterator.next();
363 if(op instanceof ExpectOp) {
364 ExpectOp eop = (ExpectOp) op;
365 ByteBuffer buffer = null;
366
367 trace("read: EXPECT is waiting for buffer");
368 while(!isClosed.get()
369 && (buffer = sentQueue.poll()) == null) {
370 synchronized(isClosed) {
371 try {
372 isClosed.wait();
373 } catch(InterruptedException ex) {
374 throw new IOException("interrupted?");
375 }
376 }
377 }
378
379 if(isClosed.get()) {
380 trace("read: closed");
381 throw new IOException("closed");
382 }
383
384 trace("read: EXPECT received buffer:", buffer);
385
386 if(eop.expect == null) {
387 trace("read: buffer matched (wildcard)");
388 } else if(eop.expect.compareTo(buffer) != 0) {
389 trace("read: model and actual buffer are not identical. Expected: ", eop.expect);
390 throw new IOException("model and actual buffers are not identical");
391 } else {
392 trace("read: buffer matched");
393 }
394 } else if(op instanceof SendOp) {
395 SendOp sop = (SendOp) op;
396
397 trace("read: SEND operation processed:", sop.send);
398
399 return sop.send;
400 } else {
401 throw new AssertionError("unexpected op: " + op);
402 }
403 }
404
405 trace("read: no more ops");
406 throw new IOException("eof: no more ops");
407 }
408
409 public void write(ByteBuffer out) throws IOException {
410 // need to copy, because caller may re-use this buffer for the next frame
411 ByteBuffer copy = copyBuffer(out);
412 trace("write:\n" + FORMAT.format(copy));
413
414 sentQueue.offer(copy);
415 synchronized(isClosed) {
416 isClosed.notifyAll();
417 }
418 }
419 };
420 }
421 }