001    package org.otfeed.j2ee.ra;
002    
003    import java.util.HashMap;
004    import java.util.Map;
005    import java.util.Set;
006    import java.util.concurrent.Executor;
007    
008    import javax.resource.ResourceException;
009    import javax.resource.spi.ActivationSpec;
010    import javax.resource.spi.BootstrapContext;
011    import javax.resource.spi.ResourceAdapter;
012    import javax.resource.spi.ResourceAdapterInternalException;
013    import javax.resource.spi.endpoint.MessageEndpointFactory;
014    import javax.resource.spi.work.WorkManager;
015    import javax.transaction.xa.XAResource;
016    
017    import org.otfeed.IConnection;
018    import org.otfeed.event.IConnectionStateListener;
019    import org.otfeed.event.OTError;
020    import org.otfeed.event.OTHost;
021    import org.otfeed.protocol.connector.LoginStreamerFactory;
022    import org.otfeed.protocol.connector.OTEngine;
023    import org.otfeed.support.ByteReverseBufferAllocator;
024    import org.otfeed.support.ConnectionStateListener;
025    import org.otfeed.support.IBufferAllocator;
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    public class OtfeedResourceAdapter implements ResourceAdapter {
030            
031            private static final Logger LOG = LoggerFactory.getLogger(OtfeedResourceAdapter.class);
032    
033            static final IBufferAllocator ALLOCATOR = new ByteReverseBufferAllocator();
034            
035            private final Map<OtfeedConnectionRequestInfo,OtfeedInboundConnection> connections = new HashMap<OtfeedConnectionRequestInfo,OtfeedInboundConnection>();
036            
037            IConnection createConnection(OtfeedConnectionRequestInfo spec, IConnectionStateListener listener) throws ResourceException {
038                    LOG.trace("creating new connetion for spec: {}", spec);
039                    
040                    String username = spec.getUsername();
041                    if(username == null) {
042                            throw new ResourceException("misconfiguration: 'username' property not specified in activation spec (and no 'defaultUsername' set on resource adapter)");
043                    }
044                            
045                    String password = spec.getPassword();
046                    if(password == null) {
047                            throw new ResourceException("misconfiguration: 'password' property not specified in activation spec (and no 'defaultPassword' set on resource adapter)");
048                    }
049    
050                    Set<OTHost> hosts = spec.getHosts();
051                    if(hosts == null || hosts.isEmpty()) {
052                            throw new ResourceException("misconfiguration: 'hosts' property not specified in activation spec (and no 'defaultAddressList' set on resource adapter)");
053                    }
054    
055                    Long heartbeatInterval = spec.getHeartbeatInterval();
056                    if(heartbeatInterval == null) {
057                            throw new ResourceException("misconfiguration: 'heartbeatInterval' property not specified in activation spec (and no 'defaultHeartbeatInterval' set on resource adapter)");
058                    }
059    
060                    LoginStreamerFactory streamerFactory = new LoginStreamerFactory(ALLOCATOR);
061                    
062                    return new OTEngine(streamerFactory, spec, ALLOCATOR, executor, listener);
063            }
064            
065            private OtfeedInboundConnection findOrCreateConnection(final OtfeedConnectionRequestInfo spec) throws ResourceException {
066                    synchronized(connections) {
067                            OtfeedInboundConnection connection = connections.get(spec);
068                            if(connection != null) {
069                                    LOG.trace("found cached connection for {}", spec);
070                                    return connection;
071                            }
072                            
073                            IConnection engine = createConnection(spec, new ConnectionStateListener() {
074                                    @Override
075                                    public void onError(OTError error) {
076                                            synchronized (connections) {
077                                                    OtfeedInboundConnection me = connections.remove(spec);
078                                                    LOG.trace("removed dead connection, spec: {}", spec);
079                                                    me.shutdown();
080                                            }
081                                    }
082                            });
083                            
084                            connection = new OtfeedInboundConnection(engine);
085                            
086                            connections.put(spec, connection);
087                            return connection;
088                    }
089            }
090    
091            public void endpointActivation(MessageEndpointFactory endpointFactory,
092                            ActivationSpec activationSpec) throws ResourceException {
093                    LOG.trace("activating spec: {}", activationSpec);
094    
095                    if(!(activationSpec instanceof OtfeedActivationSpec)) {
096                            throw new ResourceException("unsupported activation spec: " + activationSpec.getClass());
097                    }
098                    
099                    // merge connection spec with the default values
100                    // note that provided ones take precedence
101                    OtfeedConnectionRequestInfo spec = OtfeedConnectionRequestInfo.merge(
102                                    (OtfeedActivationSpec) activationSpec,
103                                    connectionDefaults
104                                    );
105                    LOG.trace("merged connection spec: {}", spec);
106                    
107                    OtfeedInboundConnection connection = findOrCreateConnection(spec);
108                    LOG.trace("created connection");
109                    
110                    connection.deployEndpoint(endpointFactory);
111                    LOG.trace("endpoint successfully activated");
112            }
113    
114            public void endpointDeactivation(MessageEndpointFactory factory,
115                            ActivationSpec activationSpec) {
116                    // merge connection spec with the default values
117                    // note that provided ones take precedence
118                    OtfeedConnectionRequestInfo spec = OtfeedConnectionRequestInfo.merge(
119                                    (OtfeedActivationSpec) activationSpec,
120                                    connectionDefaults
121                                    );
122                    LOG.trace("deactivating [merged] spec: {}", spec);
123            }
124    
125            public XAResource[] getXAResources(ActivationSpec[] spec)
126                            throws ResourceException {
127                    return null;
128            }
129            
130            private Executor executor;
131    
132            public void start(BootstrapContext ctx)
133                            throws ResourceAdapterInternalException {
134                    LOG.trace("start");
135                    WorkManager workManager = ctx.getWorkManager();
136                    executor = new WorkManagerExecutor(workManager);
137                    LOG.trace("start completed");
138            }
139    
140            public void stop() {
141                    LOG.trace("stop");
142                    synchronized(connections) {
143                            for(OtfeedInboundConnection c : connections.values()) {
144                                    try {
145                                            c.shutdown();
146                                    } catch(Exception ex) {
147                                            LOG.warn("exception when closing connection", ex);
148                                    }
149                            }
150                    }
151                    LOG.trace("stop completed");
152            }
153            
154            final OtfeedConnectionRequestInfo connectionDefaults = new OtfeedConnectionRequestInfo();
155    
156            public String getDefaultUsername() {
157                    return connectionDefaults.getUsername();
158            }
159    
160            public void setDefaultUsername(String username) {
161                    LOG.trace("RA: setting default username to {}", username);
162                    connectionDefaults.setUsername(username);
163            }
164    
165            public String getDefaultPassword() {
166                    return connectionDefaults.getPassword();
167            }
168    
169            public void setDefaultPassword(String password) {
170                    LOG.trace("RA: setting default password to {}", "*****");
171                    connectionDefaults.setPassword(password);
172            }
173    
174            public String getDefaultHostsString() {
175                    return connectionDefaults.getHostsString();
176            }
177    
178            public void setDefaultHostsString(String address) {
179                    LOG.trace("RA: setting address list to [{}]", address);
180                    connectionDefaults.setHostsString(address);
181            }
182            
183            public Long getDefaultHeartbeatInterval() {
184                    return connectionDefaults.getHeartbeatInterval();
185            }
186            
187            public void setDefaultHeartbeatInterval(Long val) {
188                    LOG.trace("RA: setting heartbeat interval to {}", val);
189                    connectionDefaults.setHeartbeatInterval(val);
190            }
191    }