001 package org.otfeed.j2ee.ra;
002
003 import java.util.HashMap;
004 import java.util.Map;
005 import java.util.Set;
006 import java.util.concurrent.Executor;
007
008 import javax.resource.ResourceException;
009 import javax.resource.spi.ActivationSpec;
010 import javax.resource.spi.BootstrapContext;
011 import javax.resource.spi.ResourceAdapter;
012 import javax.resource.spi.ResourceAdapterInternalException;
013 import javax.resource.spi.endpoint.MessageEndpointFactory;
014 import javax.resource.spi.work.WorkManager;
015 import javax.transaction.xa.XAResource;
016
017 import org.otfeed.IConnection;
018 import org.otfeed.event.IConnectionStateListener;
019 import org.otfeed.event.OTError;
020 import org.otfeed.event.OTHost;
021 import org.otfeed.protocol.connector.LoginStreamerFactory;
022 import org.otfeed.protocol.connector.OTEngine;
023 import org.otfeed.support.ByteReverseBufferAllocator;
024 import org.otfeed.support.ConnectionStateListener;
025 import org.otfeed.support.IBufferAllocator;
026 import org.slf4j.Logger;
027 import org.slf4j.LoggerFactory;
028
029 public class OtfeedResourceAdapter implements ResourceAdapter {
030
031 private static final Logger LOG = LoggerFactory.getLogger(OtfeedResourceAdapter.class);
032
033 static final IBufferAllocator ALLOCATOR = new ByteReverseBufferAllocator();
034
035 private final Map<OtfeedConnectionRequestInfo,OtfeedInboundConnection> connections = new HashMap<OtfeedConnectionRequestInfo,OtfeedInboundConnection>();
036
037 IConnection createConnection(OtfeedConnectionRequestInfo spec, IConnectionStateListener listener) throws ResourceException {
038 LOG.trace("creating new connetion for spec: {}", spec);
039
040 String username = spec.getUsername();
041 if(username == null) {
042 throw new ResourceException("misconfiguration: 'username' property not specified in activation spec (and no 'defaultUsername' set on resource adapter)");
043 }
044
045 String password = spec.getPassword();
046 if(password == null) {
047 throw new ResourceException("misconfiguration: 'password' property not specified in activation spec (and no 'defaultPassword' set on resource adapter)");
048 }
049
050 Set<OTHost> hosts = spec.getHosts();
051 if(hosts == null || hosts.isEmpty()) {
052 throw new ResourceException("misconfiguration: 'hosts' property not specified in activation spec (and no 'defaultAddressList' set on resource adapter)");
053 }
054
055 Long heartbeatInterval = spec.getHeartbeatInterval();
056 if(heartbeatInterval == null) {
057 throw new ResourceException("misconfiguration: 'heartbeatInterval' property not specified in activation spec (and no 'defaultHeartbeatInterval' set on resource adapter)");
058 }
059
060 LoginStreamerFactory streamerFactory = new LoginStreamerFactory(ALLOCATOR);
061
062 return new OTEngine(streamerFactory, spec, ALLOCATOR, executor, listener);
063 }
064
065 private OtfeedInboundConnection findOrCreateConnection(final OtfeedConnectionRequestInfo spec) throws ResourceException {
066 synchronized(connections) {
067 OtfeedInboundConnection connection = connections.get(spec);
068 if(connection != null) {
069 LOG.trace("found cached connection for {}", spec);
070 return connection;
071 }
072
073 IConnection engine = createConnection(spec, new ConnectionStateListener() {
074 @Override
075 public void onError(OTError error) {
076 synchronized (connections) {
077 OtfeedInboundConnection me = connections.remove(spec);
078 LOG.trace("removed dead connection, spec: {}", spec);
079 me.shutdown();
080 }
081 }
082 });
083
084 connection = new OtfeedInboundConnection(engine);
085
086 connections.put(spec, connection);
087 return connection;
088 }
089 }
090
091 public void endpointActivation(MessageEndpointFactory endpointFactory,
092 ActivationSpec activationSpec) throws ResourceException {
093 LOG.trace("activating spec: {}", activationSpec);
094
095 if(!(activationSpec instanceof OtfeedActivationSpec)) {
096 throw new ResourceException("unsupported activation spec: " + activationSpec.getClass());
097 }
098
099 // merge connection spec with the default values
100 // note that provided ones take precedence
101 OtfeedConnectionRequestInfo spec = OtfeedConnectionRequestInfo.merge(
102 (OtfeedActivationSpec) activationSpec,
103 connectionDefaults
104 );
105 LOG.trace("merged connection spec: {}", spec);
106
107 OtfeedInboundConnection connection = findOrCreateConnection(spec);
108 LOG.trace("created connection");
109
110 connection.deployEndpoint(endpointFactory);
111 LOG.trace("endpoint successfully activated");
112 }
113
114 public void endpointDeactivation(MessageEndpointFactory factory,
115 ActivationSpec activationSpec) {
116 // merge connection spec with the default values
117 // note that provided ones take precedence
118 OtfeedConnectionRequestInfo spec = OtfeedConnectionRequestInfo.merge(
119 (OtfeedActivationSpec) activationSpec,
120 connectionDefaults
121 );
122 LOG.trace("deactivating [merged] spec: {}", spec);
123 }
124
125 public XAResource[] getXAResources(ActivationSpec[] spec)
126 throws ResourceException {
127 return null;
128 }
129
130 private Executor executor;
131
132 public void start(BootstrapContext ctx)
133 throws ResourceAdapterInternalException {
134 LOG.trace("start");
135 WorkManager workManager = ctx.getWorkManager();
136 executor = new WorkManagerExecutor(workManager);
137 LOG.trace("start completed");
138 }
139
140 public void stop() {
141 LOG.trace("stop");
142 synchronized(connections) {
143 for(OtfeedInboundConnection c : connections.values()) {
144 try {
145 c.shutdown();
146 } catch(Exception ex) {
147 LOG.warn("exception when closing connection", ex);
148 }
149 }
150 }
151 LOG.trace("stop completed");
152 }
153
154 final OtfeedConnectionRequestInfo connectionDefaults = new OtfeedConnectionRequestInfo();
155
156 public String getDefaultUsername() {
157 return connectionDefaults.getUsername();
158 }
159
160 public void setDefaultUsername(String username) {
161 LOG.trace("RA: setting default username to {}", username);
162 connectionDefaults.setUsername(username);
163 }
164
165 public String getDefaultPassword() {
166 return connectionDefaults.getPassword();
167 }
168
169 public void setDefaultPassword(String password) {
170 LOG.trace("RA: setting default password to {}", "*****");
171 connectionDefaults.setPassword(password);
172 }
173
174 public String getDefaultHostsString() {
175 return connectionDefaults.getHostsString();
176 }
177
178 public void setDefaultHostsString(String address) {
179 LOG.trace("RA: setting address list to [{}]", address);
180 connectionDefaults.setHostsString(address);
181 }
182
183 public Long getDefaultHeartbeatInterval() {
184 return connectionDefaults.getHeartbeatInterval();
185 }
186
187 public void setDefaultHeartbeatInterval(Long val) {
188 LOG.trace("RA: setting heartbeat interval to {}", val);
189 connectionDefaults.setHeartbeatInterval(val);
190 }
191 }