001 /**
002 * Copyright 2007 Mike Kroutikov.
003 *
004 * This program is free software; you can redistribute it and/or modify
005 * it under the terms of the Lesser GNU General Public License as
006 * published by the Free Software Foundation; either version 3 of
007 * the License, or (at your option) any later version.
008 *
009 * This program is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012 * Lesser GNU General Public License for more details.
013 *
014 * You should have received a copy of the Lesser GNU General Public License
015 * along with this program. If not, see <http://www.gnu.org/licenses/>.
016 */
017
018 package org.otfeed.protocol.connector;
019
020 import org.otfeed.OTConnectionSpec;
021 import org.otfeed.event.IConnectionStateListener;
022 import org.otfeed.event.OTHost;
023 import org.otfeed.protocol.request.LoginRequest;
024 import org.otfeed.protocol.request.Header;
025 import org.otfeed.protocol.StatusEnum;
026 import org.otfeed.support.ByteReverseBufferAllocator;
027 import org.otfeed.support.ConnectionStateListener;
028 import org.otfeed.support.IBufferAllocator;
029
030 import java.io.IOException;
031 import java.nio.ByteBuffer;
032
033 import java.util.LinkedList;
034 import java.util.Set;
035 import java.util.HashSet;
036
037 import java.util.concurrent.atomic.AtomicBoolean;
038
039 import static org.otfeed.protocol.request.Util.readError;
040
041 public class LoginStreamerFactory implements ISessionStreamerFactory {
042
043 private final static int BUFFER_SIZE = 1024;
044
045 private final AtomicBoolean isShutdown = new AtomicBoolean(false);
046
047 private IBufferAllocator allocator = new ByteReverseBufferAllocator();
048
049 public LoginStreamerFactory(IBufferAllocator all) {
050 allocator = all;
051 }
052
053 public LoginStreamerFactory() { }
054
055 private long connectTimeout = 20000; // 20 seconds
056 public void setConnectTimeoutMillis(long val) { connectTimeout = val; }
057 public long getConnectTimeoutMillis() { return connectTimeout; }
058
059 private IStreamerFactory streamerFactory = new SocketStreamerFactory(allocator);
060
061 public IStreamerFactory getStreamerFactory() {
062 return streamerFactory;
063 }
064 public void setStreamerFactory(IStreamerFactory val) {
065 streamerFactory = val;
066 }
067
068 private static class SessionStreamer implements ISessionStreamer {
069
070 private final IStreamer connector;
071 private final String sessionId;
072
073 private SessionStreamer(IStreamer c, String sid) {
074 connector = c;
075 sessionId = sid;
076 }
077
078 public String getSessionId() {
079 return sessionId;
080 }
081
082 public ByteBuffer read() throws IOException {
083 return connector.read();
084 }
085
086 public void write(ByteBuffer bb) throws IOException {
087 connector.write(bb);
088 }
089
090 public void close() {
091 connector.close();
092 }
093 }
094
095 public ISessionStreamer connect(OTConnectionSpec spec) throws LoginFailureException, IOException {
096 return connect(spec, new ConnectionStateListener());
097 }
098
099 public ISessionStreamer connect(OTConnectionSpec spec, IConnectionStateListener listener)
100 throws IOException, LoginFailureException {
101 String username = spec.getUsername();
102 String password = spec.getPassword();
103
104 if(username == null || password == null) {
105 throw new IllegalStateException(
106 "username and password must be set");
107 }
108
109 Set<OTHost> triedAddresses = new HashSet<OTHost>();
110
111 LinkedList<OTHost> addressList = new LinkedList<OTHost>();
112 addressList.addAll(spec.getHosts());
113
114 if(addressList.size() == 0) {
115 throw new IllegalStateException(
116 "no connection addresses specified");
117 }
118
119 long started = System.currentTimeMillis();
120
121 OTHost address;
122 while((address = addressList.poll()) != null) {
123
124 if(triedAddresses.contains(address)) {
125 continue;
126 }
127
128 triedAddresses.add(address);
129
130 listener.onConnecting(address);
131
132 long now = System.currentTimeMillis();
133 if(now > started + connectTimeout) {
134 throw new IOException("connection timed out");
135 }
136
137 if(isShutdown.get()) {
138 throw new IOException("shutdown");
139 }
140
141 IStreamer connector = null;
142
143 try {
144 connector = streamerFactory.connect(address.getHost(), address.getPort());
145 } catch(IOException ex) {
146 //System.out.println(ex);
147 continue;
148 }
149
150 // connected!
151
152 LoginRequest request = new LoginRequest(0, username, password);
153
154 ByteBuffer buffer = allocator.allocate(BUFFER_SIZE);
155
156 request.writeRequest(buffer);
157
158 buffer.flip();
159
160 try {
161 connector.write(buffer);
162 buffer = connector.read();
163 } catch(IOException ex) {
164 connector.close();
165 continue;
166 }
167
168 Header header = new Header(buffer);
169
170 listener.onConnected();
171
172 if(header.getStatus() != StatusEnum.OK) {
173 // if fatal error, stop trying
174 connector.close();
175
176 throw new LoginFailureException(readError(header.getRequestId(), buffer));
177 }
178
179 LoginRequest.Response response
180 = new LoginRequest.Response(buffer);
181
182 if(!response.redirectFlag) {
183 // success!
184 listener.onLogin();
185 return new SessionStreamer(
186 connector, response.sessionId);
187 }
188
189 connector.close();
190
191 OTHost redirectAddress = new OTHost(response.redirectHost, response.redirectPort);
192 listener.onRedirect(redirectAddress);
193
194 addressList.remove(redirectAddress);
195 addressList.addFirst(redirectAddress);
196 // strangely, opentick servers do circular redirects
197 // e.g connect to l6 gives redirect to l4, then we
198 // connect to l4 and get bounced back to l6.
199 // therefore, never blacklist address that we've got as
200 // redirect
201 triedAddresses.remove(redirectAddress);
202 }
203 //System.out.println("all failed to respond");
204
205 // if here, all hosts failed. Rethrow last IO exception
206 throw new IOException("all hosts failed to respond");
207 }
208
209 public void shutdown() {
210 isShutdown.set(true);
211 }
212 }