diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-19 11:55:47 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-19 11:55:47 +0000 |
commit | e3ff96a57778ea2325551dff110e4bedf7e1d4d5 (patch) | |
tree | e77ab4b679d132a86bb206df8f730658a7c74b87 | |
parent | 81fa6193269cf9f6cacd03966d21d7448126653a (diff) | |
download | qpid-python-e3ff96a57778ea2325551dff110e4bedf7e1d4d5.tar.gz |
QPID-372, QPID-376 Broker now ignores all frames for closing channels.
When a close-ok is received the channel can be reopened and used
All uses of getChannel check the return type is not null and throw a NOT_FOUND AMQException. If the channel is not found during a method handler then the Channel will be closed.
ChannelCloseHandler - Now throws a connection exception if trying to close a a non exisitant channel.
AMQMinaProtocolSession - Added pre-check for closing channels to ignore all but Close-OK methods
- Updated ChannelException method to close connection if the CE was a result of not having a valid channel.
- Changed state to CLOSING when writing out a connection close frame.
AMQConnection - Wrapped all _logging calls , Updated comment formatting
AMQSession - called startDispatcherIfRequired when receiving a message as without it a producer will not get a returned message. This is because there is no consumer setup to consume.
ConnectionCloseMethodHandler - Wrapped code in try finally so that the protocol session would always be closed correctly.
AMQStateManager - Added state to the logging values
Modified AMQTimeoutException to include a new constant value to identify the failure reason.
AMQConstant - Added 408 REQUEST_TIMEOUT fixed error with NOT_ALLOWED value was 530 should be 507.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509172 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 822 insertions, 112 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 8faf5eedde..9a8fce7129 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -26,9 +26,11 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody> { @@ -51,11 +53,21 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos ChannelCloseBody body = evt.getMethod(); _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + " and method " + body.methodId); - session.closeChannel(evt.getChannelId()); + int channelId = evt.getChannelId(); + + AMQChannel channel = session.getChannel(channelId); + + if (channel == null) + { + throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel"); + } + + session.closeChannel(channelId); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); + AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0); session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 309fa4663a..2de32c2f0f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -56,9 +56,11 @@ import org.apache.qpid.framing.MainRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.VersionSpecificRegistry; +import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -254,12 +256,36 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Method frame received: " + frame); } + final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) frame.getBodyFrame()); + + //Check that this channel is not closing + if (channelAwaitingClosure(frame.getChannel())) + { + if ((evt.getMethod() instanceof ChannelCloseOkBody)) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok"); + } + } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring"); + } + return; + } + } + + try { try { + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); if (!_frameListeners.isEmpty()) @@ -277,14 +303,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } catch (AMQChannelException e) { - _logger.error("Closing channel due to: " + e.getMessage()); - writeFrame(e.getCloseFrame(frame.getChannel())); - closeChannel(frame.getChannel()); + if (getChannel(frame.getChannel()) != null) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing channel due to: " + e.getMessage()); + } + writeFrame(e.getCloseFrame(frame.getChannel())); + closeChannel(frame.getChannel()); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); + } + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } + closeSession(); + + AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); + + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(ce.getCloseFrame(frame.getChannel())); + } } catch (AMQConnectionException e) { - _logger.error("Closing connection due to: " + e.getMessage()); + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(e.getCloseFrame(frame.getChannel())); } } @@ -325,8 +379,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } - //fixme what happens if getChannel returns null - getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); + + AMQChannel channel = getChannel(frame.getChannel()); + + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); + } + else + { + channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); + } } private void contentBodyReceived(AMQFrame frame) throws AMQException @@ -335,8 +398,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } - //fixme what happens if getChannel returns null - getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this); + AMQChannel channel = getChannel(frame.getChannel()); + + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); + } + else + { + channel.publishContentBody((ContentBody) frame.getBodyFrame(), this); + } } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 2030876952..adf2a4bda2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -70,20 +70,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AtomicInteger _idFactory = new AtomicInteger(0); /** - * This is the "root" mutex that must be held when doing anything that could be impacted by failover. - * This must be held by any child objects of this connection such as the session, producers and consumers. + * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be + * held by any child objects of this connection such as the session, producers and consumers. */ private final Object _failoverMutex = new Object(); /** - * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels - * per session and we must prevent the client from opening too many. Zero means unlimited. + * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session + * and we must prevent the client from opening too many. Zero means unlimited. */ private long _maximumChannelCount; - /** - * The maximum size of frame supported by the server - */ + /** The maximum size of frame supported by the server */ private long _maximumFrameSize; /** @@ -93,26 +91,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ private AMQProtocolHandler _protocolHandler; - /** - * Maps from session id (Integer) to AMQSession instance - */ + /** Maps from session id (Integer) to AMQSession instance */ private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap private String _clientName; - /** - * The user name to use for authentication - */ + /** The user name to use for authentication */ private String _username; - /** - * The password to use for authentication - */ + /** The password to use for authentication */ private String _password; - /** - * The virtual path to connect to on the AMQ server - */ + /** The virtual path to connect to on the AMQ server */ private String _virtualHost; private ExceptionListener _exceptionListener; @@ -122,14 +112,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private ConnectionURL _connectionURL; /** - * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for - * message publication. + * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message + * publication. */ private boolean _started; - /** - * Policy dictating how to failover - */ + /** Policy dictating how to failover */ private FailoverPolicy _failoverPolicy; /* @@ -148,9 +136,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ private QpidConnectionMetaData _connectionMetaData; - /** - * Configuration info for SSL - */ + /** Configuration info for SSL */ private SSLConfiguration _sslConfiguration; private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; @@ -164,6 +150,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @param password password * @param clientName clientid * @param virtualHost virtualhost + * * @throws AMQException * @throws URLSyntaxException */ @@ -182,6 +169,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @param password password * @param clientName clientid * @param virtualHost virtualhost + * * @throws AMQException * @throws URLSyntaxException */ @@ -238,7 +226,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { - _logger.info("Connection:" + connectionURL); + if (_logger.isInfoEnabled()) + { + _logger.info("Connection:" + connectionURL); + } _sslConfiguration = sslConfig; if (connectionURL == null) { @@ -297,11 +288,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { lastException = e; - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause()); + if (_logger.isInfoEnabled()) + { + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause()); + } } } - _logger.debug("Are we connected:" + _connected); + if (_logger.isDebugEnabled()) + { + _logger.debug("Are we connected:" + _connected); + } if (!_connected) { @@ -402,7 +399,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } catch (Exception e) { - _logger.info("Unable to connect to broker at " + bd); + if (_logger.isInfoEnabled()) + { + _logger.info("Unable to connect to broker at " + bd); + } attemptReconnection(); } return false; @@ -421,11 +421,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (!(e instanceof AMQException)) { - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); + if (_logger.isInfoEnabled()) + { + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); + } } else { - _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); + if (_logger.isInfoEnabled()) + { + _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); + } } } } @@ -437,8 +443,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Get the details of the currently active broker * - * @return null if no broker is active (i.e. no successful connection has been made, or - * the BrokerDetail instance otherwise + * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance + * otherwise */ public BrokerDetails getActiveBrokerDetails() { @@ -593,12 +599,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions - * where specified in the JMS spec + * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in + * the JMS spec * * @param transacted * @param acknowledgeMode + * * @return QueueSession + * * @throws JMSException */ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException @@ -607,12 +615,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions - * where specified in the JMS spec + * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in + * the JMS spec * * @param transacted * @param acknowledgeMode + * * @return TopicSession + * * @throws JMSException */ public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException @@ -718,11 +728,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Marks all sessions and their children as closed without sending any protocol messages. Useful when - * you need to mark objects "visible" in userland as closed after failover or other significant event that - * impacts the connection. - * <p/> - * The caller must hold the failover mutex before calling this method. + * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to + * mark objects "visible" in userland as closed after failover or other significant event that impacts the + * connection. <p/> The caller must hold the failover mutex before calling this method. */ private void markAllSessionsClosed() { @@ -740,9 +748,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Close all the sessions, either due to normal connection closure or due to an error occurring. * - * @param cause if not null, the error that is causing this shutdown - * <p/> - * The caller must hold the failover mutex before calling this method. + * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex + * before calling this method. */ private void closeAllSessions(Throwable cause, long timeout) throws JMSException { @@ -891,6 +898,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * Fire the preFailover event to the registered connection listener (if any) * * @param redirect true if this is the result of a redirect request rather than a connection error + * * @return true if no listener or listener does not veto change */ public boolean firePreFailover(boolean redirect) @@ -904,10 +912,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Fire the preResubscribe event to the registered connection listener (if any). If the listener - * vetoes resubscription then all the sessions are closed. + * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes + * resubscription then all the sessions are closed. * * @return true if no listener or listener does not veto resubscription. + * * @throws JMSException */ public boolean firePreResubscribe() throws JMSException @@ -927,9 +936,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - /** - * Fires a failover complete event to the registered connection listener (if any). - */ + /** Fires a failover complete event to the registered connection listener (if any). */ public void fireFailoverComplete() { if (_connectionListener != null) @@ -939,8 +946,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * In order to protect the consistency of the connection and its child sessions, consumers and producers, - * the "failover mutex" must be held when doing any operations that could be corrupted during failover. + * In order to protect the consistency of the connection and its child sessions, consumers and producers, the + * "failover mutex" must be held when doing any operations that could be corrupted during failover. * * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs. */ @@ -950,8 +957,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * If failover is taking place this will block until it has completed. If failover - * is not taking place it will return immediately. + * If failover is taking place this will block until it has completed. If failover is not taking place it will + * return immediately. * * @throws InterruptedException */ @@ -961,18 +968,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Invoked by the AMQProtocolSession when a protocol session exception has occurred. - * This method sends the exception to a JMS exception listener, if configured, and - * propagates the exception to sessions, which in turn will propagate to consumers. - * This allows synchronous consumers to have exceptions thrown to them. + * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception + * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will + * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them. * * @param cause the exception */ public void exceptionReceived(Throwable cause) { - _logger.debug("Connection Close done by:" + Thread.currentThread().getName()); - _logger.debug("exceptionReceived is ", cause); + if (_logger.isDebugEnabled()) + { + _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause); + } final JMSException je; if (cause instanceof JMSException) @@ -1012,7 +1020,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - _logger.info("Closing AMQConnection due to :" + cause.getMessage()); + if (_logger.isInfoEnabled()) + { + _logger.info("Closing AMQConnection due to :" + cause.getMessage()); + } _closed.set(true); closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 7ab26f3b47..dc2ffc38c4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1629,9 +1629,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_logger.isDebugEnabled()) { - _logger.debug("Message received in session with channel id " + _channelId); + _logger.debug("Message[" + (message.getDeliverBody() == null ? + "B:" + message.getBounceBody() : "D:" + message.getDeliverBody()) + + "] received in session with channel id " + _channelId); } + startDistpatcherIfNecessary(); + _queue.add(message); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 57d987712a..9c8e9188ec 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -60,36 +60,41 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); AMQShortString reason = method.replyText; - // TODO: check whether channel id of zero is appropriate - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, method.getMajor(), method.getMinor())); - - if (errorCode != AMQConstant.REPLY_SUCCESS) + try { - if(errorCode == AMQConstant.NOT_ALLOWED) + // TODO: check whether channel id of zero is appropriate + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(), method.getMinor())); + + if (errorCode != AMQConstant.REPLY_SUCCESS) { - _logger.info("Authentication Error:"+Thread.currentThread().getName()); + if (errorCode == AMQConstant.NOT_ALLOWED) + { + _logger.info("Authentication Error:" + Thread.currentThread().getName()); - protocolSession.closeProtocolSession(); + protocolSession.closeProtocolSession(); - //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. - stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); + //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. + stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); - throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString()); - } - else - { - _logger.info("Connection close received with error code " + errorCode); + throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString()); + } + else + { + _logger.info("Connection close received with error code " + errorCode); - throw new AMQConnectionClosedException(errorCode, "Error: " + reason); + throw new AMQConnectionClosedException(errorCode, "Error: " + reason); + } } } + finally + { + // this actually closes the connection in the case where it is not an error. - // this actually closes the connection in the case where it is not an error. - - protocolSession.closeProtocolSession(); + protocolSession.closeProtocolSession(); - stateManager.changeState(AMQState.CONNECTION_CLOSED); + stateManager.changeState(AMQState.CONNECTION_CLOSED); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 09dd6ecfff..825baf95d1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -59,25 +59,22 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; /** - * The state manager is responsible for managing the state of the protocol session. - * <p/> - * For each AMQProtocolHandler there is a separate state manager. + * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler + * there is a separate state manager. */ public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); private AMQProtocolSession _protocolSession; - /** - * The current state - */ + /** The current state */ private AMQState _currentState; /** - * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. - * The class must be a subclass of AMQFrame. + * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of + * AMQFrame. */ - private final Map _state2HandlersMap = new HashMap(); + protected final Map _state2HandlersMap = new HashMap(); private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); private final Object _stateLock = new Object(); @@ -87,7 +84,7 @@ public class AMQStateManager implements AMQMethodListener { this(null); } - + public AMQStateManager(AMQProtocolSession protocolSession) { @@ -98,7 +95,7 @@ public class AMQStateManager implements AMQMethodListener { _protocolSession = protocolSession; _currentState = state; - if(register) + if (register) { registerListeners(); } @@ -194,7 +191,7 @@ public class AMQStateManager implements AMQMethodListener final Class clazz = frame.getClass(); if (_logger.isDebugEnabled()) { - _logger.debug("Looking for state transition handler for frame " + clazz); + _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz); } final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState); @@ -228,12 +225,12 @@ public class AMQStateManager implements AMQMethodListener public void attainState(final AMQState s) throws AMQException { - synchronized(_stateLock) + synchronized (_stateLock) { final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; long waitTime = MAXIMUM_STATE_WAIT_TIME; - while(_currentState != s && waitTime > 0) + while (_currentState != s && waitTime > 0) { try { @@ -243,12 +240,12 @@ public class AMQStateManager implements AMQMethodListener { _logger.warn("Thread interrupted"); } - if(_currentState != s) + if (_currentState != s) { waitTime = waitUntilTime - System.currentTimeMillis(); } } - if(_currentState != s) + if (_currentState != s) { _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java new file mode 100644 index 0000000000..efbc2b9c78 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import junit.framework.TestCase; + +import javax.jms.Connection; +import javax.jms.Session; + +import javax.jms.JMSException; +import javax.jms.ExceptionListener; +import javax.jms.MessageProducer; +import javax.jms.MessageConsumer; +import javax.jms.Message; +import javax.jms.TextMessage; +import javax.jms.Queue; + +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.log4j.Logger; + +public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener +{ + private static final Logger _logger = Logger.getLogger(ChannelCloseTest.class); + + Connection _connection; + private String _brokerlist = "vm://:1"; + private Session _session; + private static final long SYNC_TIMEOUT = 500; + private int TEST = 0; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + /* + close channel, use chanel with same id ensure error. + */ + public void testReusingChannelAfterFullClosure() + { + _connection = newConnection(); + + //Create Producer + try + { + _connection.start(); + + createChannelAndTest(1); + + // Cause it to close + try + { + _logger.info("Testing invalid exchange"); + declareExchange(1, "", "name_that_will_lookup_to_null", false); + fail("Exchange name is empty so this should fail "); + } + catch (AMQException e) + { + assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode()); + } + + // Check that + try + { + _logger.info("Testing valid exchange should fail"); + declareExchange(1, "topic", "amq.topic", false); + fail("This should not succeed as the channel should be closed "); + } + catch (AMQException e) + { + assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); + + _connection = newConnection(); + } + + checkSendingMessage(); + + _session.close(); + _connection.close(); + + } + catch (JMSException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + /* + close channel and send guff then send ok no errors + */ + public void testSendingMethodsAfterClose() + { + try + { + _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + + _brokerlist + "'"); + + ((AMQConnection) _connection).setConnectionListener(this); + + + _connection.setExceptionListener(this); + + //Change the StateManager for one that doesn't respond with Close-OKs + AMQStateManager oldStateManager = ((AMQConnection) _connection).getProtocolHandler().getStateManager(); + + _session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + _connection.start(); + + //Test connection + checkSendingMessage(); + + //Set StateManager to manager that ignores Close-oks + AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession(); + AMQStateManager newStateManager = new TestCloseStateManager(protocolSession); + newStateManager.changeState(oldStateManager.getCurrentState()); + + ((AMQConnection) _connection).getProtocolHandler().setStateManager(newStateManager); + + final int TEST_CHANNEL = 1; + _logger.info("Testing Channel(" + TEST_CHANNEL + ") Creation"); + + createChannelAndTest(TEST_CHANNEL); + + // Cause it to close + try + { + _logger.info("Closing Channel - invalid exchange"); + declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false); + fail("Exchange name is empty so this should fail "); + } + catch (AMQException e) + { + assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode()); + } + + try + { + // Send other methods that should be ignored + // send them no wait as server will ignore them + _logger.info("Tested known exchange - should ignore"); + declareExchange(TEST_CHANNEL, "topic", "amq.topic", true); + + _logger.info("Tested known invalid exchange - should ignore"); + declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true); + + _logger.info("Tested known invalid exchange - should ignore"); + declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true); + + // Send sync .. server will igore and timy oue + _logger.info("Tested known invalid exchange - should ignore"); + declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false); + } + catch (AMQTimeoutException te) + { + assertEquals("Request should timeout", AMQConstant.REQUEST_TIMEOUT, te.getErrorCode()); + } + catch (AMQException e) + { + fail("This should not fail as all requests should be ignored"); + } + + _logger.info("Sending Close"); + // Send Close-ok + sendClose(TEST_CHANNEL); + + _logger.info("Re-opening channel"); + + createChannelAndTest(TEST_CHANNEL); + + //Test connection is still ok + + checkSendingMessage(); + + } + catch (JMSException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + catch (AMQException e) + { + fail(e.getMessage()); + + } + catch (URLSyntaxException e) + { + fail(e.getMessage()); + } + finally + { + try + { + _session.close(); + _connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + } + } + + private void createChannelAndTest(int channel) + { + //Create A channel + try + { + createChannel(channel); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + // Test it is ok + try + { + declareExchange(channel, "topic", "amq.topic", false); + _logger.info("Tested known exchange"); + } + catch (AMQException e) + { + fail("This should not fail as this is the default exchange details"); + } + } + + private void sendClose(int channel) + { + AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); + + ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame); + } + + + private void checkSendingMessage() throws JMSException + { + TEST++; + _logger.info("Test creating producer which will use channel id 1"); + + Queue queue = _session.createQueue("CCT_test_validation_queue" + TEST); + + MessageConsumer consumer = _session.createConsumer(queue); + + MessageProducer producer = _session.createProducer(queue); + + final String MESSAGE = "CCT_Test_Message"; + producer.send(_session.createTextMessage(MESSAGE)); + + Message msg = consumer.receive(2000); + + assertNotNull("Received messages should not be null.", msg); + assertEquals("Message received not what we sent", MESSAGE, ((TextMessage) msg).getText()); + } + + private Connection newConnection() + { + AMQConnection connection = null; + try + { + connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + + _brokerlist + "'"); + + connection.setConnectionListener(this); + + _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + connection.start(); + + } + catch (JMSException e) + { + fail("Creating new connection when:"+e.getMessage()); + } + catch (AMQException e) + { + fail("Creating new connection when:"+e.getMessage()); + } + catch (URLSyntaxException e) + { + fail("Creating new connection when:"+e.getMessage()); + } + + + return connection; + } + + private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException + { + AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), + null, // arguments + false, // autoDelete + false, // durable + new AMQShortString(_name), // exchange + false, // internal + nowait, // nowait + true, // passive + 0, // ticket + new AMQShortString(_type)); // type + + if (nowait) + { + ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare); + } + else + { + ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + } + } + + private void createChannel(int channelId) throws AMQException + { + ((AMQConnection) _connection).getProtocolHandler().syncWrite( + ChannelOpenBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), + null), // outOfBand + ChannelOpenOkBody.class); + + } + + + public void onException(JMSException jmsException) + { + //_logger.info("CCT" + jmsException); + fail(jmsException.getMessage()); + } + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + + } + + public boolean preFailover(boolean redirect) + { + return false; + } + + public boolean preResubscribe() + { + return false; + } + + public void failoverComplete() + { + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java new file mode 100644 index 0000000000..69cbdd6a09 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ChannelCloseOkBody; + +public class TestChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(TestChannelCloseMethodHandlerNoCloseOk.class); + + private static TestChannelCloseMethodHandlerNoCloseOk _handler = new TestChannelCloseMethodHandlerNoCloseOk(); + + public static TestChannelCloseMethodHandlerNoCloseOk getInstance() + { + return _handler; + } + + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + { + _logger.debug("ChannelClose method received"); + ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); + + AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); + AMQShortString reason = method.replyText; + if (_logger.isDebugEnabled()) + { + _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); + } + + // For this test Method Handler .. don't send Close-OK +// // TODO: Be aware of possible changes to parameter order as versions change. +// AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor()); +// protocolSession.writeFrame(frame); + if (errorCode != AMQConstant.REPLY_SUCCESS) + { + _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); + if (errorCode == AMQConstant.NO_CONSUMERS) + { + throw new AMQNoConsumersException("Error: " + reason, null); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + throw new AMQNoRouteException("Error: " + reason, null); + } + else if (errorCode == AMQConstant.INVALID_SELECTOR) + { + _logger.debug("Broker responded with Invalid Selector."); + + throw new AMQInvalidSelectorException(String.valueOf(reason)); + } + else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) + { + _logger.debug("Broker responded with Invalid Routing Key."); + + throw new AMQInvalidRoutingKeyException(String.valueOf(reason)); + } + else + { + throw new AMQChannelClosedException(errorCode, "Error: " + reason); + } + + } + protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason)); + } +}
\ No newline at end of file diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java new file mode 100644 index 0000000000..d643b467ea --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.handler.ConnectionStartMethodHandler; +import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; +import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; +import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; +import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; +import org.apache.qpid.client.handler.ChannelCloseMethodHandler; +import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler; +import org.apache.qpid.client.handler.BasicDeliverMethodHandler; +import org.apache.qpid.client.handler.BasicReturnMethodHandler; +import org.apache.qpid.client.handler.BasicCancelOkMethodHandler; +import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler; +import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler; +import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.framing.BasicReturnBody; +import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.ExchangeBoundOkBody; + +import java.util.Map; +import java.util.HashMap; + +public class TestCloseStateManager extends AMQStateManager +{ + public TestCloseStateManager(AMQProtocolSession protocolSession) + { + super(protocolSession); + } + + protected void registerListeners() + { + Map frame2handlerMap = new HashMap(); + + // we need to register a map for the null (i.e. all state) handlers otherwise you get + // a stack overflow in the handler searching code when you present it with a frame for which + // no handlers are registered + // + _state2HandlersMap.put(null, frame2handlerMap); + + frame2handlerMap = new HashMap(); + frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance()); + frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); + _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap); + + frame2handlerMap = new HashMap(); + frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance()); + frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance()); + frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); + _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap); + + frame2handlerMap = new HashMap(); + frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance()); + frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); + _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap); + + // + // ConnectionOpen handlers + // + frame2handlerMap = new HashMap(); + // Use Test Handler for Close methods to not send Close-OKs + frame2handlerMap.put(ChannelCloseBody.class, TestChannelCloseMethodHandlerNoCloseOk.getInstance()); + + frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance()); + frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); + frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); + frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); + frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance()); + frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); + frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); + frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); + _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap); + } + + +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java index 6af681f479..f1f973542d 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java @@ -20,10 +20,12 @@ */ package org.apache.qpid; +import org.apache.qpid.protocol.AMQConstant; + public class AMQTimeoutException extends AMQException { public AMQTimeoutException(String message) { - super(message); + super(AMQConstant.REQUEST_TIMEOUT, message); } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index 05365de137..0c4736a348 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -90,6 +90,8 @@ public final class AMQConstant public static final AMQConstant IN_USE = new AMQConstant(406, "In use", true); + public static final AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "Request Timeout", true); + public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true); public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true); @@ -100,7 +102,7 @@ public final class AMQConstant public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true); - public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true); + public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true); public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true); |