summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java101
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java31
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java43
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java96
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java3
6 files changed, 226 insertions, 93 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index adf2a4bda2..1fb1c51890 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -62,6 +62,9 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
@@ -144,6 +147,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
+ private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+
/**
* @param broker brokerdetails
* @param username username
@@ -716,8 +722,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
+ long startCloseTime = System.currentTimeMillis();
+
+ _taskPool.shutdown();
closeAllSessions(null, timeout);
+
+ if (!_taskPool.isTerminated())
+ {
+ try
+ {
+ //adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+
+ _taskPool.awaitTermination(taskPoolTimeout , TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interrupted while shutting down connection thread pool.");
+ }
+ }
+
+ //adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
+
_protocolHandler.closeConnection(timeout);
+
}
catch (AMQException e)
{
@@ -727,6 +756,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
+ private long adjustTimeout(long timeout, long startTime)
+ {
+ long now = System.currentTimeMillis();
+ timeout -= now - startTime;
+ if (timeout < 0)
+ {
+ timeout = 0;
+ }
+ return timeout;
+ }
+
/**
* Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
* mark objects "visible" in userland as closed after failover or other significant event that impacts the
@@ -1147,4 +1187,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_temporaryQueueExchangeName = temporaryQueueExchangeName;
}
+
+ public void performConnectionTask(Runnable task)
+ {
+ _taskPool.execute(task);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index dc2ffc38c4..fe77acfabc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -72,7 +72,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
@@ -192,7 +191,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _hasMessageListeners;
-
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
@@ -277,42 +275,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- else
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
- false,
- message.getBounceBody().exchange,
- message.getBounceBody().routingKey,
- message.getContentHeader(),
- message.getBodies());
-
- AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
- AMQShortString reason = message.getBounceBody().replyText;
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
-
- }
- catch (Exception e)
- {
- _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
- }
- }
}
public void close()
@@ -1384,7 +1346,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName));
+ return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
}
else
{
@@ -1474,8 +1436,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) &&
- !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName))
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
{
deleteQueue(dest.getAMQQueueName());
}
@@ -1634,9 +1596,59 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ "] received in session with channel id " + _channelId);
}
- startDistpatcherIfNecessary();
+ if (message.getDeliverBody() == null)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage(message);
+ }
+ else
+ {
+ _queue.add(message);
+ }
+ }
+
+ private void returnBouncedMessage(final UnprocessedMessage message)
+ {
+ _connection.performConnectionTask(
+ new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
+ false,
+ message.getBounceBody().exchange,
+ message.getBounceBody().routingKey,
+ message.getContentHeader(),
+ message.getBodies());
+
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
+ AMQShortString reason = message.getBounceBody().replyText;
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
+ else
+ {
+ _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ }
- _queue.add(message);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
+ }
+ }
+ });
}
/**
@@ -1882,7 +1894,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session");
}
- if(!(topic instanceof AMQTopic))
+ if (!(topic instanceof AMQTopic))
{
throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName());
}
@@ -1917,7 +1929,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
-
public int getTicket()
{
return _ticket;
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 11a39df10f..6062528d43 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -42,16 +42,13 @@ import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
- * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
*/
public class MessageListenerMultiConsumerTest extends TestCase
{
@@ -66,7 +63,6 @@ public class MessageListenerMultiConsumerTest extends TestCase
private MessageConsumer _consumer1;
private MessageConsumer _consumer2;
- private boolean _testAsync;
private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
protected void setUp() throws Exception
@@ -116,16 +112,10 @@ public class MessageListenerMultiConsumerTest extends TestCase
producerConnection.close();
- _testAsync = false;
}
protected void tearDown() throws Exception
{
- //Should have recieved all async messages
- if (_testAsync)
- {
- assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
- }
_clientConnection.close();
super.tearDown();
@@ -161,8 +151,6 @@ public class MessageListenerMultiConsumerTest extends TestCase
public void testAsynchronousRecieve() throws Exception
{
- _testAsync = true;
-
_consumer1.setMessageListener(new MessageListener()
{
public void onMessage(Message message)
@@ -173,7 +161,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
if (receivedCount1 == MSG_COUNT / 2)
{
- _allMessagesSent.countDown();
+ _allMessagesSent.countDown();
}
}
@@ -198,13 +186,14 @@ public class MessageListenerMultiConsumerTest extends TestCase
try
{
- _allMessagesSent.await(2000, TimeUnit.MILLISECONDS);
+ _allMessagesSent.await(4000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
//do nothing
}
+ assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
index 3e44888459..dc01005247 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -21,6 +21,8 @@
package org.apache.qpid.client;
import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -38,18 +40,17 @@ import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
/**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
- * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
*/
public class MessageListenerTest extends TestCase implements MessageListener
{
@@ -61,7 +62,7 @@ public class MessageListenerTest extends TestCase implements MessageListener
private int receivedCount = 0;
private MessageConsumer _consumer;
private Connection _clientConnection;
- private boolean _testAsync;
+ private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT);
protected void setUp() throws Exception
{
@@ -71,9 +72,9 @@ public class MessageListenerTest extends TestCase implements MessageListener
InitialContextFactory factory = new PropertiesFileInitialContextFactory();
Hashtable<String, String> env = new Hashtable<String, String>();
-
+
env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
- env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+ env.put("queue.queue", "MessageListenerTest");
_context = factory.getInitialContext(env);
@@ -86,7 +87,6 @@ public class MessageListenerTest extends TestCase implements MessageListener
Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
_consumer = clientSession.createConsumer(queue);
//Create Producer
@@ -106,16 +106,10 @@ public class MessageListenerTest extends TestCase implements MessageListener
producerConnection.close();
- _testAsync = false;
}
protected void tearDown() throws Exception
{
- //Should have recieved all async messages
- if (_testAsync)
- {
- assertEquals(MSG_COUNT, receivedCount);
- }
_clientConnection.close();
super.tearDown();
@@ -125,7 +119,6 @@ public class MessageListenerTest extends TestCase implements MessageListener
public void testSynchronousRecieve() throws Exception
{
-
for (int msg = 0; msg < MSG_COUNT; msg++)
{
assertTrue(_consumer.receive(2000) != null);
@@ -134,21 +127,20 @@ public class MessageListenerTest extends TestCase implements MessageListener
public void testAsynchronousRecieve() throws Exception
{
- _testAsync = true;
-
_consumer.setMessageListener(this);
-
_logger.info("Waiting 3 seconds for messages");
try
{
- Thread.sleep(2000);
+ _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
//do nothing
}
+ //Should have recieved all async messages
+ assertEquals(MSG_COUNT, receivedCount);
}
@@ -157,6 +149,7 @@ public class MessageListenerTest extends TestCase implements MessageListener
_logger.info("Received Message(" + receivedCount + "):" + message);
receivedCount++;
+ _awaitMessages.countDown();
}
public static junit.framework.Test suite()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
new file mode 100644
index 0000000000..e70196dff2
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
+
+public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandlerNoCloseOk.class);
+
+ private static ChannelCloseMethodHandlerNoCloseOk _handler = new ChannelCloseMethodHandlerNoCloseOk();
+
+ public static ChannelCloseMethodHandlerNoCloseOk getInstance()
+ {
+ return _handler;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug("ChannelClose method received");
+ ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+
+ AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
+ AMQShortString reason = method.replyText;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
+ }
+
+ // For this test Method Handler .. don't send Close-OK
+// // TODO: Be aware of possible changes to parameter order as versions change.
+// AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
+// protocolSession.writeFrame(frame);
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
+ {
+ _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ throw new AMQNoConsumersException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ throw new AMQNoRouteException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_SELECTOR)
+ {
+ _logger.debug("Broker responded with Invalid Selector.");
+
+ throw new AMQInvalidSelectorException(String.valueOf(reason));
+ }
+ else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+ {
+ _logger.debug("Broker responded with Invalid Routing Key.");
+
+ throw new AMQInvalidRoutingKeyException(String.valueOf(reason));
+ }
+ else
+ {
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+ }
+
+ }
+ protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+ }
+} \ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
index 1ed9750338..d128f30727 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
@@ -27,7 +27,6 @@ import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
import org.apache.qpid.client.handler.BasicReturnMethodHandler;
@@ -91,7 +90,7 @@ public class NoCloseOKStateManager extends AMQStateManager
//
frame2handlerMap = new HashMap();
// Use Test Handler for Close methods to not send Close-OKs
- frame2handlerMap.put(ChannelCloseBody.class, TestChannelCloseMethodHandlerNoCloseOk.getInstance());
+ frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance());
frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());