diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 164 |
1 files changed, 98 insertions, 66 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 2d8074eea2..e92817f713 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -43,14 +43,17 @@ import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; +import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,28 +103,29 @@ import java.util.concurrent.CountDownLatch; * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * <tr><td> Create the filter chain to filter this handlers events. - * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. + * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. * * <tr><td> Maintain fail-over state. * <tr><td> * </table> * * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the - * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing - * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec - * filter before it mean not doing the read/write asynchronously but in the main filter thread? - * + * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing + * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec + * filter before it mean not doing the read/write asynchronously but in the main filter thread? * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including - * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of - * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could - * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data - * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so - * that lifecycles of the fields match lifecycles of their containing objects. + * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of + * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could + * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * that lifecycles of the fields match lifecycles of their containing objects. */ public class AMQProtocolHandler extends IoHandlerAdapter { /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class); + private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol"); + private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null); /** * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection @@ -136,7 +140,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter private AMQStateManager _stateManager = new AMQStateManager(); /** Holds the method listeners, */ - private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); + private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); /** * We create the failover handler when the session is created since it needs a reference to the IoSession in order @@ -154,14 +158,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; - /** The last failover exception that occured */ private FailoverException _lastFailoverException; /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; - /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -245,11 +247,27 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); } } - _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); + _protocolSession = new AMQProtocolSession(this, session, _connection); + + _stateManager.setProtocolSession(_protocolSession); + _protocolSession.init(); } /** + * Called when we want to create a new IoTransport session + * @param brokerDetail + */ + public void createIoTransportSession(BrokerDetails brokerDetail) + { + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager.setProtocolSession(_protocolSession); + IoTransport.connect_0_9(getProtocolSession(), + brokerDetail.getHost(), brokerDetail.getPort()); + _protocolSession.init(); + } + + /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case * where the connection died, an attempt to failover automatically to a new connection may be started. The failover @@ -263,7 +281,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param session The MINA session. * * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and - * not otherwise? The above comment doesn't make that clear. + * not otherwise? The above comment doesn't make that clear. */ public void sessionClosed(IoSession session) { @@ -374,7 +392,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter "cause isn't AMQConnectionClosedException: " + cause, cause); AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToWaiters(amqe); + propagateExceptionToAllWaiters(amqe); } _connection.exceptionReceived(cause); @@ -395,7 +413,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter // we notify the state manager of the error in case we have any clients waiting on a state // change. Those "waiters" will be interrupted and can handle the exception AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToWaiters(amqe); + propagateExceptionToAllWaiters(amqe); _connection.exceptionReceived(cause); } } @@ -405,11 +423,33 @@ public class AMQProtocolHandler extends IoHandlerAdapter * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. * + * This should be called only when the exception is fatal for the connection. + * * @param e the exception to propagate + * + * @see #propagateExceptionToFrameListeners + * @see #propagateExceptionToStateWaiters */ - public void propagateExceptionToWaiters(Exception e) + public void propagateExceptionToAllWaiters(Exception e) + { + propagateExceptionToFrameListeners(e); + propagateExceptionToStateWaiters(e); + } + + /** + * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any + * protocol level waits. + * + * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should + * stop waiting and relinquish the Failover lock {@see FailoverHandler}. + * + * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt + * their protocol request and so listen again for the correct frame. + * + * @param e the exception to propagate + */ + public void propagateExceptionToFrameListeners(Exception e) { - if (!_frameListeners.isEmpty()) { final Iterator it = _frameListeners.iterator(); @@ -421,6 +461,22 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } + /** + * This caters for the case where we only need to propogate an exception to the the state manager to interupt any + * thing waiting for a state change. + * + * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement. + * + * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal + * cases {@link #propagateExceptionToAllWaiters} would be the correct choice. + * + * @param e the exception to propagate + */ + public void propagateExceptionToStateWaiters(Exception e) + { + getStateManager().error(e); + } + public void notifyFailoverStarting() { // Set the last exception in the sync block to ensure the ordering with add. @@ -431,7 +487,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter _lastFailoverException = new FailoverException("Failing over about to start"); } - propagateExceptionToWaiters(_lastFailoverException); + //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be + // interupted unless failover cannot restore the state. + propagateExceptionToFrameListeners(_lastFailoverException); } public void failoverInProgress() @@ -443,6 +501,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void messageReceived(IoSession session, Object message) throws Exception { + if (PROTOCOL_DEBUG) + { + _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); + } + if(message instanceof AMQFrame) { final boolean debug = _logger.isDebugEnabled(); @@ -459,7 +522,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - bodyFrame.handle(frame.getChannel(),_protocolSession); + bodyFrame.handle(frame.getChannel(), _protocolSession); _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } @@ -508,20 +571,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (!wasAnyoneInterested) { throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners, null); + + _frameListeners, null); } } catch (AMQException e) - { - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } - } + { + propagateExceptionToFrameListeners(e); exceptionCaught(session, e); } @@ -532,6 +587,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void messageSent(IoSession session, Object message) throws Exception { + if (PROTOCOL_DEBUG) + { + _protocolLogger.debug(String.format("SEND: [%s] %s", this, message)); + } + final long sentMessages = _messagesOut++; final boolean debug = _logger.isDebugEnabled(); @@ -542,34 +602,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter } _connection.bytesSent(session.getWrittenBytes()); - if (debug) - { - _logger.debug("Sent frame " + message); - } - } - - /* - public void addFrameListener(AMQMethodListener listener) - { - _frameListeners.add(listener); - } - - public void removeFrameListener(AMQMethodListener listener) - { - _frameListeners.remove(listener); - } - */ - public void attainState(AMQState s) throws AMQException - { - getStateManager().attainState(s); } - public AMQState attainState(Set<AMQState> states) throws AMQException + public StateWaiter createWaiter(Set<AMQState> states) throws AMQException { - return getStateManager().attainState(states); + return getStateManager().createWaiter(states); } - /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). @@ -617,14 +656,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter { throw _lastFailoverException; } - + _frameListeners.add(listener); } _protocolSession.writeFrame(frame); - AMQMethodEvent e = listener.blockForFrame(timeout); - - return e; + return listener.blockForFrame(timeout); // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } @@ -669,8 +706,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter getStateManager().changeState(AMQState.CONNECTION_CLOSING); ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection."),0,0); - + new AMQShortString("JMS client is closing the connection."), 0, 0); final AMQFrame frame = body.generateFrame(0); @@ -745,10 +781,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void setStateManager(AMQStateManager stateManager) { _stateManager = stateManager; - if (_protocolSession != null) - { - _protocolSession.setStateManager(stateManager); - } } public AMQProtocolSession getProtocolSession() @@ -778,7 +810,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public MethodRegistry getMethodRegistry() { - return getStateManager().getMethodRegistry(); + return _protocolSession.getMethodRegistry(); } public ProtocolVersion getProtocolVersion() |