001 package org.otfeed.j2ee.ra;
002
003 import java.io.IOException;
004 import java.io.PrintWriter;
005 import java.util.HashSet;
006 import java.util.Queue;
007 import java.util.Set;
008 import java.util.concurrent.ConcurrentLinkedQueue;
009
010 import javax.resource.NotSupportedException;
011 import javax.resource.ResourceException;
012 import javax.resource.spi.ConnectionEvent;
013 import javax.resource.spi.ConnectionEventListener;
014 import javax.resource.spi.ConnectionRequestInfo;
015 import javax.resource.spi.LocalTransaction;
016 import javax.resource.spi.ManagedConnection;
017 import javax.resource.spi.ManagedConnectionMetaData;
018 import javax.security.auth.Subject;
019 import javax.transaction.xa.XAResource;
020
021 import org.otfeed.IConnection;
022 import org.otfeed.event.IConnectionStateListener;
023 import org.otfeed.event.OTError;
024 import org.otfeed.event.OTHost;
025
026 import org.slf4j.Logger;
027 import org.slf4j.LoggerFactory;
028
029 public class OtfeedManagedConnection implements ManagedConnection {
030
031 private static final Logger LOG = LoggerFactory.getLogger(OtfeedManagedConnection.class);
032
033 private final IConnection engine;
034
035 private final Set<OtfeedConnection> handles = new HashSet<OtfeedConnection>();
036
037 final OtfeedConnectionRequestInfo connectionSpec;
038
039 void addHandle(OtfeedConnection handle) {
040 handles.add(handle);
041 }
042
043 void removeHandle(OtfeedConnection handle) {
044 handles.remove(handle);
045 }
046
047 public OtfeedManagedConnection(
048 OtfeedConnectionRequestInfo spec,
049 OtfeedResourceAdapter ra
050 ) throws ResourceException {
051 this.connectionSpec = spec;
052
053 engine = ra.createConnection(spec, stateListener);
054 }
055
056 public void associateConnection(Object handle) throws ResourceException {
057 LOG.trace("associating handle {} with managed connection {}", handle, this);
058 ((OtfeedConnection) handle).associateConnection(this);
059 }
060
061 public void shutdown() {
062 LOG.trace("shutting down managed connection {}", this);
063 engine.shutdown();
064 }
065
066 public void cleanup() throws ResourceException {
067 LOG.trace("cleanup for managed connection {}", this);
068 for(OtfeedConnection c : handles) {
069 c.invalidate();
070 }
071 }
072
073 public void destroy() throws ResourceException {
074 LOG.trace("destroy for managed connection {}", this);
075 shutdown();
076 cleanup();
077 listeners.clear();
078 }
079
080 public Object getConnection(Subject subject, ConnectionRequestInfo info)
081 throws ResourceException {
082 LOG.trace("getConnection for info {}", info);
083 OtfeedConnection c = new OtfeedConnection(this);
084 addHandle(c);
085 return c;
086 }
087
088 public LocalTransaction getLocalTransaction() throws ResourceException {
089 throw new NotSupportedException();
090 }
091
092 public ManagedConnectionMetaData getMetaData() throws ResourceException {
093 return new OtfeedConnectionMetaData();
094 }
095
096 public XAResource getXAResource() throws ResourceException {
097 throw new NotSupportedException();
098 }
099
100 public void fireConnectionError(OTError error) {
101 ConnectionEvent event = new ConnectionEvent(this,
102 ConnectionEvent.CONNECTION_ERROR_OCCURRED,
103 new IOException(error.toString()));
104 LOG.trace("fireConnectionError: {}", error);
105 for(ConnectionEventListener l : listeners) {
106 l.connectionErrorOccurred(event);
107 }
108 }
109
110 public void fireConnectionClosed(Object handle) {
111 ConnectionEvent event = new ConnectionEvent(this,
112 ConnectionEvent.CONNECTION_CLOSED);
113 LOG.trace("fireConnectionClosed: {}", handle);
114 event.setConnectionHandle(handle);
115 for(ConnectionEventListener l : listeners) {
116 l.connectionClosed(event);
117 }
118 }
119
120 private final IConnectionStateListener stateListener = new IConnectionStateListener() {
121 public void onConnected() {
122 LOG.trace("CONNECTED");
123 }
124
125 public void onConnecting(OTHost addr) {
126 LOG.trace("CONNECTING");
127 }
128
129 public void onError(OTError error) {
130 LOG.trace("ERROR: {}", error);
131 fireConnectionError(error);
132 }
133
134 public void onLogin() {
135 LOG.trace("LOGGED IN");
136 }
137
138 public void onRedirect(OTHost addr) {
139 LOG.trace("REDIRECTED TO {}", addr);
140 }
141 };
142
143 private final Queue<ConnectionEventListener> listeners = new ConcurrentLinkedQueue<ConnectionEventListener>();
144 public void addConnectionEventListener(ConnectionEventListener val) {
145 listeners.offer(val);
146 }
147
148 public void removeConnectionEventListener(ConnectionEventListener val) {
149 listeners.remove(val);
150 }
151
152 private PrintWriter logWriter;
153
154 public PrintWriter getLogWriter() throws ResourceException {
155 return logWriter;
156 }
157
158 public void setLogWriter(PrintWriter val) throws ResourceException {
159 logWriter = val;
160 }
161
162 IConnection getEngine() {
163 return engine;
164 }
165 }