summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java164
1 files changed, 98 insertions, 66 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 2d8074eea2..e92817f713 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -43,14 +43,17 @@ import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,28 +103,29 @@ import java.util.concurrent.CountDownLatch;
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
+ * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
*
* <tr><td> Maintain fail-over state.
* <tr><td>
* </table>
*
* @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
- * filter before it mean not doing the read/write asynchronously but in the main filter thread?
- *
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the main filter thread?
* @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
- * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
- * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
- * that lifecycles of the fields match lifecycles of their containing objects.
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
*/
public class AMQProtocolHandler extends IoHandlerAdapter
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
+ private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
+ private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null);
/**
* The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
@@ -136,7 +140,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private AMQStateManager _stateManager = new AMQStateManager();
/** Holds the method listeners, */
- private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+ private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
/**
* We create the failover handler when the session is created since it needs a reference to the IoSession in order
@@ -154,14 +158,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
private CountDownLatch _failoverLatch;
-
/** The last failover exception that occured */
private FailoverException _lastFailoverException;
/** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
-
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -245,11 +247,27 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
}
}
- _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+ _protocolSession = new AMQProtocolSession(this, session, _connection);
+
+ _stateManager.setProtocolSession(_protocolSession);
+
_protocolSession.init();
}
/**
+ * Called when we want to create a new IoTransport session
+ * @param brokerDetail
+ */
+ public void createIoTransportSession(BrokerDetails brokerDetail)
+ {
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager.setProtocolSession(_protocolSession);
+ IoTransport.connect_0_9(getProtocolSession(),
+ brokerDetail.getHost(), brokerDetail.getPort());
+ _protocolSession.init();
+ }
+
+ /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -263,7 +281,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param session The MINA session.
*
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
- * not otherwise? The above comment doesn't make that clear.
+ * not otherwise? The above comment doesn't make that clear.
*/
public void sessionClosed(IoSession session)
{
@@ -374,7 +392,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
"cause isn't AMQConnectionClosedException: " + cause, cause);
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToWaiters(amqe);
+ propagateExceptionToAllWaiters(amqe);
}
_connection.exceptionReceived(cause);
@@ -395,7 +413,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// we notify the state manager of the error in case we have any clients waiting on a state
// change. Those "waiters" will be interrupted and can handle the exception
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToWaiters(amqe);
+ propagateExceptionToAllWaiters(amqe);
_connection.exceptionReceived(cause);
}
}
@@ -405,11 +423,33 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
* of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
*
+ * This should be called only when the exception is fatal for the connection.
+ *
* @param e the exception to propagate
+ *
+ * @see #propagateExceptionToFrameListeners
+ * @see #propagateExceptionToStateWaiters
*/
- public void propagateExceptionToWaiters(Exception e)
+ public void propagateExceptionToAllWaiters(Exception e)
+ {
+ propagateExceptionToFrameListeners(e);
+ propagateExceptionToStateWaiters(e);
+ }
+
+ /**
+ * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any
+ * protocol level waits.
+ *
+ * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should
+ * stop waiting and relinquish the Failover lock {@see FailoverHandler}.
+ *
+ * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt
+ * their protocol request and so listen again for the correct frame.
+ *
+ * @param e the exception to propagate
+ */
+ public void propagateExceptionToFrameListeners(Exception e)
{
-
if (!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -421,6 +461,22 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
+ /**
+ * This caters for the case where we only need to propogate an exception to the the state manager to interupt any
+ * thing waiting for a state change.
+ *
+ * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement.
+ *
+ * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal
+ * cases {@link #propagateExceptionToAllWaiters} would be the correct choice.
+ *
+ * @param e the exception to propagate
+ */
+ public void propagateExceptionToStateWaiters(Exception e)
+ {
+ getStateManager().error(e);
+ }
+
public void notifyFailoverStarting()
{
// Set the last exception in the sync block to ensure the ordering with add.
@@ -431,7 +487,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_lastFailoverException = new FailoverException("Failing over about to start");
}
- propagateExceptionToWaiters(_lastFailoverException);
+ //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
+ // interupted unless failover cannot restore the state.
+ propagateExceptionToFrameListeners(_lastFailoverException);
}
public void failoverInProgress()
@@ -443,6 +501,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void messageReceived(IoSession session, Object message) throws Exception
{
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
+
if(message instanceof AMQFrame)
{
final boolean debug = _logger.isDebugEnabled();
@@ -459,7 +522,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- bodyFrame.handle(frame.getChannel(),_protocolSession);
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -508,20 +571,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
if (!wasAnyoneInterested)
{
throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners, null);
+ + _frameListeners, null);
}
}
catch (AMQException e)
- {
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
- }
+ {
+ propagateExceptionToFrameListeners(e);
exceptionCaught(session, e);
}
@@ -532,6 +587,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void messageSent(IoSession session, Object message) throws Exception
{
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
+ }
+
final long sentMessages = _messagesOut++;
final boolean debug = _logger.isDebugEnabled();
@@ -542,34 +602,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
_connection.bytesSent(session.getWrittenBytes());
- if (debug)
- {
- _logger.debug("Sent frame " + message);
- }
- }
-
- /*
- public void addFrameListener(AMQMethodListener listener)
- {
- _frameListeners.add(listener);
- }
-
- public void removeFrameListener(AMQMethodListener listener)
- {
- _frameListeners.remove(listener);
- }
- */
- public void attainState(AMQState s) throws AMQException
- {
- getStateManager().attainState(s);
}
- public AMQState attainState(Set<AMQState> states) throws AMQException
+ public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
{
- return getStateManager().attainState(states);
+ return getStateManager().createWaiter(states);
}
-
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
@@ -617,14 +656,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
throw _lastFailoverException;
}
-
+
_frameListeners.add(listener);
}
_protocolSession.writeFrame(frame);
- AMQMethodEvent e = listener.blockForFrame(timeout);
-
- return e;
+ return listener.blockForFrame(timeout);
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
}
@@ -669,8 +706,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."),0,0);
-
+ new AMQShortString("JMS client is closing the connection."), 0, 0);
final AMQFrame frame = body.generateFrame(0);
@@ -745,10 +781,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
- if (_protocolSession != null)
- {
- _protocolSession.setStateManager(stateManager);
- }
}
public AMQProtocolSession getProtocolSession()
@@ -778,7 +810,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public MethodRegistry getMethodRegistry()
{
- return getStateManager().getMethodRegistry();
+ return _protocolSession.getMethodRegistry();
}
public ProtocolVersion getProtocolVersion()