summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-19 11:55:47 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-19 11:55:47 +0000
commite3ff96a57778ea2325551dff110e4bedf7e1d4d5 (patch)
treee77ab4b679d132a86bb206df8f730658a7c74b87
parent81fa6193269cf9f6cacd03966d21d7448126653a (diff)
downloadqpid-python-e3ff96a57778ea2325551dff110e4bedf7e1d4d5.tar.gz
QPID-372, QPID-376 Broker now ignores all frames for closing channels.
When a close-ok is received the channel can be reopened and used All uses of getChannel check the return type is not null and throw a NOT_FOUND AMQException. If the channel is not found during a method handler then the Channel will be closed. ChannelCloseHandler - Now throws a connection exception if trying to close a a non exisitant channel. AMQMinaProtocolSession - Added pre-check for closing channels to ignore all but Close-OK methods - Updated ChannelException method to close connection if the CE was a result of not having a valid channel. - Changed state to CLOSING when writing out a connection close frame. AMQConnection - Wrapped all _logging calls , Updated comment formatting AMQSession - called startDispatcherIfRequired when receiving a message as without it a producer will not get a returned message. This is because there is no consumer setup to consume. ConnectionCloseMethodHandler - Wrapped code in try finally so that the protocol session would always be closed correctly. AMQStateManager - Added state to the logging values Modified AMQTimeoutException to include a new constant value to identify the failure reason. AMQConstant - Added 408 REQUEST_TIMEOUT fixed error with NOT_ALLOWED value was 530 should be 507. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509172 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java87
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java137
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java29
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java400
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java98
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java108
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java4
11 files changed, 822 insertions, 112 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 8faf5eedde..9a8fce7129 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -26,9 +26,11 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
{
@@ -51,11 +53,21 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
ChannelCloseBody body = evt.getMethod();
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
" and method " + body.methodId);
- session.closeChannel(evt.getChannelId());
+ int channelId = evt.getChannelId();
+
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
+ }
+
+ session.closeChannel(channelId);
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
+ AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 309fa4663a..2de32c2f0f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -56,9 +56,11 @@ import org.apache.qpid.framing.MainRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -254,12 +256,36 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Method frame received: " + frame);
}
+
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(),
(AMQMethodBody) frame.getBodyFrame());
+
+ //Check that this channel is not closing
+ if (channelAwaitingClosure(frame.getChannel()))
+ {
+ if ((evt.getMethod() instanceof ChannelCloseOkBody))
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok");
+ }
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring");
+ }
+ return;
+ }
+ }
+
+
try
{
try
{
+
boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
if (!_frameListeners.isEmpty())
@@ -277,14 +303,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
catch (AMQChannelException e)
{
- _logger.error("Closing channel due to: " + e.getMessage());
- writeFrame(e.getCloseFrame(frame.getChannel()));
- closeChannel(frame.getChannel());
+ if (getChannel(frame.getChannel()) != null)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing channel due to: " + e.getMessage());
+ }
+ writeFrame(e.getCloseFrame(frame.getChannel()));
+ closeChannel(frame.getChannel());
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e.getMessage());
+ }
+ closeSession();
+
+ AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString());
+
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(ce.getCloseFrame(frame.getChannel()));
+ }
}
catch (AMQConnectionException e)
{
- _logger.error("Closing connection due to: " + e.getMessage());
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e.getMessage());
+ }
closeSession();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(frame.getChannel()));
}
}
@@ -325,8 +379,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content header frame received: " + frame);
}
- //fixme what happens if getChannel returns null
- getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
+
+ AMQChannel channel = getChannel(frame.getChannel());
+
+ if (channel == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
+ }
+ else
+ {
+ channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
+ }
}
private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -335,8 +398,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content body frame received: " + frame);
}
- //fixme what happens if getChannel returns null
- getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this);
+ AMQChannel channel = getChannel(frame.getChannel());
+
+ if (channel == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
+ }
+ else
+ {
+ channel.publishContentBody((ContentBody) frame.getBodyFrame(), this);
+ }
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 2030876952..adf2a4bda2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -70,20 +70,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AtomicInteger _idFactory = new AtomicInteger(0);
/**
- * This is the "root" mutex that must be held when doing anything that could be impacted by failover.
- * This must be held by any child objects of this connection such as the session, producers and consumers.
+ * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
+ * held by any child objects of this connection such as the session, producers and consumers.
*/
private final Object _failoverMutex = new Object();
/**
- * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels
- * per session and we must prevent the client from opening too many. Zero means unlimited.
+ * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
+ * and we must prevent the client from opening too many. Zero means unlimited.
*/
private long _maximumChannelCount;
- /**
- * The maximum size of frame supported by the server
- */
+ /** The maximum size of frame supported by the server */
private long _maximumFrameSize;
/**
@@ -93,26 +91,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
private AMQProtocolHandler _protocolHandler;
- /**
- * Maps from session id (Integer) to AMQSession instance
- */
+ /** Maps from session id (Integer) to AMQSession instance */
private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
private String _clientName;
- /**
- * The user name to use for authentication
- */
+ /** The user name to use for authentication */
private String _username;
- /**
- * The password to use for authentication
- */
+ /** The password to use for authentication */
private String _password;
- /**
- * The virtual path to connect to on the AMQ server
- */
+ /** The virtual path to connect to on the AMQ server */
private String _virtualHost;
private ExceptionListener _exceptionListener;
@@ -122,14 +112,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private ConnectionURL _connectionURL;
/**
- * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for
- * message publication.
+ * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
+ * publication.
*/
private boolean _started;
- /**
- * Policy dictating how to failover
- */
+ /** Policy dictating how to failover */
private FailoverPolicy _failoverPolicy;
/*
@@ -148,9 +136,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
private QpidConnectionMetaData _connectionMetaData;
- /**
- * Configuration info for SSL
- */
+ /** Configuration info for SSL */
private SSLConfiguration _sslConfiguration;
private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
@@ -164,6 +150,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @param password password
* @param clientName clientid
* @param virtualHost virtualhost
+ *
* @throws AMQException
* @throws URLSyntaxException
*/
@@ -182,6 +169,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @param password password
* @param clientName clientid
* @param virtualHost virtualhost
+ *
* @throws AMQException
* @throws URLSyntaxException
*/
@@ -238,7 +226,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
- _logger.info("Connection:" + connectionURL);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connection:" + connectionURL);
+ }
_sslConfiguration = sslConfig;
if (connectionURL == null)
{
@@ -297,11 +288,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
lastException = e;
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+ }
}
}
- _logger.debug("Are we connected:" + _connected);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Are we connected:" + _connected);
+ }
if (!_connected)
{
@@ -402,7 +399,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
catch (Exception e)
{
- _logger.info("Unable to connect to broker at " + bd);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Unable to connect to broker at " + bd);
+ }
attemptReconnection();
}
return false;
@@ -421,11 +421,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (!(e instanceof AMQException))
{
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
+ }
}
else
{
- _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+ }
}
}
}
@@ -437,8 +443,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Get the details of the currently active broker
*
- * @return null if no broker is active (i.e. no successful connection has been made, or
- * the BrokerDetail instance otherwise
+ * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
+ * otherwise
*/
public BrokerDetails getActiveBrokerDetails()
{
@@ -593,12 +599,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions
- * where specified in the JMS spec
+ * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
+ * the JMS spec
*
* @param transacted
* @param acknowledgeMode
+ *
* @return QueueSession
+ *
* @throws JMSException
*/
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
@@ -607,12 +615,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions
- * where specified in the JMS spec
+ * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
+ * the JMS spec
*
* @param transacted
* @param acknowledgeMode
+ *
* @return TopicSession
+ *
* @throws JMSException
*/
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
@@ -718,11 +728,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Marks all sessions and their children as closed without sending any protocol messages. Useful when
- * you need to mark objects "visible" in userland as closed after failover or other significant event that
- * impacts the connection.
- * <p/>
- * The caller must hold the failover mutex before calling this method.
+ * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
+ * mark objects "visible" in userland as closed after failover or other significant event that impacts the
+ * connection. <p/> The caller must hold the failover mutex before calling this method.
*/
private void markAllSessionsClosed()
{
@@ -740,9 +748,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Close all the sessions, either due to normal connection closure or due to an error occurring.
*
- * @param cause if not null, the error that is causing this shutdown
- * <p/>
- * The caller must hold the failover mutex before calling this method.
+ * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
+ * before calling this method.
*/
private void closeAllSessions(Throwable cause, long timeout) throws JMSException
{
@@ -891,6 +898,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* Fire the preFailover event to the registered connection listener (if any)
*
* @param redirect true if this is the result of a redirect request rather than a connection error
+ *
* @return true if no listener or listener does not veto change
*/
public boolean firePreFailover(boolean redirect)
@@ -904,10 +912,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Fire the preResubscribe event to the registered connection listener (if any). If the listener
- * vetoes resubscription then all the sessions are closed.
+ * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
+ * resubscription then all the sessions are closed.
*
* @return true if no listener or listener does not veto resubscription.
+ *
* @throws JMSException
*/
public boolean firePreResubscribe() throws JMSException
@@ -927,9 +936,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- /**
- * Fires a failover complete event to the registered connection listener (if any).
- */
+ /** Fires a failover complete event to the registered connection listener (if any). */
public void fireFailoverComplete()
{
if (_connectionListener != null)
@@ -939,8 +946,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * In order to protect the consistency of the connection and its child sessions, consumers and producers,
- * the "failover mutex" must be held when doing any operations that could be corrupted during failover.
+ * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
+ * "failover mutex" must be held when doing any operations that could be corrupted during failover.
*
* @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
*/
@@ -950,8 +957,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * If failover is taking place this will block until it has completed. If failover
- * is not taking place it will return immediately.
+ * If failover is taking place this will block until it has completed. If failover is not taking place it will
+ * return immediately.
*
* @throws InterruptedException
*/
@@ -961,18 +968,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Invoked by the AMQProtocolSession when a protocol session exception has occurred.
- * This method sends the exception to a JMS exception listener, if configured, and
- * propagates the exception to sessions, which in turn will propagate to consumers.
- * This allows synchronous consumers to have exceptions thrown to them.
+ * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
+ * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
+ * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
*
* @param cause the exception
*/
public void exceptionReceived(Throwable cause)
{
- _logger.debug("Connection Close done by:" + Thread.currentThread().getName());
- _logger.debug("exceptionReceived is ", cause);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
+ }
final JMSException je;
if (cause instanceof JMSException)
@@ -1012,7 +1020,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- _logger.info("Closing AMQConnection due to :" + cause.getMessage());
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing AMQConnection due to :" + cause.getMessage());
+ }
_closed.set(true);
closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 7ab26f3b47..dc2ffc38c4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1629,9 +1629,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Message received in session with channel id " + _channelId);
+ _logger.debug("Message[" + (message.getDeliverBody() == null ?
+ "B:" + message.getBounceBody() : "D:" + message.getDeliverBody())
+ + "] received in session with channel id " + _channelId);
}
+ startDistpatcherIfNecessary();
+
_queue.add(message);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index 57d987712a..9c8e9188ec 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -60,36 +60,41 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
AMQShortString reason = method.replyText;
- // TODO: check whether channel id of zero is appropriate
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, method.getMajor(), method.getMinor()));
-
- if (errorCode != AMQConstant.REPLY_SUCCESS)
+ try
{
- if(errorCode == AMQConstant.NOT_ALLOWED)
+ // TODO: check whether channel id of zero is appropriate
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(), method.getMinor()));
+
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- _logger.info("Authentication Error:"+Thread.currentThread().getName());
+ if (errorCode == AMQConstant.NOT_ALLOWED)
+ {
+ _logger.info("Authentication Error:" + Thread.currentThread().getName());
- protocolSession.closeProtocolSession();
+ protocolSession.closeProtocolSession();
- //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
- stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
+ //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
+ stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
- throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString());
- }
- else
- {
- _logger.info("Connection close received with error code " + errorCode);
+ throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString());
+ }
+ else
+ {
+ _logger.info("Connection close received with error code " + errorCode);
- throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+ throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+ }
}
}
+ finally
+ {
+ // this actually closes the connection in the case where it is not an error.
- // this actually closes the connection in the case where it is not an error.
-
- protocolSession.closeProtocolSession();
+ protocolSession.closeProtocolSession();
- stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 09dd6ecfff..825baf95d1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -59,25 +59,22 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
/**
- * The state manager is responsible for managing the state of the protocol session.
- * <p/>
- * For each AMQProtocolHandler there is a separate state manager.
+ * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
+ * there is a separate state manager.
*/
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
private AMQProtocolSession _protocolSession;
- /**
- * The current state
- */
+ /** The current state */
private AMQState _currentState;
/**
- * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
- * The class must be a subclass of AMQFrame.
+ * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
+ * AMQFrame.
*/
- private final Map _state2HandlersMap = new HashMap();
+ protected final Map _state2HandlersMap = new HashMap();
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
private final Object _stateLock = new Object();
@@ -87,7 +84,7 @@ public class AMQStateManager implements AMQMethodListener
{
this(null);
}
-
+
public AMQStateManager(AMQProtocolSession protocolSession)
{
@@ -98,7 +95,7 @@ public class AMQStateManager implements AMQMethodListener
{
_protocolSession = protocolSession;
_currentState = state;
- if(register)
+ if (register)
{
registerListeners();
}
@@ -194,7 +191,7 @@ public class AMQStateManager implements AMQMethodListener
final Class clazz = frame.getClass();
if (_logger.isDebugEnabled())
{
- _logger.debug("Looking for state transition handler for frame " + clazz);
+ _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz);
}
final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState);
@@ -228,12 +225,12 @@ public class AMQStateManager implements AMQMethodListener
public void attainState(final AMQState s) throws AMQException
{
- synchronized(_stateLock)
+ synchronized (_stateLock)
{
final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
long waitTime = MAXIMUM_STATE_WAIT_TIME;
- while(_currentState != s && waitTime > 0)
+ while (_currentState != s && waitTime > 0)
{
try
{
@@ -243,12 +240,12 @@ public class AMQStateManager implements AMQMethodListener
{
_logger.warn("Thread interrupted");
}
- if(_currentState != s)
+ if (_currentState != s)
{
waitTime = waitUntilTime - System.currentTimeMillis();
}
}
- if(_currentState != s)
+ if (_currentState != s)
{
_logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
new file mode 100644
index 0000000000..efbc2b9c78
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import junit.framework.TestCase;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import javax.jms.JMSException;
+import javax.jms.ExceptionListener;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.Queue;
+
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.log4j.Logger;
+
+public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener
+{
+ private static final Logger _logger = Logger.getLogger(ChannelCloseTest.class);
+
+ Connection _connection;
+ private String _brokerlist = "vm://:1";
+ private Session _session;
+ private static final long SYNC_TIMEOUT = 500;
+ private int TEST = 0;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ /*
+ close channel, use chanel with same id ensure error.
+ */
+ public void testReusingChannelAfterFullClosure()
+ {
+ _connection = newConnection();
+
+ //Create Producer
+ try
+ {
+ _connection.start();
+
+ createChannelAndTest(1);
+
+ // Cause it to close
+ try
+ {
+ _logger.info("Testing invalid exchange");
+ declareExchange(1, "", "name_that_will_lookup_to_null", false);
+ fail("Exchange name is empty so this should fail ");
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
+ }
+
+ // Check that
+ try
+ {
+ _logger.info("Testing valid exchange should fail");
+ declareExchange(1, "topic", "amq.topic", false);
+ fail("This should not succeed as the channel should be closed ");
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
+
+ _connection = newConnection();
+ }
+
+ checkSendingMessage();
+
+ _session.close();
+ _connection.close();
+
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /*
+ close channel and send guff then send ok no errors
+ */
+ public void testSendingMethodsAfterClose()
+ {
+ try
+ {
+ _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='"
+ + _brokerlist + "'");
+
+ ((AMQConnection) _connection).setConnectionListener(this);
+
+
+ _connection.setExceptionListener(this);
+
+ //Change the StateManager for one that doesn't respond with Close-OKs
+ AMQStateManager oldStateManager = ((AMQConnection) _connection).getProtocolHandler().getStateManager();
+
+ _session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ _connection.start();
+
+ //Test connection
+ checkSendingMessage();
+
+ //Set StateManager to manager that ignores Close-oks
+ AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
+ AMQStateManager newStateManager = new TestCloseStateManager(protocolSession);
+ newStateManager.changeState(oldStateManager.getCurrentState());
+
+ ((AMQConnection) _connection).getProtocolHandler().setStateManager(newStateManager);
+
+ final int TEST_CHANNEL = 1;
+ _logger.info("Testing Channel(" + TEST_CHANNEL + ") Creation");
+
+ createChannelAndTest(TEST_CHANNEL);
+
+ // Cause it to close
+ try
+ {
+ _logger.info("Closing Channel - invalid exchange");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false);
+ fail("Exchange name is empty so this should fail ");
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
+ }
+
+ try
+ {
+ // Send other methods that should be ignored
+ // send them no wait as server will ignore them
+ _logger.info("Tested known exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "topic", "amq.topic", true);
+
+ _logger.info("Tested known invalid exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true);
+
+ _logger.info("Tested known invalid exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true);
+
+ // Send sync .. server will igore and timy oue
+ _logger.info("Tested known invalid exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false);
+ }
+ catch (AMQTimeoutException te)
+ {
+ assertEquals("Request should timeout", AMQConstant.REQUEST_TIMEOUT, te.getErrorCode());
+ }
+ catch (AMQException e)
+ {
+ fail("This should not fail as all requests should be ignored");
+ }
+
+ _logger.info("Sending Close");
+ // Send Close-ok
+ sendClose(TEST_CHANNEL);
+
+ _logger.info("Re-opening channel");
+
+ createChannelAndTest(TEST_CHANNEL);
+
+ //Test connection is still ok
+
+ checkSendingMessage();
+
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+
+ }
+ catch (URLSyntaxException e)
+ {
+ fail(e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ _session.close();
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ private void createChannelAndTest(int channel)
+ {
+ //Create A channel
+ try
+ {
+ createChannel(channel);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ // Test it is ok
+ try
+ {
+ declareExchange(channel, "topic", "amq.topic", false);
+ _logger.info("Tested known exchange");
+ }
+ catch (AMQException e)
+ {
+ fail("This should not fail as this is the default exchange details");
+ }
+ }
+
+ private void sendClose(int channel)
+ {
+ AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel,
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
+
+ ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
+ }
+
+
+ private void checkSendingMessage() throws JMSException
+ {
+ TEST++;
+ _logger.info("Test creating producer which will use channel id 1");
+
+ Queue queue = _session.createQueue("CCT_test_validation_queue" + TEST);
+
+ MessageConsumer consumer = _session.createConsumer(queue);
+
+ MessageProducer producer = _session.createProducer(queue);
+
+ final String MESSAGE = "CCT_Test_Message";
+ producer.send(_session.createTextMessage(MESSAGE));
+
+ Message msg = consumer.receive(2000);
+
+ assertNotNull("Received messages should not be null.", msg);
+ assertEquals("Message received not what we sent", MESSAGE, ((TextMessage) msg).getText());
+ }
+
+ private Connection newConnection()
+ {
+ AMQConnection connection = null;
+ try
+ {
+ connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='"
+ + _brokerlist + "'");
+
+ connection.setConnectionListener(this);
+
+ _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ connection.start();
+
+ }
+ catch (JMSException e)
+ {
+ fail("Creating new connection when:"+e.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail("Creating new connection when:"+e.getMessage());
+ }
+ catch (URLSyntaxException e)
+ {
+ fail("Creating new connection when:"+e.getMessage());
+ }
+
+
+ return connection;
+ }
+
+ private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException
+ {
+ AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId,
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(),
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ new AMQShortString(_name), // exchange
+ false, // internal
+ nowait, // nowait
+ true, // passive
+ 0, // ticket
+ new AMQShortString(_type)); // type
+
+ if (nowait)
+ {
+ ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare);
+ }
+ else
+ {
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+ }
+ }
+
+ private void createChannel(int channelId) throws AMQException
+ {
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(
+ ChannelOpenBody.createAMQFrame(channelId,
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(),
+ null), // outOfBand
+ ChannelOpenOkBody.class);
+
+ }
+
+
+ public void onException(JMSException jmsException)
+ {
+ //_logger.info("CCT" + jmsException);
+ fail(jmsException.getMessage());
+ }
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return false;
+ }
+
+ public boolean preResubscribe()
+ {
+ return false;
+ }
+
+ public void failoverComplete()
+ {
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java
new file mode 100644
index 0000000000..69cbdd6a09
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+
+public class TestChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(TestChannelCloseMethodHandlerNoCloseOk.class);
+
+ private static TestChannelCloseMethodHandlerNoCloseOk _handler = new TestChannelCloseMethodHandlerNoCloseOk();
+
+ public static TestChannelCloseMethodHandlerNoCloseOk getInstance()
+ {
+ return _handler;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug("ChannelClose method received");
+ ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+
+ AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
+ AMQShortString reason = method.replyText;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
+ }
+
+ // For this test Method Handler .. don't send Close-OK
+// // TODO: Be aware of possible changes to parameter order as versions change.
+// AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
+// protocolSession.writeFrame(frame);
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
+ {
+ _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ throw new AMQNoConsumersException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ throw new AMQNoRouteException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_SELECTOR)
+ {
+ _logger.debug("Broker responded with Invalid Selector.");
+
+ throw new AMQInvalidSelectorException(String.valueOf(reason));
+ }
+ else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+ {
+ _logger.debug("Broker responded with Invalid Routing Key.");
+
+ throw new AMQInvalidRoutingKeyException(String.valueOf(reason));
+ }
+ else
+ {
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+ }
+
+ }
+ protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+ }
+} \ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java
new file mode 100644
index 0000000000..d643b467ea
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
+import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
+import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
+import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
+import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
+import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
+import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
+import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
+import org.apache.qpid.client.handler.BasicReturnMethodHandler;
+import org.apache.qpid.client.handler.BasicCancelOkMethodHandler;
+import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler;
+import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler;
+import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.BasicReturnBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public class TestCloseStateManager extends AMQStateManager
+{
+ public TestCloseStateManager(AMQProtocolSession protocolSession)
+ {
+ super(protocolSession);
+ }
+
+ protected void registerListeners()
+ {
+ Map frame2handlerMap = new HashMap();
+
+ // we need to register a map for the null (i.e. all state) handlers otherwise you get
+ // a stack overflow in the handler searching code when you present it with a frame for which
+ // no handlers are registered
+ //
+ _state2HandlersMap.put(null, frame2handlerMap);
+
+ frame2handlerMap = new HashMap();
+ frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance());
+ frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+ _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
+
+ frame2handlerMap = new HashMap();
+ frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance());
+ frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance());
+ frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+ _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
+
+ frame2handlerMap = new HashMap();
+ frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance());
+ frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+ _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
+
+ //
+ // ConnectionOpen handlers
+ //
+ frame2handlerMap = new HashMap();
+ // Use Test Handler for Close methods to not send Close-OKs
+ frame2handlerMap.put(ChannelCloseBody.class, TestChannelCloseMethodHandlerNoCloseOk.getInstance());
+
+ frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
+ frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+ frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
+ frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
+ frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
+ frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
+ frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
+ frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
+ _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
+ }
+
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
index 6af681f479..f1f973542d 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid;
+import org.apache.qpid.protocol.AMQConstant;
+
public class AMQTimeoutException extends AMQException
{
public AMQTimeoutException(String message)
{
- super(message);
+ super(AMQConstant.REQUEST_TIMEOUT, message);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index 05365de137..0c4736a348 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -90,6 +90,8 @@ public final class AMQConstant
public static final AMQConstant IN_USE = new AMQConstant(406, "In use", true);
+ public static final AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "Request Timeout", true);
+
public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);
@@ -100,7 +102,7 @@ public final class AMQConstant
public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
- public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true);
+ public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);