001    package org.otfeed.j2ee.ra;
002    
003    import java.io.IOException;
004    import java.io.PrintWriter;
005    import java.util.HashSet;
006    import java.util.Queue;
007    import java.util.Set;
008    import java.util.concurrent.ConcurrentLinkedQueue;
009    
010    import javax.resource.NotSupportedException;
011    import javax.resource.ResourceException;
012    import javax.resource.spi.ConnectionEvent;
013    import javax.resource.spi.ConnectionEventListener;
014    import javax.resource.spi.ConnectionRequestInfo;
015    import javax.resource.spi.LocalTransaction;
016    import javax.resource.spi.ManagedConnection;
017    import javax.resource.spi.ManagedConnectionMetaData;
018    import javax.security.auth.Subject;
019    import javax.transaction.xa.XAResource;
020    
021    import org.otfeed.IConnection;
022    import org.otfeed.event.IConnectionStateListener;
023    import org.otfeed.event.OTError;
024    import org.otfeed.event.OTHost;
025    
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    public class OtfeedManagedConnection implements ManagedConnection {
030    
031            private static final Logger LOG = LoggerFactory.getLogger(OtfeedManagedConnection.class);
032            
033            private final IConnection engine;
034    
035            private final Set<OtfeedConnection> handles = new HashSet<OtfeedConnection>();
036            
037            final OtfeedConnectionRequestInfo connectionSpec;
038    
039            void addHandle(OtfeedConnection handle) {
040                    handles.add(handle);
041            }
042            
043            void removeHandle(OtfeedConnection handle) {
044                    handles.remove(handle);
045            }
046            
047            public OtfeedManagedConnection(
048                            OtfeedConnectionRequestInfo spec,
049                            OtfeedResourceAdapter ra
050                    ) throws ResourceException {
051                    this.connectionSpec = spec;
052                    
053                    engine = ra.createConnection(spec, stateListener);
054            }
055    
056            public void associateConnection(Object handle) throws ResourceException {
057                    LOG.trace("associating handle {} with managed connection {}", handle, this);
058                    ((OtfeedConnection) handle).associateConnection(this);
059            }
060    
061            public void shutdown() {
062                    LOG.trace("shutting down managed connection {}", this);
063                    engine.shutdown();
064            }
065            
066            public void cleanup() throws ResourceException {
067                    LOG.trace("cleanup for managed connection {}", this);
068                    for(OtfeedConnection c : handles) {
069                            c.invalidate();
070                    }
071            }
072    
073            public void destroy() throws ResourceException {
074                    LOG.trace("destroy for managed connection {}", this);
075                    shutdown();
076                    cleanup();
077                    listeners.clear();
078            }
079            
080            public Object getConnection(Subject subject, ConnectionRequestInfo info)
081                            throws ResourceException {
082                    LOG.trace("getConnection for info {}", info);
083                    OtfeedConnection c = new OtfeedConnection(this);
084                    addHandle(c);
085                    return c;
086            }
087    
088            public LocalTransaction getLocalTransaction() throws ResourceException {
089                    throw new NotSupportedException();
090            }
091    
092            public ManagedConnectionMetaData getMetaData() throws ResourceException {
093                    return new OtfeedConnectionMetaData();
094            }
095    
096            public XAResource getXAResource() throws ResourceException {
097                    throw new NotSupportedException();
098            }
099            
100            public void fireConnectionError(OTError error) {
101                    ConnectionEvent event = new ConnectionEvent(this, 
102                                    ConnectionEvent.CONNECTION_ERROR_OCCURRED, 
103                                    new IOException(error.toString()));
104                    LOG.trace("fireConnectionError: {}", error);
105                    for(ConnectionEventListener l : listeners) {
106                            l.connectionErrorOccurred(event);
107                    }
108            }
109            
110            public void fireConnectionClosed(Object handle) {
111                    ConnectionEvent event = new ConnectionEvent(this, 
112                                    ConnectionEvent.CONNECTION_CLOSED);
113                    LOG.trace("fireConnectionClosed: {}", handle);
114                    event.setConnectionHandle(handle);
115                    for(ConnectionEventListener l : listeners) {
116                            l.connectionClosed(event);
117                    }
118            }
119    
120            private final IConnectionStateListener stateListener = new IConnectionStateListener() {
121                    public void onConnected() { 
122                            LOG.trace("CONNECTED");
123                    }
124    
125                    public void onConnecting(OTHost addr) { 
126                            LOG.trace("CONNECTING");
127                    }
128    
129                    public void onError(OTError error) {
130                            LOG.trace("ERROR: {}", error);
131                            fireConnectionError(error);
132                    }
133    
134                    public void onLogin() { 
135                            LOG.trace("LOGGED IN");
136                    }
137    
138                    public void onRedirect(OTHost addr) { 
139                            LOG.trace("REDIRECTED TO {}", addr);
140                    }
141            };
142    
143            private final Queue<ConnectionEventListener> listeners = new ConcurrentLinkedQueue<ConnectionEventListener>();
144            public void addConnectionEventListener(ConnectionEventListener val) {
145                    listeners.offer(val);
146            }
147    
148            public void removeConnectionEventListener(ConnectionEventListener val) {
149                    listeners.remove(val);
150            }
151            
152            private PrintWriter logWriter;
153            
154            public PrintWriter getLogWriter() throws ResourceException {
155                    return logWriter;
156            }
157    
158            public void setLogWriter(PrintWriter val) throws ResourceException {
159                    logWriter = val;
160            }
161            
162            IConnection getEngine() {
163                    return engine;
164            }
165    }