summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-19 13:48:17 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-19 13:48:17 +0000
commitc24ca9c39287a58b3a5692e0744423fc88781906 (patch)
tree496c32352e26d69e76b0bd52226d3f855b4f2523
parent65e0321d1b0145dc0d68a324e5a7cda661d5754b (diff)
downloadqpid-python-c24ca9c39287a58b3a5692e0744423fc88781906.tar.gz
QPID-379 Bounced Messages do not appear in connection exception listener.
The previous commit that started the Dispatcher was wrong and caused a lot of failures. This will address that problem by providing a thread pool on the client connection object to deliver bounced messages to the exception handler. Tidied up MessageListenerTests so all the asserts are in the given test. Renamed TestChannelCloseMethodHandlerNoCloseOk as surefire picks it up as a test case. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509202 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java101
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java31
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java43
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java96
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java3
6 files changed, 226 insertions, 93 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index adf2a4bda2..1fb1c51890 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -62,6 +62,9 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
@@ -144,6 +147,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
+ private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+
/**
* @param broker brokerdetails
* @param username username
@@ -716,8 +722,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
+ long startCloseTime = System.currentTimeMillis();
+
+ _taskPool.shutdown();
closeAllSessions(null, timeout);
+
+ if (!_taskPool.isTerminated())
+ {
+ try
+ {
+ //adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+
+ _taskPool.awaitTermination(taskPoolTimeout , TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interrupted while shutting down connection thread pool.");
+ }
+ }
+
+ //adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
+
_protocolHandler.closeConnection(timeout);
+
}
catch (AMQException e)
{
@@ -727,6 +756,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
+ private long adjustTimeout(long timeout, long startTime)
+ {
+ long now = System.currentTimeMillis();
+ timeout -= now - startTime;
+ if (timeout < 0)
+ {
+ timeout = 0;
+ }
+ return timeout;
+ }
+
/**
* Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
* mark objects "visible" in userland as closed after failover or other significant event that impacts the
@@ -1147,4 +1187,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_temporaryQueueExchangeName = temporaryQueueExchangeName;
}
+
+ public void performConnectionTask(Runnable task)
+ {
+ _taskPool.execute(task);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index dc2ffc38c4..fe77acfabc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -72,7 +72,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
@@ -192,7 +191,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _hasMessageListeners;
-
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
@@ -277,42 +275,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- else
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
- false,
- message.getBounceBody().exchange,
- message.getBounceBody().routingKey,
- message.getContentHeader(),
- message.getBodies());
-
- AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
- AMQShortString reason = message.getBounceBody().replyText;
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
-
- }
- catch (Exception e)
- {
- _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
- }
- }
}
public void close()
@@ -1384,7 +1346,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName));
+ return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
}
else
{
@@ -1474,8 +1436,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) &&
- !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName))
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
{
deleteQueue(dest.getAMQQueueName());
}
@@ -1634,9 +1596,59 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ "] received in session with channel id " + _channelId);
}
- startDistpatcherIfNecessary();
+ if (message.getDeliverBody() == null)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage(message);
+ }
+ else
+ {
+ _queue.add(message);
+ }
+ }
+
+ private void returnBouncedMessage(final UnprocessedMessage message)
+ {
+ _connection.performConnectionTask(
+ new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
+ false,
+ message.getBounceBody().exchange,
+ message.getBounceBody().routingKey,
+ message.getContentHeader(),
+ message.getBodies());
+
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
+ AMQShortString reason = message.getBounceBody().replyText;
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
+ else
+ {
+ _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ }
- _queue.add(message);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
+ }
+ }
+ });
}
/**
@@ -1882,7 +1894,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session");
}
- if(!(topic instanceof AMQTopic))
+ if (!(topic instanceof AMQTopic))
{
throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName());
}
@@ -1917,7 +1929,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
-
public int getTicket()
{
return _ticket;
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 11a39df10f..6062528d43 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -42,16 +42,13 @@ import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
- * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
*/
public class MessageListenerMultiConsumerTest extends TestCase
{
@@ -66,7 +63,6 @@ public class MessageListenerMultiConsumerTest extends TestCase
private MessageConsumer _consumer1;
private MessageConsumer _consumer2;
- private boolean _testAsync;
private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
protected void setUp() throws Exception
@@ -116,16 +112,10 @@ public class MessageListenerMultiConsumerTest extends TestCase
producerConnection.close();
- _testAsync = false;
}
protected void tearDown() throws Exception
{
- //Should have recieved all async messages
- if (_testAsync)
- {
- assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
- }
_clientConnection.close();
super.tearDown();
@@ -161,8 +151,6 @@ public class MessageListenerMultiConsumerTest extends TestCase
public void testAsynchronousRecieve() throws Exception
{
- _testAsync = true;
-
_consumer1.setMessageListener(new MessageListener()
{
public void onMessage(Message message)
@@ -173,7 +161,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
if (receivedCount1 == MSG_COUNT / 2)
{
- _allMessagesSent.countDown();
+ _allMessagesSent.countDown();
}
}
@@ -198,13 +186,14 @@ public class MessageListenerMultiConsumerTest extends TestCase
try
{
- _allMessagesSent.await(2000, TimeUnit.MILLISECONDS);
+ _allMessagesSent.await(4000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
//do nothing
}
+ assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
index 3e44888459..dc01005247 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -21,6 +21,8 @@
package org.apache.qpid.client;
import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -38,18 +40,17 @@ import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
- * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
*/
public class MessageListenerTest extends TestCase implements MessageListener
{
@@ -61,7 +62,7 @@ public class MessageListenerTest extends TestCase implements MessageListener
private int receivedCount = 0;
private MessageConsumer _consumer;
private Connection _clientConnection;
- private boolean _testAsync;
+ private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT);
protected void setUp() throws Exception
{
@@ -71,9 +72,9 @@ public class MessageListenerTest extends TestCase implements MessageListener
InitialContextFactory factory = new PropertiesFileInitialContextFactory();
Hashtable<String, String> env = new Hashtable<String, String>();
-
+
env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
- env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+ env.put("queue.queue", "MessageListenerTest");
_context = factory.getInitialContext(env);
@@ -86,7 +87,6 @@ public class MessageListenerTest extends TestCase implements MessageListener
Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
_consumer = clientSession.createConsumer(queue);
//Create Producer
@@ -106,16 +106,10 @@ public class MessageListenerTest extends TestCase implements MessageListener
producerConnection.close();
- _testAsync = false;
}
protected void tearDown() throws Exception
{
- //Should have recieved all async messages
- if (_testAsync)
- {
- assertEquals(MSG_COUNT, receivedCount);
- }
_clientConnection.close();
super.tearDown();
@@ -125,7 +119,6 @@ public class MessageListenerTest extends TestCase implements MessageListener
public void testSynchronousRecieve() throws Exception
{
-
for (int msg = 0; msg < MSG_COUNT; msg++)
{
assertTrue(_consumer.receive(2000) != null);
@@ -134,21 +127,20 @@ public class MessageListenerTest extends TestCase implements MessageListener
public void testAsynchronousRecieve() throws Exception
{
- _testAsync = true;
-
_consumer.setMessageListener(this);
-
_logger.info("Waiting 3 seconds for messages");
try
{
- Thread.sleep(2000);
+ _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
//do nothing
}
+ //Should have recieved all async messages
+ assertEquals(MSG_COUNT, receivedCount);
}
@@ -157,6 +149,7 @@ public class MessageListenerTest extends TestCase implements MessageListener
_logger.info("Received Message(" + receivedCount + "):" + message);
receivedCount++;
+ _awaitMessages.countDown();
}
public static junit.framework.Test suite()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
new file mode 100644
index 0000000000..e70196dff2
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
+
+public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandlerNoCloseOk.class);
+
+ private static ChannelCloseMethodHandlerNoCloseOk _handler = new ChannelCloseMethodHandlerNoCloseOk();
+
+ public static ChannelCloseMethodHandlerNoCloseOk getInstance()
+ {
+ return _handler;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug("ChannelClose method received");
+ ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+
+ AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
+ AMQShortString reason = method.replyText;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
+ }
+
+ // For this test Method Handler .. don't send Close-OK
+// // TODO: Be aware of possible changes to parameter order as versions change.
+// AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
+// protocolSession.writeFrame(frame);
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
+ {
+ _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ throw new AMQNoConsumersException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ throw new AMQNoRouteException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_SELECTOR)
+ {
+ _logger.debug("Broker responded with Invalid Selector.");
+
+ throw new AMQInvalidSelectorException(String.valueOf(reason));
+ }
+ else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+ {
+ _logger.debug("Broker responded with Invalid Routing Key.");
+
+ throw new AMQInvalidRoutingKeyException(String.valueOf(reason));
+ }
+ else
+ {
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+ }
+
+ }
+ protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+ }
+} \ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
index 1ed9750338..d128f30727 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
@@ -27,7 +27,6 @@ import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
import org.apache.qpid.client.handler.BasicReturnMethodHandler;
@@ -91,7 +90,7 @@ public class NoCloseOKStateManager extends AMQStateManager
//
frame2handlerMap = new HashMap();
// Use Test Handler for Close methods to not send Close-OKs
- frame2handlerMap.put(ChannelCloseBody.class, TestChannelCloseMethodHandlerNoCloseOk.getInstance());
+ frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance());
frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());