001    /**
002     * Copyright 2007 Mike Kroutikov.
003     *
004     * This program is free software; you can redistribute it and/or modify
005     *   it under the terms of the Lesser GNU General Public License as 
006     *   published by the Free Software Foundation; either version 3 of
007     *   the License, or (at your option) any later version.
008     *
009     *   This program is distributed in the hope that it will be useful,
010     *   but WITHOUT ANY WARRANTY; without even the implied warranty of
011     *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012     *   Lesser GNU General Public License for more details.
013     *
014     *   You should have received a copy of the Lesser GNU General Public License
015     *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
016     */
017    
018    package org.otfeed.protocol.connector;
019    
020    import org.otfeed.OTConnectionSpec;
021    import org.otfeed.event.IConnectionStateListener;
022    import org.otfeed.event.OTHost;
023    import org.otfeed.protocol.request.LoginRequest;
024    import org.otfeed.protocol.request.Header;
025    import org.otfeed.protocol.StatusEnum;
026    import org.otfeed.support.ByteReverseBufferAllocator;
027    import org.otfeed.support.ConnectionStateListener;
028    import org.otfeed.support.IBufferAllocator;
029    
030    import java.io.IOException;
031    import java.nio.ByteBuffer;
032    
033    import java.util.LinkedList;
034    import java.util.Set;
035    import java.util.HashSet;
036    
037    import java.util.concurrent.atomic.AtomicBoolean;
038    
039    import static org.otfeed.protocol.request.Util.readError;
040    
041    public class LoginStreamerFactory implements ISessionStreamerFactory {
042    
043            private final static int BUFFER_SIZE = 1024;
044    
045            private final AtomicBoolean isShutdown = new AtomicBoolean(false);
046    
047            private IBufferAllocator allocator = new ByteReverseBufferAllocator();
048    
049            public LoginStreamerFactory(IBufferAllocator all) {
050                    allocator = all;
051            }
052            
053            public LoginStreamerFactory() { }
054    
055            private long connectTimeout = 20000; // 20 seconds
056            public void setConnectTimeoutMillis(long val) { connectTimeout = val; }
057            public long getConnectTimeoutMillis() { return connectTimeout; }
058            
059            private IStreamerFactory streamerFactory = new SocketStreamerFactory(allocator);
060    
061            public IStreamerFactory getStreamerFactory() {
062                    return streamerFactory;
063            }
064            public void setStreamerFactory(IStreamerFactory val) {
065                    streamerFactory = val;
066            }
067    
068            private static class SessionStreamer implements ISessionStreamer {
069    
070                    private final IStreamer connector;
071                    private final String sessionId;
072    
073                    private SessionStreamer(IStreamer c, String sid) {
074                            connector = c;
075                            sessionId = sid;
076                    }
077    
078                    public String getSessionId() {
079                            return sessionId;
080                    }
081    
082                    public ByteBuffer read() throws IOException {
083                            return connector.read();
084                    }
085    
086                    public void write(ByteBuffer bb) throws IOException {
087                            connector.write(bb);
088                    }
089    
090                    public void close() {
091                            connector.close();
092                    }
093            }
094    
095            public ISessionStreamer connect(OTConnectionSpec spec) throws LoginFailureException, IOException {
096                    return connect(spec, new ConnectionStateListener());
097            }
098    
099            public ISessionStreamer connect(OTConnectionSpec spec, IConnectionStateListener listener)
100                                    throws IOException, LoginFailureException {
101                    String username = spec.getUsername();
102                    String password = spec.getPassword();
103    
104                    if(username == null || password == null) {
105                            throw new IllegalStateException(
106                                    "username and password must be set");
107                    }
108    
109                    Set<OTHost> triedAddresses = new HashSet<OTHost>();
110    
111                    LinkedList<OTHost> addressList = new LinkedList<OTHost>();
112                    addressList.addAll(spec.getHosts());
113    
114                    if(addressList.size() == 0) {
115                            throw new IllegalStateException(
116                                    "no connection addresses specified");
117                    }
118    
119                    long started = System.currentTimeMillis();
120    
121                    OTHost address;
122                    while((address = addressList.poll()) != null) {
123    
124                            if(triedAddresses.contains(address)) {
125                                    continue;
126                            }
127    
128                            triedAddresses.add(address);
129    
130                            listener.onConnecting(address);
131    
132                            long now = System.currentTimeMillis();
133                            if(now > started + connectTimeout) {
134                                    throw new IOException("connection timed out");
135                            }
136    
137                            if(isShutdown.get()) {
138                                    throw new IOException("shutdown");
139                            }
140    
141                            IStreamer connector = null;
142    
143                            try {
144                                            connector = streamerFactory.connect(address.getHost(), address.getPort());
145                            } catch(IOException ex) {
146    //System.out.println(ex);
147                                    continue;
148                            }
149    
150                            // connected!
151    
152                            LoginRequest request = new LoginRequest(0, username, password);
153    
154                            ByteBuffer buffer = allocator.allocate(BUFFER_SIZE);
155    
156                            request.writeRequest(buffer);
157    
158                            buffer.flip();
159    
160                            try {
161                                    connector.write(buffer);
162                                    buffer = connector.read();
163                            } catch(IOException ex) {
164                                    connector.close();
165                                    continue;
166                            } 
167    
168                            Header header = new Header(buffer);
169    
170                            listener.onConnected();
171    
172                            if(header.getStatus() != StatusEnum.OK) {
173                                    // if fatal error, stop trying
174                                    connector.close();
175    
176                                    throw new LoginFailureException(readError(header.getRequestId(), buffer));
177                            }
178    
179                            LoginRequest.Response response 
180                                    = new LoginRequest.Response(buffer);
181    
182                            if(!response.redirectFlag) {
183                                    // success!
184                                    listener.onLogin();
185                                    return new SessionStreamer(
186                                            connector, response.sessionId);
187                            }
188    
189                            connector.close();
190    
191                            OTHost redirectAddress = new OTHost(response.redirectHost, response.redirectPort);
192                            listener.onRedirect(redirectAddress);
193    
194                            addressList.remove(redirectAddress);
195                            addressList.addFirst(redirectAddress);
196                            // strangely, opentick servers do circular redirects
197                            // e.g connect to l6 gives redirect to l4, then we
198                            // connect to l4 and get bounced back to l6.
199                            // therefore, never blacklist address that we've got as
200                            // redirect
201                            triedAddresses.remove(redirectAddress);
202                    }
203    //System.out.println("all failed to respond");
204    
205                    // if here, all hosts failed. Rethrow last IO exception
206                    throw new IOException("all hosts failed to respond");
207            }
208    
209            public void shutdown() {
210                    isShutdown.set(true);
211            }
212    }