diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-07-15 17:06:16 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-07-15 17:06:16 +0000 |
commit | ba373f4b9b54023b1a0f70c065a1e6f153b432ba (patch) | |
tree | f3f08b253524aace6c06dd9274ced371cb98227e /qpid/java/client | |
parent | 461b8cad6760b694bf561ad295db0474c06daed6 (diff) | |
download | qpid-python-ba373f4b9b54023b1a0f70c065a1e6f153b432ba.tar.gz |
QPID-940,QPID-594,QPID-805,QPID-826 : Updated the client exception handling so that exceptions are not lost. In performing the changes I noted that the AMQStateManager is only used for connection creation in the 08/09 codepath. Now this may be due to the fact that we don't currently need to wait on any other states. We need to improve our testing of error conditions for all protcol versions.
Changes Summary:
The MethodHandlers had their AMQStateManager parameters swapped for AMQSession as that is what they all cared about.
The BlockingMethodFrameListener was used as a basis to create a generic BlockingWaiter which is now used by StateWaiter,
There is probably scope to refactor the AMQStateManager and the parts of the AMQProtocolHandler that deal with the _frameListeners into a generic class but that can be looked at as part of a wider client refactor.
Additionally tests were updated such as SimpleACLTest and ConnectionTest as they were expecting JMSExceptions from the constructor but the JMS API does not demand that and AMQExceptions are now correctly being thrown.
The SimpleACLTest also required a change to AMQSession.
The test calls send which will cause the connection to be closed asynchrously due to a permission violation. As this exception is not expected and is asynchorous to the client code it cannot be directly thrown. The solution is to record this exception in the AMQStateManager, it can tell that there are no waiters for the exception so it can record the value.(Potential exists to alert if the exception is overwritten but I don't think this is required right now)
When the AMQSession checks that the connection is closed it is then possible to check if the current State is CLOSED and if we have an exception sitting in the AMQStateManager. If all these are true we can attach the AMQStateManager exception to the IllegalState Exception that is normally thrown when the Session is closed.
This maintains JMS Compliance and allows us to discover the cause of the failure, SimpleACLTest was updated by removing the IllegalState catch section that was causing the test to silently fail.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@676978 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
35 files changed, 927 insertions, 812 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0abcc8ef26..472eaef5b5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -26,7 +26,6 @@ import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -76,11 +75,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); private int _size = 0; private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; - public AMQSession get(int channelId) { - if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { return _fastAccessSessions[channelId]; } @@ -93,7 +91,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQSession put(int channelId, AMQSession session) { AMQSession oldVal; - if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { oldVal = _fastAccessSessions[channelId]; _fastAccessSessions[channelId] = session; @@ -102,11 +100,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { oldVal = _slowAccessSessions.put(channelId, session); } - if((oldVal != null) && (session == null)) + if ((oldVal != null) && (session == null)) { _size--; } - else if((oldVal == null) && (session != null)) + else if ((oldVal == null) && (session != null)) { _size++; } @@ -115,13 +113,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } - public AMQSession remove(int channelId) { AMQSession session; - if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { - session = _fastAccessSessions[channelId]; + session = _fastAccessSessions[channelId]; _fastAccessSessions[channelId] = null; } else @@ -129,7 +126,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect session = _slowAccessSessions.remove(channelId); } - if(session != null) + if (session != null) { _size--; } @@ -141,9 +138,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); - for(int i = 0; i < 16; i++) + for (int i = 0; i < 16; i++) { - if(_fastAccessSessions[i] != null) + if (_fastAccessSessions[i] != null) { values.add(_fastAccessSessions[i]); } @@ -162,14 +159,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _size = 0; _slowAccessSessions.clear(); - for(int i = 0; i<16; i++) + for (int i = 0; i < 16; i++) { _fastAccessSessions[i] = null; } } } - private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); protected AtomicInteger _idFactory = new AtomicInteger(0); @@ -211,7 +207,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** The virtual path to connect to on the AMQ server */ private String _virtualHost; - protected ExceptionListener _exceptionListener; @@ -252,15 +247,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this protected AMQConnectionDelegate _delegate; - + // this connection maximum number of prefetched messages private long _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; - - /** used to hold a list of all exceptions that have been thrown during connection construction. gross */ - final ArrayList<Exception> _exceptions = new ArrayList<Exception>(); /** * @param broker brokerdetails @@ -337,20 +329,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception - * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon. + * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon. */ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { // set this connection maxPrefetch if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null) { - _maxPrefetch = Long.parseLong( connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + _maxPrefetch = Long.parseLong(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); } else { // use the defaul value set for all connections _maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, - ClientProperties.MAX_PREFETCH_DEFAULT)); + ClientProperties.MAX_PREFETCH_DEFAULT)); } if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null) @@ -378,25 +370,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _delegate = new AMQConnectionDelegate_0_10(this); } - - class Listener implements ExceptionListener - { - public void onException(JMSException e) - { - _exceptions.add(e); - } - } - - try - { - setExceptionListener(new Listener()); - } - catch (JMSException e) - { - // Shouldn't happen - throw new AMQException(null, null, e); - } - if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); @@ -436,15 +409,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); } - - _protocolHandler = new AMQProtocolHandler(this); + _protocolHandler = new AMQProtocolHandler(this); // We are not currently connected _connected = false; // TMG FIXME this seems... wrong... boolean retryAllowed = true; - while (!_connected && retryAllowed ) + Exception connectionException = null; + while (!_connected && retryAllowed) { try { @@ -456,37 +429,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info(pe.getMessage()); _logger.info("Trying broker supported protocol version: " + - TransportConstants.getVersionMajor() + "." + - TransportConstants.getVersionMinor()); + TransportConstants.getVersionMajor() + "." + + TransportConstants.getVersionMinor()); } // we need to check whether we have a delegate for the supported protocol getDelegate(); } catch (Exception e) { - _exceptions.add(e); if (_logger.isInfoEnabled()) { - _logger.info("Unable to connect to broker at " + - _failoverPolicy.getCurrentBrokerDetails(), - e); + _logger.info("Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails(), + e); } + connectionException = e; } - + if (!_connected) { retryAllowed = _failoverPolicy.failoverAllowed(); brokerDetails = _failoverPolicy.getNextBrokerDetails(); } } - try - { - setExceptionListener(null); - } - catch (JMSException e1) - { - // Can't happen - } if (_logger.isDebugEnabled()) { @@ -496,26 +461,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (!_connected) { String message = null; - try - { - Thread.sleep(150); - } - catch (InterruptedException e) - { - // Eat it, we've hopefully got all the exceptions if this happened - } - - Exception lastException = null; - if (_exceptions.size() > 0) + + if (connectionException != null) { - lastException = _exceptions.get(_exceptions.size() - 1); - if (lastException.getCause() != null) + if (connectionException.getCause() != null) { - message = lastException.getCause().getMessage(); + message = connectionException.getCause().getMessage(); } else { - message = lastException.getMessage(); + message = connectionException.getMessage(); } } @@ -527,20 +482,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else // can only be "" if getMessage() returned it therfore lastException != null { - message = "Unable to Connect:" + lastException.getClass(); + message = "Unable to Connect:" + connectionException.getClass(); } } - AMQException e = new AMQConnectionFailureException(message, _exceptions); - - if (lastException != null) + AMQException e = new AMQConnectionFailureException(message, connectionException); + + if (connectionException != null) { - if (lastException instanceof UnresolvedAddressException) + if (connectionException instanceof UnresolvedAddressException) { e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(), null); } - + } throw e; } @@ -565,18 +520,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" + - TransportConstants.getVersionMajor() + "_" + - TransportConstants.getVersionMinor()); - Class partypes[] = new Class[1]; + TransportConstants.getVersionMajor() + "_" + + TransportConstants.getVersionMinor()); + Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); } catch (Exception e) { throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, - "Protocol: " + TransportConstants.getVersionMajor() + "." - + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " + - "currently supported by this client library implementation", e); + "Protocol: " + TransportConstants.getVersionMajor() + "." + + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " + + "currently supported by this client library implementation", e); } } @@ -867,7 +822,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - } } @@ -892,14 +846,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - public void close() throws JMSException + public void close() throws JMSException { close(DEFAULT_TIMEOUT); } public void close(long timeout) throws JMSException { - close(new ArrayList<AMQSession>(_sessions.values()),timeout); + close(new ArrayList<AMQSession>(_sessions.values()), timeout); } public void close(List<AMQSession> sessions, long timeout) throws JMSException @@ -912,12 +866,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void doClose(List<AMQSession> sessions, long timeout) throws JMSException { - synchronized(_sessionCreationLock) + synchronized (_sessionCreationLock) { - if(!sessions.isEmpty()) + if (!sessions.isEmpty()) { AMQSession session = sessions.remove(0); - synchronized(session.getMessageDeliveryLock()) + synchronized (session.getMessageDeliveryLock()) { doClose(sessions, timeout); } @@ -1120,7 +1074,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _sessions; } - + public String getUsername() { return _username; @@ -1297,6 +1251,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (cause instanceof IOException) { closer = !_closed.getAndSet(true); + + _protocolHandler.getProtocolSession().notifyError(je); } if (_exceptionListener != null) @@ -1339,7 +1295,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (cause instanceof AMQException) { - return ((AMQException)cause).isHardError(); + return ((AMQException) cause).isHardError(); } return true; @@ -1483,7 +1439,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public long getMaxPrefetch() { - return _maxPrefetch; + return _maxPrefetch; } /** @@ -1495,14 +1451,4 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncPersistence; } - - public Exception getLastException() - { - if (_exceptions.size() > 0) - { - return _exceptions.get(_exceptions.size() - 1); - } - return null; - } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 5074658070..aab094ca7d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -32,12 +32,12 @@ import java.util.Set; import javax.jms.JMSException; import javax.jms.XASession; -import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; @@ -84,11 +84,15 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate final Set<AMQState> openOrClosedStates = EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); + + StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); + TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); // this blocks until the connection has been set up or when an error // has prevented the connection being set up - AMQState state = _conn._protocolHandler.attainState(openOrClosedStates); + AMQState state = waiter.await(); + if(state == AMQState.CONNECTION_OPEN) { _conn._failoverPolicy.attainedConnection(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 6755e4da5f..2f593ce0c3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -79,6 +79,8 @@ import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -90,22 +92,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * <tr><td> * </table> * * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For - * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second - * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of - * the fail-over process, the retry handler could be used to automatically retry the operation once the connection - * has been reestablished. All fail-over protected operations should be placed in private methods, with - * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the - * fail-over process sets a nowait flag and uses an async method call instead. - * + * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second + * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of + * the fail-over process, the retry handler could be used to automatically retry the operation once the connection + * has been reestablished. All fail-over protected operations should be placed in private methods, with + * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the + * fail-over process sets a nowait flag and uses an async method call instead. * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, - * after looking at worse bottlenecks first. + * after looking at worse bottlenecks first. */ public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -114,10 +114,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>(); - public BasicMessageConsumer get(int id) { - if((id & 0xFFFFFFF0) == 0) + if ((id & 0xFFFFFFF0) == 0) { return _fastAccessConsumers[id]; } @@ -130,7 +129,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public BasicMessageConsumer put(int id, BasicMessageConsumer consumer) { BasicMessageConsumer oldVal; - if((id & 0xFFFFFFF0) == 0) + if ((id & 0xFFFFFFF0) == 0) { oldVal = _fastAccessConsumers[id]; _fastAccessConsumers[id] = consumer; @@ -144,13 +143,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public BasicMessageConsumer remove(int id) { BasicMessageConsumer consumer; - if((id & 0xFFFFFFF0) == 0) + if ((id & 0xFFFFFFF0) == 0) { - consumer = _fastAccessConsumers[id]; + consumer = _fastAccessConsumers[id]; _fastAccessConsumers[id] = null; } else @@ -166,9 +164,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>(); - for(int i = 0; i < 16; i++) + for (int i = 0; i < 16; i++) { - if(_fastAccessConsumers[i] != null) + if (_fastAccessConsumers[i] != null) { values.add(_fastAccessConsumers[i]); } @@ -178,11 +176,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return values; } - public void clear() { _slowAccessConsumers.clear(); - for(int i = 0; i<16; i++) + for (int i = 0; i < 16; i++) { _fastAccessConsumers[i] = null; } @@ -280,19 +277,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess */ protected final FlowControllingBlockingQueue _queue; - /** - * Holds the highest received delivery tag. - */ + /** Holds the highest received delivery tag. */ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); - /** - * All the not yet acknowledged message tags - */ + /** All the not yet acknowledged message tags */ protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); - /** - * All the delivered message tags - */ + /** All the delivered message tags */ protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); /** Holds the dispatcher thread for this session. */ @@ -315,9 +306,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * consumer. */ protected final IdToConsumerMap _consumers = new IdToConsumerMap(); - - //Map<AMQShortString, BasicMessageConsumer> _consumers = - //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + + //Map<AMQShortString, BasicMessageConsumer> _consumers = + //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); /** * Contains a list of consumers which have been removed but which might still have @@ -380,15 +371,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Has failover occured on this session */ private boolean _failedOver; - - private static final class FlowControlIndicator { private volatile boolean _flowControl = true; public synchronized void setFlowControl(boolean flowControl) { - _flowControl= flowControl; + _flowControl = flowControl; notify(); } @@ -450,8 +439,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public void aboveThreshold(int currentValue) { _logger.debug( - "Above threshold(" + _defaultPrefetchHighMark - + ") so suspending channel. Current value is " + currentValue); + "Above threshold(" + _defaultPrefetchHighMark + + ") so suspending channel. Current value is " + currentValue); _suspendState.set(true); new Thread(new SuspenderRunner(_suspendState)).start(); @@ -460,8 +449,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public void underThreshold(int currentValue) { _logger.debug( - "Below threshold(" + _defaultPrefetchLowMark - + ") so unsuspending channel. Current value is " + currentValue); + "Below threshold(" + _defaultPrefetchLowMark + + ") so unsuspending channel. Current value is " + currentValue); _suspendState.set(false); new Thread(new SuspenderRunner(_suspendState)).start(); @@ -503,6 +492,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess close(-1); } + public void checkNotClosed() throws JMSException + { + // if the Connection has closed then we should throw any exception that has occured that we were not waiting for + AMQStateManager manager = _connection.getProtocolHandler().getStateManager(); + if (isClosed() && manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null) + { + JMSException jmse = new IllegalStateException("Object " + toString() + " has been closed"); + jmse.setLinkedException(manager.getLastException()); + throw jmse; + } + super.checkNotClosed(); + } + public BytesMessage createBytesMessage() throws JMSException { checkNotClosed(); @@ -519,7 +521,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (isClosed()) { throw new IllegalStateException("Session is already closed"); - } + } else if (hasFailedOver()) { throw new IllegalStateException("has failed over"); @@ -564,39 +566,35 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param exchangeName The exchange to bind the queue on. * * @throws AMQException If the queue cannot be bound for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. - * * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName,final AMQDestination destination) throws AMQException + final AMQShortString exchangeName, final AMQDestination destination) throws AMQException { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendQueueBind(queueName,routingKey,arguments,exchangeName,destination); + sendQueueBind(queueName, routingKey, arguments, exchangeName, destination); return null; } }, _connection).execute(); } - public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException { - if( consumer.getQueuename() != null) + if (consumer.getQueuename() != null) { - bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd); + bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd); } } public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException; + final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException; /** - * Closes the session. * * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close @@ -606,14 +604,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. * * @throws JMSException If the JMS provider fails to close the session due to some internal error. - * * @todo Be aware of possible changes to parameter order as versions change. - * * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be - * re-opened. May need to examine this more carefully. - * + * re-opened. May need to examine this more carefully. * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, - * because the failover process sends the failover event before acquiring the mutex itself. + * because the failover process sends the failover event before acquiring the mutex itself. */ public void close(long timeout) throws JMSException { @@ -621,7 +616,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); _logger.info("Closing session: " + this); // + ":" - // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); + // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } // Ensure we only try and close an open session. @@ -638,7 +633,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess try { - sendClose(timeout); + sendClose(timeout); } catch (AMQException e) { @@ -705,7 +700,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess amqe = new AMQException("Closing session forcibly", e); } - _connection.deregisterSession(_channelId); closeProducersAndConsumers(amqe); } @@ -723,12 +717,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does * not mean that the commit is known to have failed, merely that it is not known whether it * failed or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void commit() throws JMSException { - checkTransacted(); + checkTransacted(); try { @@ -792,10 +785,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess // consumer.markClosed(); - - if (consumer.isAutoClose()) - { + { // There is a small window where the message is between the two queues in the dispatcher. if (consumer.isClosed()) { @@ -863,7 +854,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false, false); } - public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); @@ -881,7 +871,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess messageSelector, null, false, false); } - public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { @@ -891,7 +880,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess messageSelector, null, false, false); } - public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, String selector) throws JMSException { @@ -928,7 +916,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException; public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException + throws JMSException { checkNotClosed(); checkValidTopic(topic); @@ -996,7 +984,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic); + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1022,7 +1010,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - /** * Declares the named queue. * @@ -1034,7 +1021,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param exclusive Flag to indicate that the queue is exclusive to this client. * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, @@ -1043,7 +1029,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess createQueue(name, autoDelete, durable, exclusive, null); } - /** * Declares the named queue. * @@ -1056,7 +1041,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param arguments Arguments used to set special properties of the queue * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, @@ -1073,7 +1057,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive, final Map<String, Object> arguments)throws AMQException, FailoverException; + final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException; + /** * Creates a QueueReceiver * @@ -1191,7 +1176,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); } - /** * Creates a non-durable subscriber with a message selector * @@ -1382,7 +1366,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (message instanceof ReturnMessage) { // Return of the bounced message. - returnBouncedMessage((ReturnMessage)message); + returnBouncedMessage((ReturnMessage) message); } else { @@ -1398,7 +1382,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQProtocolHandler protocolHandler = getProtocolHandler(); declareExchange(amqd, protocolHandler, false); AMQShortString queueName = declareQueue(amqd, protocolHandler, false); - bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd); + bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); } /** @@ -1413,7 +1397,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * <li>Stop message delivery.</li> * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered. - * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> + * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> * </ul> * * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and @@ -1503,7 +1487,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does * not mean that the rollback is known to have failed, merely that it is not known whether it * failed or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void rollback() throws JMSException @@ -1545,8 +1528,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract void releaseForRollback(); - public abstract void sendRollback() throws AMQException, FailoverException ; - + public abstract void sendRollback() throws AMQException, FailoverException; public void run() { @@ -1673,8 +1655,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess ft.addAll(rawSelector); } - BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh,prefetchLow, - noLocal,exclusive, messageSelector, ft, noConsume, autoClose); + BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, + noLocal, exclusive, messageSelector, ft, noConsume, autoClose); if (_messageListener != null) { @@ -1718,8 +1700,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, - final boolean noConsume, final boolean autoClose) throws JMSException; + final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean noConsume, final boolean autoClose) throws JMSException; /** * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer @@ -1782,12 +1764,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not. * * @throws JMSException If the query fails for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException; - + public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; /** @@ -1828,10 +1809,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * * @throws AMQException If the session cannot be started for any reason. - * * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the - * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages - * for each subsequent call to flow.. only need to do this if we have called stop. + * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages + * for each subsequent call to flow.. only need to do this if we have called stop. */ void start() throws AMQException { @@ -2032,7 +2012,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } // at this point the _consumers map will be empty - if (_dispatcher != null) + if (_dispatcher != null) { _dispatcher.close(); _dispatcher = null; @@ -2124,7 +2104,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector,AMQShortString tag) throws AMQException, FailoverException; + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException; private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException @@ -2143,7 +2123,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess checkNotClosed(); long producerId = getNextProducerId(); BasicMessageProducer producer = createMessageProducer(destination, mandatory, - immediate, waitUntilSent, producerId); + immediate, waitUntilSent, producerId); registerProducer(producerId, producer); return producer; @@ -2152,20 +2132,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent, long producerId); + final boolean immediate, final boolean waitUntilSent, long producerId); private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } - /** * Returns the number of messages currently queued for the given destination. * * <p/>Note that this operation automatically retries in the event of fail-over. * - * @param amqd The destination to be checked + * @param amqd The destination to be checked * * @return the number of queued messages. * @@ -2198,7 +2177,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param nowait * * @throws AMQException If the exchange cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, @@ -2215,8 +2193,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; - + final boolean nowait) throws AMQException, FailoverException; /** * Declares a queue for a JMS destination. @@ -2234,9 +2211,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * the client. * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Verify the destiation is valid or throw an exception. - * * @todo Be aware of possible changes to parameter order as versions change. */ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, @@ -2262,7 +2237,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)throws AMQException, FailoverException; + public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException; /** * Undeclares the specified queue. @@ -2272,7 +2247,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param queueName The name of the queue to delete. * * @throws JMSException If the queue could not be deleted for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ protected void deleteQueue(final AMQShortString queueName) throws JMSException @@ -2294,7 +2268,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; + public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; private long getNextProducerId() { @@ -2384,7 +2358,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess consumer.setQueuename(queueName); // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(), amqd); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) @@ -2469,7 +2443,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (_logger.isDebugEnabled()) { _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" - + message.getDeliveryTag()); + + message.getDeliveryTag()); } messages.remove(); @@ -2519,10 +2493,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); - AMQShortString reason = msg.getReplyText(); - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); + AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); + AMQShortString reason = msg.getReplyText(); + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. if (errorCode == AMQConstant.NO_CONSUMERS) @@ -2557,7 +2531,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * should be unsuspended. * * @throws AMQException If the session cannot be suspended for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException @@ -2598,7 +2571,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return getAMQConnection().getMaxPrefetch() > 0; } - /** Signifies that the session has pending sends to commit. */ public void markDirty() { @@ -2642,12 +2614,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _flowControl.setFlowControl(active); } - public void checkFlowControl() throws InterruptedException { - synchronized(_flowControl) + synchronized (_flowControl) { - while(!_flowControl.getFlowControl()) + while (!_flowControl.getFlowControl()) { _flowControl.wait(); } @@ -2655,7 +2626,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ class Dispatcher extends Thread { @@ -2856,7 +2826,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess //if (message.getDeliverBody() != null) //{ final BasicMessageConsumer consumer = - _consumers.get(message.getConsumerTag().toIntValue()); + _consumers.get(message.getConsumerTag().toIntValue()); if ((consumer == null) || consumer.isClosed()) { @@ -2865,14 +2835,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (consumer == null) { _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliveryTag() + "] from queue " - + message.getConsumerTag() + " )without a handler - rejecting(requeue)..."); + + message.getDeliveryTag() + "] from queue " + + message.getConsumerTag() + " )without a handler - rejecting(requeue)..."); } else { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliveryTag() + "] from queue " + " consumer(" - + message.getConsumerTag() + ") is closed rejecting(requeue)..."); + + message.getDeliveryTag() + "] from queue " + " consumer(" + + message.getConsumerTag() + ") is closed rejecting(requeue)..."); } } // Don't reject if we're already closing @@ -2930,7 +2900,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { try { - synchronized(_suspensionLock) + synchronized (_suspensionLock) { suspendChannel(_suspend.get()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index e0e319250e..82bff1dda7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -482,7 +482,7 @@ public final class AMQSession_0_8 extends AMQSession false, null).generateFrame(_channelId); QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); - getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); return okHandler._messageCount; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 76c899f565..927f660932 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -139,11 +139,15 @@ public class FailoverHandler implements Runnable // have a state waiter waiting until the connection is closed for some reason. Or in future we may have // a slightly more complex state model therefore I felt it was worthwhile doing this. AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager(); - _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession())); + + _amqProtocolHandler.setStateManager(new AMQStateManager()); + + if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null)) { _logger.info("Failover process veto-ed by client"); + //Restore Existing State Manager _amqProtocolHandler.setStateManager(existingStateManager); //todo: ritchiem these exceptions are useless... Would be better to attempt to propogate exception that @@ -181,13 +185,19 @@ public class FailoverHandler implements Runnable if (!failoverSucceeded) { + //Restore Existing State Manager _amqProtocolHandler.setStateManager(existingStateManager); + _amqProtocolHandler.getConnection().exceptionReceived( new AMQDisconnectedException("Server closed connection and no failover " + "was successful", null)); } else { + // Set the new Protocol Session in the StateManager. + existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession()); + + //Restore Existing State Manager _amqProtocolHandler.setStateManager(existingStateManager); try { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java index 5bd36aa88b..365fed6aa5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java @@ -5,14 +5,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.framing.*; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.protocol.AMQConstant; public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody> { @@ -25,11 +19,10 @@ public class AccessRequestOkMethodHandler implements StateAwareMethodListener<Ac return _handler; } - public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId) + public void methodReceived(AMQProtocolSession session, AccessRequestOkBody method, int channelId) throws AMQException { _logger.debug("AccessRequestOk method received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); session.setTicket(method.getTicket(), channelId); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java index e3e08667d8..5cb9412d51 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -22,11 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,13 +42,9 @@ public class BasicCancelOkMethodHandler implements StateAwareMethodListener<Basi private BasicCancelOkMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, BasicCancelOkBody body, int channelId) + public void methodReceived(AMQProtocolSession session, BasicCancelOkBody body, int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - - - if (_logger.isInfoEnabled()) { _logger.info("New BasicCancelOk method received for consumer:" + body.getConsumerTag()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index 4deaa314ec..6029e7c171 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -21,10 +21,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicDeliverBody; import org.slf4j.Logger; @@ -41,10 +39,9 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic return _instance; } - public void methodReceived(AMQStateManager stateManager, BasicDeliverBody body, int channelId) - throws AMQException + public void methodReceived(AMQProtocolSession session, BasicDeliverBody body, int channelId) + throws AMQException { - final AMQProtocolSession session = stateManager.getProtocolSession(); final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8( channelId, body.getDeliveryTag(), @@ -52,7 +49,7 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic body.getExchange(), body.getRoutingKey(), body.getRedelivered()); - _logger.debug("New JmsDeliver method received"); + _logger.debug("New JmsDeliver method received:" + session); session.unprocessedMessageReceived(msg); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java index 682c3ac2c1..5731fb7473 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java @@ -22,13 +22,9 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.ReturnMessage; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,11 +41,10 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener<BasicR } - public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId) + public void methodReceived(AMQProtocolSession session, BasicReturnBody body, int channelId) throws AMQException { _logger.debug("New JmsBounce method received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); final ReturnMessage msg = new ReturnMessage(channelId, body.getExchange(), body.getRoutingKey(), diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index ee4cf14d58..2b6745ebe4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -26,14 +26,12 @@ import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,12 +47,10 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann return _handler; } - public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId) + public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId) throws AMQException { _logger.debug("ChannelClose method received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); - AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); AMQShortString reason = method.getReplyText(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java index 8d3277d4de..9a9a0b4e63 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java @@ -23,9 +23,7 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,12 +39,11 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener<Cha return _instance; } - public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody method, int channelId) + public void methodReceived(AMQProtocolSession session, ChannelCloseOkBody method, int channelId) throws AMQException { _logger.info("Received channel-close-ok for channel-id " + channelId); - final AMQProtocolSession session = stateManager.getProtocolSession(); // todo this should do the local closure } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java index b47fe751d6..2153b9cc8c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java @@ -2,7 +2,6 @@ package org.apache.qpid.client.handler; import org.apache.qpid.framing.ChannelFlowBody; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; import org.slf4j.Logger; @@ -42,11 +41,9 @@ public class ChannelFlowMethodHandler implements StateAwareMethodListener<Channe private ChannelFlowMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ChannelFlowBody body, int channelId) throws AMQException { - - final AMQProtocolSession session = stateManager.getProtocolSession(); session.setFlowControl(channelId, body.getActive()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java index 96de8af54b..6f66a972d5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java @@ -22,10 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +41,7 @@ public class ChannelFlowOkMethodHandler implements StateAwareMethodListener<Chan private ChannelFlowOkMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ChannelFlowOkBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ChannelFlowOkBody body, int channelId) throws AMQException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index afb7517a12..a0f3808b23 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -27,31 +27,33 @@ import org.apache.qpid.framing.*; import org.apache.qpid.AMQException; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQMethodNotImplementedException; - +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClientMethodDispatcherImpl implements MethodDispatcher { - - private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance(); - private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance(); - private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); - private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); - private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); - private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); - private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); + private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance(); + private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance(); + private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); + private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); + private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); + private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); + private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); - private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance(); - private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance(); - private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance(); - private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance(); - private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance(); + private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance(); + private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance(); + private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance(); + private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance(); + private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance(); + private static final Logger _logger = LoggerFactory.getLogger(ClientMethodDispatcherImpl.class); private static interface DispatcherFactory { - public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager); + public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session); } private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories = @@ -62,44 +64,40 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher _dispatcherFactories.put(ProtocolVersion.v8_0, new DispatcherFactory() { - public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager) + public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session) { - return new ClientMethodDispatcherImpl_8_0(stateManager); + return new ClientMethodDispatcherImpl_8_0(session); } }); _dispatcherFactories.put(ProtocolVersion.v0_9, new DispatcherFactory() { - public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager) + public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session) { - return new ClientMethodDispatcherImpl_0_9(stateManager); + return new ClientMethodDispatcherImpl_0_9(session); } }); } - - public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager) + public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session) { + _logger.error("New Method Dispatcher:" + session); DispatcherFactory factory = _dispatcherFactories.get(version); - return factory.createMethodDispatcher(stateManager); + return factory.createMethodDispatcher(session); } - - + AMQProtocolSession _session; - private AMQStateManager _stateManager; - - public ClientMethodDispatcherImpl(AMQStateManager stateManager) + public ClientMethodDispatcherImpl(AMQProtocolSession session) { - _stateManager = stateManager; + _session = session; } - public AMQStateManager getStateManager() { - return _stateManager; + return _session.getStateManager(); } public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException @@ -109,7 +107,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException { - _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId); + _basicCancelOkMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -120,7 +118,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException { - _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId); + _basicDeliverMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -141,13 +139,13 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException { - _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId); + _basicReturnMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException { - _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId); + _channelCloseMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -163,7 +161,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException { - _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId); + _channelFlowOkMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -174,7 +172,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException { - _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionCloseMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -185,37 +183,37 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException { - _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionOpenOkMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException { - _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionRedirectMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException { - _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionSecureMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException { - _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionStartMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException { - _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionTuneMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException { - _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId); + _queueDeleteOkMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -431,7 +429,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException { - _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId); + _exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -522,7 +520,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException { - return false; + return false; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java index e235368357..d3e9fba8ed 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java @@ -26,16 +26,15 @@ import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; import org.apache.qpid.AMQException; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQMethodNotImplementedException; - +import org.apache.qpid.client.protocol.AMQProtocolSession; public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9 { - public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager) + public ClientMethodDispatcherImpl_0_9(AMQProtocolSession session) { - super(stateManager); + super(session); } - public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException { return false; @@ -148,8 +147,7 @@ public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl i public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException { - return false; + return false; } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java index b0f003cd2d..19f758817d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java @@ -24,13 +24,13 @@ import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; import org.apache.qpid.AMQException; -import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQProtocolSession; public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0 { - public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager) + public ClientMethodDispatcherImpl_8_0(AMQProtocolSession session) { - super(stateManager); + super(session); } public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 950a3288fc..bc82d6bc62 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -25,13 +25,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,14 +46,13 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co } private ConnectionCloseMethodHandler() - { } + { + } - public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody method, int channelId) - throws AMQException + public void methodReceived(AMQProtocolSession session, ConnectionCloseBody method, int channelId) + throws AMQException { _logger.info("ConnectionClose frame received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); - // does it matter // stateManager.changeState(AMQState.CONNECTION_CLOSING); @@ -63,6 +60,8 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); AMQShortString reason = method.getReplyText(); + AMQException error = null; + try { @@ -75,35 +74,33 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co { if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED)) { - _logger.info("Error :" + errorCode +":"+ Thread.currentThread().getName()); - - // todo ritchiem : Why do this here when it is going to be done in the finally block? - session.closeProtocolSession(); + _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName()); - // todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. - stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); - - throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null); + error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null); } else { _logger.info("Connection close received with error code " + errorCode); - throw new AMQConnectionClosedException(errorCode, "Error: " + reason, null); + error = new AMQConnectionClosedException(errorCode, "Error: " + reason, null); } } } finally { - // this actually closes the connection in the case where it is not an error. + if (error != null) + { + session.notifyError(error); + } + + // Close the protocol Session, including any open TCP connections session.closeProtocolSession(); - // ritchiem: Doing this though will cause any waiting connection start to be released without being able to - // see what the cause was. - stateManager.changeState(AMQState.CONNECTION_CLOSED); + // Closing the session should not introduce a race condition as this thread will continue to propgate any + // exception in to the exceptionCaught method of the SessionHandler. + // Any sessionClosed event should occur after this. } } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java index fd7acac84f..e639a33450 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java @@ -24,9 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionOpenOkBody; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.protocol.AMQMethodEvent; public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<ConnectionOpenOkBody> { @@ -41,10 +39,10 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<C { } - public void methodReceived(AMQStateManager stateManager, ConnectionOpenOkBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId) throws AMQException { - stateManager.changeState(AMQState.CONNECTION_OPEN); + session.getStateManager().changeState(AMQState.CONNECTION_OPEN); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java index cac68c9467..472c471fd6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java @@ -22,10 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ConnectionRedirectBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,11 +44,10 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener private ConnectionRedirectMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ConnectionRedirectBody method, int channelId) + public void methodReceived(AMQProtocolSession session, ConnectionRedirectBody method, int channelId) throws AMQException { _logger.info("ConnectionRedirect frame received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); String host = method.getHost().toString(); // the host is in the form hostname:port with the port being optional diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java index 900aa2abac..9a9bee757b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java @@ -25,12 +25,9 @@ import javax.security.sasl.SaslException; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.framing.ConnectionSecureOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; public class ConnectionSecureMethodHandler implements StateAwareMethodListener<ConnectionSecureBody> { @@ -41,10 +38,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener<C return _instance; } - public void methodReceived(AMQStateManager stateManager, ConnectionSecureBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ConnectionSecureBody body, int channelId) throws AMQException { - final AMQProtocolSession session = stateManager.getProtocolSession(); SaslClient client = session.getSaslClient(); if (client == null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index d3746f137e..8857f1115a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -25,7 +25,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.security.AMQCallbackHandler; import org.apache.qpid.client.security.CallbackHandlerRegistry; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.common.QpidProperties; @@ -35,7 +34,6 @@ import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,15 +60,12 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co private ConnectionStartMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ConnectionStartBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ConnectionStartBody body, int channelId) throws AMQException { _log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, " + "AMQMethodEvent evt): called"); - final AMQProtocolSession session = stateManager.getProtocolSession(); - - ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor()); // For the purposes of interop, we can make the client accept the broker's version string. @@ -145,7 +140,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co throw new AMQException(null, "No locales sent from server, passed: " + locales, null); } - stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); + session.getStateManager().changeState(AMQState.CONNECTION_NOT_TUNED); FieldTable clientProperties = FieldTableFactory.newFieldTable(); clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index fc0e40b745..e4e58c317d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -24,10 +24,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.ConnectionTuneParameters; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,11 +44,10 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con protected ConnectionTuneMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ConnectionTuneBody frame, int channelId) + public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId) throws AMQException { _logger.debug("ConnectionTune frame received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); final MethodRegistry methodRegistry = session.getMethodRegistry(); @@ -65,7 +62,7 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat())); session.setConnectionTuneParameters(params); - stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); + session.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED); ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(), params.getFrameMax(), diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java index 8de40beb10..690d782b40 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java @@ -22,10 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +44,7 @@ public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener<Ex private ExchangeBoundOkMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ExchangeBoundOkBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ExchangeBoundOkBody body, int channelId) throws AMQException { if (_logger.isDebugEnabled()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java index 41225c0569..01d82c9b55 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java @@ -22,10 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +44,7 @@ public class QueueDeleteOkMethodHandler implements StateAwareMethodListener<Queu private QueueDeleteOkMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, QueueDeleteOkBody body, int channelId) + public void methodReceived(AMQProtocolSession session, QueueDeleteOkBody body, int channelId) throws AMQException { if (_logger.isDebugEnabled()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 1b75d6e829..8ac4f843de 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -43,6 +43,7 @@ import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; @@ -100,23 +101,22 @@ import java.util.concurrent.CountDownLatch; * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * <tr><td> Create the filter chain to filter this handlers events. - * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. + * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. * * <tr><td> Maintain fail-over state. * <tr><td> * </table> * * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the - * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing - * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec - * filter before it mean not doing the read/write asynchronously but in the main filter thread? - * + * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing + * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec + * filter before it mean not doing the read/write asynchronously but in the main filter thread? * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including - * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of - * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could - * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data - * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so - * that lifecycles of the fields match lifecycles of their containing objects. + * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of + * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could + * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * that lifecycles of the fields match lifecycles of their containing objects. */ public class AMQProtocolHandler extends IoHandlerAdapter { @@ -136,7 +136,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter private AMQStateManager _stateManager = new AMQStateManager(); /** Holds the method listeners, */ - private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); + private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); /** * We create the failover handler when the session is created since it needs a reference to the IoSession in order @@ -154,14 +154,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; - /** The last failover exception that occured */ private FailoverException _lastFailoverException; /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; - /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -245,7 +243,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); } } - _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); + _protocolSession = new AMQProtocolSession(this, session, _connection); + + _stateManager.setProtocolSession(_protocolSession); + _protocolSession.init(); } @@ -263,7 +264,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param session The MINA session. * * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and - * not otherwise? The above comment doesn't make that clear. + * not otherwise? The above comment doesn't make that clear. */ public void sessionClosed(IoSession session) { @@ -374,7 +375,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter "cause isn't AMQConnectionClosedException: " + cause, cause); AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToWaiters(amqe); + propagateExceptionToAllWaiters(amqe); } _connection.exceptionReceived(cause); @@ -395,7 +396,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter // we notify the state manager of the error in case we have any clients waiting on a state // change. Those "waiters" will be interrupted and can handle the exception AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToWaiters(amqe); + propagateExceptionToAllWaiters(amqe); _connection.exceptionReceived(cause); } } @@ -405,11 +406,33 @@ public class AMQProtocolHandler extends IoHandlerAdapter * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. * + * This should be called only when the exception is fatal for the connection. + * * @param e the exception to propagate + * + * @see #propagateExceptionToFrameListeners + * @see #propagateExceptionToStateWaiters */ - public void propagateExceptionToWaiters(Exception e) + public void propagateExceptionToAllWaiters(Exception e) + { + propagateExceptionToFrameListeners(e); + propagateExceptionToStateWaiters(e); + } + + /** + * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any + * protocol level waits. + * + * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should + * stop waiting and relinquish the Failover lock {@see FailoverHandler}. + * + * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt + * their protocol request and so listen again for the correct frame. + * + * @param e the exception to propagate + */ + public void propagateExceptionToFrameListeners(Exception e) { - if (!_frameListeners.isEmpty()) { final Iterator it = _frameListeners.iterator(); @@ -421,6 +444,22 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } + /** + * This caters for the case where we only need to propogate an exception to the the state manager to interupt any + * thing waiting for a state change. + * + * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement. + * + * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal + * cases {@link #propagateExceptionToAllWaiters} would be the correct choice. + * + * @param e the exception to propagate + */ + public void propagateExceptionToStateWaiters(Exception e) + { + getStateManager().error(e); + } + public void notifyFailoverStarting() { // Set the last exception in the sync block to ensure the ordering with add. @@ -431,7 +470,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter _lastFailoverException = new FailoverException("Failing over about to start"); } - propagateExceptionToWaiters(_lastFailoverException); + //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be + // interupted unless failover cannot restore the state. + propagateExceptionToFrameListeners(_lastFailoverException); } public void failoverInProgress() @@ -443,7 +484,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void messageReceived(IoSession session, Object message) throws Exception { - if(message instanceof AMQFrame) + if (message instanceof AMQFrame) { final boolean debug = _logger.isDebugEnabled(); final long msgNumber = ++_messageReceivedCount; @@ -459,7 +500,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - bodyFrame.handle(frame.getChannel(),_protocolSession); + bodyFrame.handle(frame.getChannel(), _protocolSession); _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } @@ -508,20 +549,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (!wasAnyoneInterested) { throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners, null); + + _frameListeners, null); } } catch (AMQException e) - { - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } - } + { + propagateExceptionToFrameListeners(e); exceptionCaught(session, e); } @@ -548,28 +581,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } - /* - public void addFrameListener(AMQMethodListener listener) - { - _frameListeners.add(listener); - } - - public void removeFrameListener(AMQMethodListener listener) - { - _frameListeners.remove(listener); - } - */ - public void attainState(AMQState s) throws Exception - { - getStateManager().attainState(s); - } - - public AMQState attainState(Set<AMQState> states) throws AMQException + public StateWaiter createWaiter(Set<AMQState> states) throws AMQException { - return getStateManager().attainState(states); + return getStateManager().createWaiter(states); } - /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). @@ -617,14 +633,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter { throw _lastFailoverException; } - + _frameListeners.add(listener); } _protocolSession.writeFrame(frame); - AMQMethodEvent e = listener.blockForFrame(timeout); - - return e; + return listener.blockForFrame(timeout); // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } @@ -669,8 +683,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter getStateManager().changeState(AMQState.CONNECTION_CLOSING); ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection."),0,0); - + new AMQShortString("JMS client is closing the connection."), 0, 0); final AMQFrame frame = body.generateFrame(0); @@ -745,10 +758,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void setStateManager(AMQStateManager stateManager) { _stateManager = stateManager; - if (_protocolSession != null) - { - _protocolSession.setStateManager(stateManager); - } } public AMQProtocolSession getProtocolSession() @@ -778,7 +787,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public MethodRegistry getMethodRegistry() { - return getStateManager().getMethodRegistry(); + return _protocolSession.getMethodRegistry(); } public ProtocolVersion getProtocolVersion() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 6e782e0bfc..6beec3c9ba 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.security.sasl.SaslClient; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -38,10 +37,10 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.ConnectionTuneParameters; -import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; @@ -67,8 +66,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected final IoSession _minaProtocolSession; - private AMQStateManager _stateManager; - protected WriteFuture _lastWriteFuture; /** @@ -86,7 +83,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives * first) with the subsequent content header and content bodies. */ - private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>(); + private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>(); private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; /** Counter to ensure unique queue names */ @@ -97,26 +94,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession // private VersionSpecificRegistry _registry = // MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); - private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); - private MethodDispatcher _methodDispatcher; - private final AMQConnection _connection; private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { - this(protocolHandler, protocolSession, connection, new AMQStateManager()); - - } - - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, - AMQStateManager stateManager) - { _protocolHandler = protocolHandler; _minaProtocolSession = protocolSession; _minaProtocolSession.setAttachment(this); @@ -124,11 +111,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); // fixme - real value needed _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - _stateManager = stateManager; - _stateManager.setProtocolSession(this); _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), - stateManager); + this); _connection = connection; } @@ -161,14 +146,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public AMQStateManager getStateManager() { - return _stateManager; - } - - public void setStateManager(AMQStateManager stateManager) - { - _stateManager = stateManager; - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(_protocolVersion, - stateManager); + return _protocolHandler.getStateManager(); } public String getVirtualHost() @@ -238,9 +216,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException { final int channelId = message.getChannelId(); - if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { - _channelId2UnprocessedMsgArray[channelId] = message; + _channelId2UnprocessedMsgArray[channelId] = message; } else { @@ -251,17 +229,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException { final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] - : _channelId2UnprocessedMsgMap.get(channelId); - + : _channelId2UnprocessedMsgMap.get(channelId); if (msg == null) { - throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null); + throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null); } if (msg.getContentHeader() != null) { - throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames", null); + throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null); } msg.setContentHeader(contentHeader); @@ -275,7 +252,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { UnprocessedMessage_0_8 msg; final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0; - if(fastAccess) + if (fastAccess) { msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId]; } @@ -291,7 +268,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession if (msg.getContentHeader() == null) { - if(fastAccess) + if (fastAccess) { _channelId2UnprocessedMsgArray[channelId] = null; } @@ -333,7 +310,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { AMQSession session = getSession(channelId); session.messageReceived(msg); - if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { _channelId2UnprocessedMsgArray[channelId] = null; } @@ -431,12 +408,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); } - public void closeProtocolSession() + public void closeProtocolSession() throws AMQException { closeProtocolSession(true); } - public void closeProtocolSession(boolean waitLast) + public void closeProtocolSession(boolean waitLast) throws AMQException { _logger.debug("Waiting for last write to join."); if (waitLast && (_lastWriteFuture != null)) @@ -446,6 +423,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _logger.debug("Closing protocol session"); final CloseFuture future = _minaProtocolSession.close(); + + // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED + // then wait for the connection to close. + // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any + // error now shouldn't matter. + + _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); + future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); } @@ -489,9 +474,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(pv); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, _stateManager); + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); - // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); + // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); } public byte getProtocolMinorVersion() @@ -524,12 +509,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return _methodDispatcher; } - public void setTicket(int ticket, int channelId) { final AMQSession session = getSession(channelId); session.setTicket(ticket); } + public void setMethodDispatcher(MethodDispatcher methodDispatcher) { _methodDispatcher = methodDispatcher; @@ -545,4 +530,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); } + + public void notifyError(Exception error) + { + _protocolHandler.propagateExceptionToAllWaiters(error); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 0ab2e07340..2bc609ebf2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.util.BlockingWaiter; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -54,38 +59,17 @@ import org.apache.qpid.protocol.AMQMethodListener; * </table> * * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a - * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations - * seem to use it. So wrapping the listeners is possible. - * - * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener, - * overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot - * behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for - * method has been received. - * - * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull - * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry - * when this happens. At the very least, restore the interrupted status flag. - * + * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations + * seem to use it. So wrapping the listeners is possible. * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to - * check that SynchronousQueue has a non-blocking put method available. + * check that SynchronousQueue has a non-blocking put method available. */ -public abstract class BlockingMethodFrameListener implements AMQMethodListener +public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener { - /** This flag is used to indicate that the blocked for method has been received. */ - private volatile boolean _ready = false; - - /** Used to protect the shared event and ready flag between the producer and consumer. */ - private final Object _lock = new Object(); - - /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ - private volatile Exception _error; /** Holds the channel id for the channel upon which this listener is waiting for a response. */ protected int _channelId; - /** Holds the incoming method. */ - protected AMQMethodEvent _doneEvt = null; - /** * Creates a new method listener, that filters incoming method to just those that match the specified channel id. * @@ -104,7 +88,14 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener * * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise. */ - public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException; + public abstract boolean processMethod(int channelId, AMQMethodBody frame); + + public boolean process(AMQMethodEvent evt) + { + AMQMethodBody method = evt.getMethod(); + + return (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); + } /** * Informs this listener that an AMQP method has been received. @@ -113,37 +104,9 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener * * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise. */ - public boolean methodReceived(AMQMethodEvent evt) // throws AMQException + public boolean methodReceived(AMQMethodEvent evt) { - AMQMethodBody method = evt.getMethod(); - - /*try - {*/ - boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); - - if (ready) - { - // we only update the flag from inside the synchronized block - // so that the blockForFrame method cannot "miss" an update - it - // will only ever read the flag from within the synchronized block - synchronized (_lock) - { - _doneEvt = evt; - _ready = ready; - _lock.notify(); - } - } - - return ready; - - /*} - catch (AMQException e) - { - error(e); - // we rethrow the error here, and the code in the frame dispatcher will go round - // each listener informing them that an exception has been thrown - throw e; - }*/ + return received(evt); } /** @@ -159,75 +122,15 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener */ public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException { - synchronized (_lock) + try { - while (!_ready) - { - try - { - if (timeout == -1) - { - _lock.wait(); - } - else - { - - _lock.wait(timeout); - if (!_ready) - { - _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); - _ready = true; - } - } - } - catch (InterruptedException e) - { - // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess - // if (!_ready && timeout != -1) - // { - // _error = new AMQException("Server did not respond timely"); - // _ready = true; - // } - } - } + return (AMQMethodEvent) block(timeout); } - - if (_error != null) + finally { - if (_error instanceof AMQException) - { - throw (AMQException) _error; - } - else if (_error instanceof FailoverException) - { - // This should ensure that FailoverException is not wrapped and can be caught. - throw (FailoverException) _error; // needed to expose FailoverException. - } - else - { - throw new AMQException(null, "Woken up due to " + _error.getClass(), _error); - } + //Prevent any more errors being notified to this waiter. + close(); } - - return _doneEvt; } - /** - * This is a callback, called by the MINA dispatcher thread only. It is also called from within this - * class to avoid code repetition but again is only called by the MINA dispatcher thread. - * - * @param e - */ - public void error(Exception e) - { - // set the error so that the thread that is blocking (against blockForFrame()) - // can pick up the exception and rethrow to the caller - _error = e; - - synchronized (_lock) - { - _ready = true; - _lock.notify(); - } - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 21f190bd7e..7a5d70ad15 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -28,15 +28,30 @@ import org.apache.qpid.protocol.AMQMethodListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** - * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler - * there is a separate state manager. + * The state manager is responsible for managing the state of the protocol session. <p/> + * For each {@link org.apache.qpid.client.protocol.AMQProtocolHandler} there is a separate state manager. + * + * The AMQStateManager is now attached to the {@link org.apache.qpid.client.protocol.AMQProtocolHandler} and that is the sole point of reference so that + * As the {@link AMQProtocolSession} changes due to failover the AMQStateManager need not be copied around. + * + * The StateManager works by any component can wait for a state change to occur by using the following sequence. + * + * <li>StateWaiter waiter = stateManager.createWaiter(Set<AMQState> states); + * <li> // Perform action that will cause state change + * <li>waiter.await(); + * + * The two step process is required as there is an inherit race condition between starting a process that will cause + * the state to change and then attempting to wait for that change. The interest in the change must be first set up so + * that any asynchrous errors that occur can be delivered to the correct waiters. + * + * */ -public class AMQStateManager +public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class); @@ -45,16 +60,13 @@ public class AMQStateManager /** The current state */ private AMQState _currentState; - - /** - * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of - * AMQFrame. - */ - - private final Object _stateLock = new Object(); + private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); + protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>(); + private Exception _lastException; + public AMQStateManager() { this(null); @@ -62,18 +74,15 @@ public class AMQStateManager public AMQStateManager(AMQProtocolSession protocolSession) { - this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession); + this(AMQState.CONNECTION_NOT_STARTED, protocolSession); } - protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession) + protected AMQStateManager(AMQState state, AMQProtocolSession protocolSession) { _protocolSession = protocolSession; _currentState = state; - } - - public AMQState getCurrentState() { return _currentState; @@ -86,117 +95,101 @@ public class AMQStateManager synchronized (_stateLock) { _currentState = newState; - _stateLock.notifyAll(); + + _logger.debug("Notififying State change to " + _waiters.size() + " : " + _waiters); + + for (StateWaiter waiter : _waiters) + { + waiter.received(newState); + } } } - public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { - B method = evt.getMethod(); - + // StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId()); return true; } + /** + * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted. + * + * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the + * connection to the network. + * + * @param session The new protocol session + */ + public void setProtocolSession(AMQProtocolSession session) + { + _logger.error("Setting ProtocolSession:" + session); + _protocolSession = session; + } - public void attainState(final AMQState s) throws Exception + /** + * Propogate error to waiters + * + * @param error The error to propogate. + */ + public void error(Exception error) { - synchronized (_stateLock) + if (_waiters.size() == 0) { - final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; - long waitTime = MAXIMUM_STATE_WAIT_TIME; - - while ((_currentState != s) && (waitTime > 0)) - { - try - { - _stateLock.wait(MAXIMUM_STATE_WAIT_TIME); - } - catch (InterruptedException e) - { - _logger.warn("Thread interrupted"); - if (_protocolSession.getAMQConnection().getLastException() != null) - { - throw _protocolSession.getAMQConnection().getLastException(); - } - - } - - if (_currentState != s) - { - waitTime = waitUntilTime - System.currentTimeMillis(); - } - } - - if (_currentState != s) - { - _logger.warn("State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + s); - throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + s, null); - } + _logger.error("No Waiters for error saving as last error:" + error.getMessage()); + _lastException = error; + } + for (StateWaiter waiter : _waiters) + { + _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage()); + waiter.error(error); } - - // at this point the state will have changed. } - public AMQProtocolSession getProtocolSession() + /** + * This provides a single place that the maximum time for state change to occur can be accessed. + * It is currently set via System property amqj.MaximumStateWait + * + * @return long Milliseconds value for a timeout + */ + public long getWaitTimeout() { - return _protocolSession; + return MAXIMUM_STATE_WAIT_TIME; } - public void setProtocolSession(AMQProtocolSession session) + /** + * Create and add a new waiter to the notifcation list. + * @param states The waiter will attempt to wait for one of these desired set states to be achived. + * @return the created StateWaiter. + */ + public StateWaiter createWaiter(Set<AMQState> states) { - _protocolSession = session; - } + final StateWaiter waiter; + synchronized (_stateLock) + { + waiter = new StateWaiter(this, _currentState, states); - public MethodRegistry getMethodRegistry() - { - return getProtocolSession().getMethodRegistry(); + _waiters.add(waiter); + } + + return waiter; } - public AMQState attainState(Set<AMQState> stateSet) throws AMQException + /** + * Remove the waiter from the notification list. + * @param waiter The waiter to remove. + */ + public void removeWaiter(StateWaiter waiter) { synchronized (_stateLock) { - final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; - long waitTime = MAXIMUM_STATE_WAIT_TIME; - - while (!stateSet.contains(_currentState) && (waitTime > 0)) - { - try - { - _stateLock.wait(MAXIMUM_STATE_WAIT_TIME); - } - catch (InterruptedException e) - { - _logger.warn("Thread interrupted"); - if (_protocolSession.getAMQConnection().getLastException() != null) - { - throw new AMQException(null, "Could not attain state due to exception", - _protocolSession.getAMQConnection().getLastException()); - } - } - - if (!stateSet.contains(_currentState)) - { - waitTime = waitUntilTime - System.currentTimeMillis(); - } - } - - if (!stateSet.contains(_currentState)) - { - _logger.warn("State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + stateSet); - throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + stateSet, null); - } - return _currentState; + _waiters.remove(waiter); } + } - + public Exception getLastException() + { + return _lastException; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java index 8c65f56af3..17d04f4fa3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java @@ -33,6 +33,6 @@ import org.apache.qpid.protocol.AMQMethodEvent; public interface StateAwareMethodListener<B extends AMQMethodBody> { - void methodReceived(AMQStateManager stateManager, B body, int channelId) throws AMQException; + void methodReceived(AMQProtocolSession session, B body, int channelId) throws AMQException; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateListener.java deleted file mode 100644 index df207a0a23..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateListener.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.state; - -import org.apache.qpid.AMQException; - -public interface StateListener -{ - void stateChanged(AMQState oldState, AMQState newState) throws AMQException; - - void error(Throwable t); -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index 8b8453a1b0..4695b195d5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -20,103 +20,110 @@ */ package org.apache.qpid.client.state; +import org.apache.qpid.client.util.BlockingWaiter; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.AMQException; - -import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import java.util.Set; /** - * Waits for a particular state to be reached. + * This is an implementation of the {@link BlockingWaiter} to provide error handing and a waiting mechanism for state + * changes. + * + * On construction the current state and a set of States to await for is provided. + * + * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is + * a desired state then await() returns immediately. + * + * Otherwise it will block for the set timeout for a desired state to be achieved. + * + * The state changes are notified via the {@link #process} method. + * + * Any notified error is handled by the BlockingWaiter and thrown from the {@link #block} method. + * */ -public class StateWaiter implements StateListener +public class StateWaiter extends BlockingWaiter<AMQState> { private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class); - private final AMQState _state; - - private volatile boolean _newStateAchieved; - - private volatile Throwable _throwable; - - private final Object _monitor = new Object(); - private static final long TIME_OUT = 1000 * 60 * 2; - - public StateWaiter(AMQState state) + Set<AMQState> _awaitStates; + private AMQState _startState; + private AMQStateManager _stateManager; + + /** + * + * @param stateManager The StateManager + * @param currentState + * @param awaitStates + */ + public StateWaiter(AMQStateManager stateManager, AMQState currentState, Set<AMQState> awaitStates) { - _state = state; + _logger.info("New StateWaiter :" + currentState + ":" + awaitStates); + _stateManager = stateManager; + _awaitStates = awaitStates; + _startState = currentState; } - public void waituntilStateHasChanged() throws AMQException + /** + * When the state is changed this StateWaiter is notified to process the change. + * + * @param state The new state that has been achieved. + * @return + */ + public boolean process(AMQState state) { - synchronized (_monitor) - { - // - // The guard is required in case we are woken up by a spurious - // notify(). - // - while (!_newStateAchieved && (_throwable == null)) - { - try - { - _logger.debug("State " + _state + " not achieved so waiting..."); - _monitor.wait(TIME_OUT); - // fixme this won't cause the timeout to exit the loop. need to set _throwable - } - catch (InterruptedException e) - { - _logger.debug("Interrupted exception caught while waiting: " + e, e); - } - } - } + return _awaitStates.contains(state); + } - if (_throwable != null) - { - _logger.debug("Throwable reached state waiter: " + _throwable); - if (_throwable instanceof AMQException) - { - throw (AMQException) _throwable; - } - else - { - throw new AMQException(null, "Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught. - } - } + /** + * Await for the requried State to be achieved within the default timeout. + * @return The achieved state that was requested. + * @throws AMQException The exception that prevented the required state from being achived. + */ + public AMQState await() throws AMQException + { + return await(_stateManager.getWaitTimeout()); } - public void stateChanged(AMQState oldState, AMQState newState) + /** + * Await for the requried State to be achieved. + * + * <b>It is the responsibility of this class to remove the waiter from the StateManager + * + * @param timeout The time in milliseconds to wait for any of the states to be achived. + * @return The achieved state that was requested. + * @throws AMQException The exception that prevented the required state from being achived. + */ + public AMQState await(long timeout) throws AMQException { - synchronized (_monitor) + try { - if (_logger.isDebugEnabled()) + if (process(_startState)) { - _logger.debug("stateChanged called changing from :" + oldState + " to :" + newState); + return _startState; } - if (_state == newState) + try { - _newStateAchieved = true; - - if (_logger.isDebugEnabled()) - { - _logger.debug("New state reached so notifying monitor"); - } + return (AMQState) block(timeout); + } + catch (FailoverException e) + { + _logger.error("Failover occured whilst waiting for states:" + _awaitStates); - _monitor.notifyAll(); + e.printStackTrace(); + return null; } } - } - - public void error(Throwable t) - { - synchronized (_monitor) + finally { - if (_logger.isDebugEnabled()) - { - _logger.debug("exceptionThrown called"); - } + //Prevent any more errors being notified to this waiter. + close(); - _throwable = t; - _monitor.notifyAll(); + //Remove the waiter from the notifcation list in the statee manager + _stateManager.removeWaiter(this); } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index 623591e0b6..f0d7feb059 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.client.state.listener; -import org.apache.qpid.AMQException; + import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.apache.qpid.framing.AMQMethodBody; @@ -34,7 +34,7 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener _expectedClass = expectedClass; } - public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException + public boolean processMethod(int channelId, AMQMethodBody frame) { return _expectedClass.isInstance(frame); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java new file mode 100644 index 0000000000..67cda957fb --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -0,0 +1,348 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; + +/** + * BlockingWaiter is a 'rendezvous' which delegates handling of + * incoming Objects to a listener implemented as a sub-class of this and hands off the process or + * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this + * differs from a 'rendezvous' in that sense. + * + * <p/>BlockingWaiters are used to coordinate when waiting for an an event that expect a response. + * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register + * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they + * have been completed. + * + * <p/>The {@link #process} must return <tt>true</tt> on any incoming method that it handles. This indicates to + * this listeners that the object just processed ends the waiting process. + * + * <p/>Errors from the producer are rethrown to the consumer. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations </td> + * <tr><td> Accept generic objects as events for processing via {@link #process}. <td> + * <tr><td> Delegate handling and undserstanding of the object to a concrete implementation. <td> + * <tr><td> Block until {@link #process} determines that waiting is no longer required <td> + * <tr><td> Propagate the most recent exception to the consumer.<td> + * </table> + * + * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull + * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry + * when this happens. At the very least, restore the interrupted status flag. + * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to + * check that SynchronousQueue has a non-blocking put method available. + */ +public abstract class BlockingWaiter<T> +{ + /** This flag is used to indicate that the blocked for method has been received. */ + private volatile boolean _ready = false; + + /** This flag is used to indicate that the received error has been processed. */ + private volatile boolean _errorAck = false; + + /** Used to protect the shared event and ready flag between the producer and consumer. */ + private final ReentrantLock _lock = new ReentrantLock(); + + /** Used to signal that a method has been received */ + private final Condition _receivedCondition = _lock.newCondition(); + + /** Used to signal that a error has been processed */ + private final Condition _errorConditionAck = _lock.newCondition(); + + /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ + private volatile Exception _error; + + /** Holds the incomming Object. */ + protected Object _doneObject = null; + private AtomicBoolean _waiting = new AtomicBoolean(false); + private boolean _closed = false; + + /** + * Delegates processing of the incomming object to the handler. + * + * @param object The object to process. + * + * @return <tt>true</tt> if the waiting is complete, <tt>false</tt> if waiting should continue. + */ + public abstract boolean process(T object); + + /** + * An Object has been received and should be processed to see if our wait condition has been reached. + * + * @param object The object received. + * + * @return <tt>true</tt> if the waiting is complete, <tt>false</tt> if waiting should continue. + */ + public boolean received(T object) + { + + boolean ready = process(object); + + if (ready) + { + // we only update the flag from inside the synchronized block + // so that the blockForFrame method cannot "miss" an update - it + // will only ever read the flag from within the synchronized block + _lock.lock(); + try + { + _doneObject = object; + _ready = ready; + _receivedCondition.signal(); + } + finally + { + _lock.unlock(); + } + } + + return ready; + } + + /** + * Blocks until an object is received that is handled by process, or the specified timeout + * has passed. + * + * Once closed any attempt to wait will throw an exception. + * + * @param timeout The timeout in milliseconds. + * + * @return The object that resolved the blocking. + * + * @throws AMQException + * @throws FailoverException + */ + public Object block(long timeout) throws AMQException, FailoverException + { + long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout); + + _lock.lock(); + + try + { + if (_closed) + { + throw throwClosedException(); + } + + if (_error == null) + { + _waiting.set(true); + + while (!_ready) + { + try + { + if (timeout == -1) + { + _receivedCondition.await(); + } + else + { + nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); + + if (nanoTimeout <= 0 && !_ready && _error == null) + { + _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); + _ready = true; + } + } + } + catch (InterruptedException e) + { + System.err.println(e.getMessage()); + // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess + // if (!_ready && timeout != -1) + // { + // _error = new AMQException("Server did not respond timely"); + // _ready = true; + // } + } + } + } + + if (_error != null) + { + if (_error instanceof AMQException) + { + throw (AMQException) _error; + } + else if (_error instanceof FailoverException) + { + // This should ensure that FailoverException is not wrapped and can be caught. + throw (FailoverException) _error; // needed to expose FailoverException. + } + else + { + throw new AMQException("Woken up due to " + _error.getClass(), _error); + } + } + + } + finally + { + _waiting.set(false); + + //Release Error handling thread + if (_error != null) + { + _errorAck = true; + _errorConditionAck.signal(); + + _error = null; + } + _lock.unlock(); + } + + return _doneObject; + } + + /** + * This is a callback, called when an error has occured that should interupt any waiter. + * It is also called from within this class to avoid code repetition but it should only be called by the MINA threads. + * + * Once closed any notification of an exception will be ignored. + * + * @param e The exception being propogated. + */ + public void error(Exception e) + { + // set the error so that the thread that is blocking (against blockForFrame()) + // can pick up the exception and rethrow to the caller + + _lock.lock(); + + if (_closed) + { + return; + } + + if (_error == null) + { + _error = e; + } + else + { + System.err.println("WARNING: new error arrived while old one not yet processed"); + } + + try + { + if (_waiting.get()) + { + + _ready = true; + _receivedCondition.signal(); + + while (!_errorAck) + { + try + { + _errorConditionAck.await(); + } + catch (InterruptedException e1) + { + System.err.println(e.getMessage()); + } + } + _errorAck = false; + } + } + finally + { + _lock.unlock(); + } + } + + /** + * Close this Waiter so that no more errors are processed. + * This is a preventative method to ensure that a second error thread does not get stuck in the error method after + * the await has returned. This has not happend but in practise but if two errors occur on the Connection at + * the same time then it is conceiveably possible for the second to get stuck if the first one is processed by a + * waiter. + * + * Once closed any attempt to wait will throw an exception. + * Any notification of an exception will be ignored. + */ + public void close() + { + _lock.lock(); + try + { + //if we have already closed then our job is done. + if (_closed) + { + return; + } + + //Close Waiter so no more exceptions are processed + _closed = true; + + //Wake up any await() threads + + //If we are waiting then use the error() to wake them up. + if (_waiting.get()) + { + error(throwClosedException()); + } + //If they are not waiting then there is nothing to do. + + // Wake up any error handling threads + + if (!_errorAck) + { + _errorAck = true; + _errorConditionAck.signal(); + + _error = null; + } + } + finally + { + _lock.unlock(); + } + } + + /** + * Helper method to generate the a closed Exception. + * + * todo: This should be converted to something more friendly. + * + * @return AMQException to throw to waiters when the Waiter is closed. + */ + private AMQException throwClosedException() + { + return new AMQException(null, "Waiter was closed.", null); + } + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java index b6776a1a44..66f220643c 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java @@ -27,12 +27,10 @@ import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,12 +46,10 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe return _handler; } - public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId) + public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId) throws AMQException { _logger.debug("ChannelClose method received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); - AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); AMQShortString reason = method.getReplyText(); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 27adc4dd77..6f4c26945c 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -33,11 +33,12 @@ public class ConnectionURLTest extends TestCase public void testFailoverURL() throws URLSyntaxException { - String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'"; + String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod().equals("roundrobin")); + assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE)); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("bob")); assertTrue(connectionurl.getVirtualHost().equals("/test")); @@ -276,7 +277,7 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportMultiOptionURL() throws URLSyntaxException { - String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'"; + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'"; ConnectionURL connectionurl = new AMQConnectionURL(url); @@ -493,8 +494,38 @@ public class ConnectionURLTest extends TestCase } } + public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'"; + + ConnectionURL connectionurl = new AMQConnectionURL(url); + + assertTrue(connectionurl.getFailoverMethod() == null); + assertTrue(connectionurl.getUsername().equals("guest")); + assertTrue(connectionurl.getPassword().equals("guest")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); + + assertTrue(connectionurl.getBrokerCount() == 1); + + BrokerDetails service = connectionurl.getBrokerDetails(0); + + assertTrue(service.getTransport().equals("tcp")); + + + assertTrue(service.getHost().equals("localhost")); + assertTrue(service.getPort() == 5672); + assertEquals("jim",service.getProperty("foo")); + assertEquals("bob",service.getProperty("bar")); + assertEquals("jimmy",service.getProperty("fred")); + + assertTrue(connectionurl.getOption("routingkey").equals("jim")); + assertTrue(connectionurl.getOption("timeout").equals("200")); + assertTrue(connectionurl.getOption("immediatedelivery").equals("true")); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ConnectionURLTest.class); } } + |