diff options
Diffstat (limited to 'java/client/src')
6 files changed, 226 insertions, 93 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index adf2a4bda2..1fb1c51890 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -62,6 +62,9 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { @@ -144,6 +147,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ + private final ExecutorService _taskPool = Executors.newCachedThreadPool(); + /** * @param broker brokerdetails * @param username username @@ -716,8 +722,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { + long startCloseTime = System.currentTimeMillis(); + + _taskPool.shutdown(); closeAllSessions(null, timeout); + + if (!_taskPool.isTerminated()) + { + try + { + //adjust timeout + long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); + + _taskPool.awaitTermination(taskPoolTimeout , TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + _logger.info("Interrupted while shutting down connection thread pool."); + } + } + + //adjust timeout + timeout = adjustTimeout(timeout, startCloseTime); + _protocolHandler.closeConnection(timeout); + } catch (AMQException e) { @@ -727,6 +756,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } + private long adjustTimeout(long timeout, long startTime) + { + long now = System.currentTimeMillis(); + timeout -= now - startTime; + if (timeout < 0) + { + timeout = 0; + } + return timeout; + } + /** * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to * mark objects "visible" in userland as closed after failover or other significant event that impacts the @@ -1147,4 +1187,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _temporaryQueueExchangeName = temporaryQueueExchangeName; } + + public void performConnectionTask(Runnable task) + { + _taskPool.execute(task); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index dc2ffc38c4..fe77acfabc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -72,7 +72,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; @@ -192,7 +191,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _hasMessageListeners; - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread @@ -277,42 +275,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - else - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, - false, - message.getBounceBody().exchange, - message.getBounceBody().routingKey, - message.getContentHeader(), - message.getBodies()); - - AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); - AMQShortString reason = message.getBounceBody().replyText; - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); - } - else - { - _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } - - } - catch (Exception e) - { - _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); - } - } } public void close() @@ -1384,7 +1346,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (topicName.indexOf('/') == -1) { - return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName)); + return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); } else { @@ -1474,8 +1436,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // if the queue is bound to the exchange but NOT for this topic, then the JMS spec // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) && - !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName)) + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) && + !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) { deleteQueue(dest.getAMQQueueName()); } @@ -1634,9 +1596,59 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + "] received in session with channel id " + _channelId); } - startDistpatcherIfNecessary(); + if (message.getDeliverBody() == null) + { + // Return of the bounced message. + returnBouncedMessage(message); + } + else + { + _queue.add(message); + } + } + + private void returnBouncedMessage(final UnprocessedMessage message) + { + _connection.performConnectionTask( + new Runnable() + { + public void run() + { + try + { + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, + false, + message.getBounceBody().exchange, + message.getBounceBody().routingKey, + message.getContentHeader(), + message.getBodies()); + + AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); + AMQShortString reason = message.getBounceBody().replyText; + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); + } + else + { + _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } - _queue.add(message); + } + catch (Exception e) + { + _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); + } + } + }); } /** @@ -1882,7 +1894,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session"); } - if(!(topic instanceof AMQTopic)) + if (!(topic instanceof AMQTopic)) { throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); } @@ -1917,7 +1929,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public int getTicket() { return _ticket; diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index 11a39df10f..6062528d43 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -42,16 +42,13 @@ import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; /** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue - * <p/> - * The message delivery process: - * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s - * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start - * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a - * session can run in any order and a synchronous put/poll will block the dispatcher). - * <p/> - * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered - * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery + * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread + * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at + * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple + * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting + * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining + * messages will be left on the queue and lost, subsequent messages on the session will arrive first. */ public class MessageListenerMultiConsumerTest extends TestCase { @@ -66,7 +63,6 @@ public class MessageListenerMultiConsumerTest extends TestCase private MessageConsumer _consumer1; private MessageConsumer _consumer2; - private boolean _testAsync; private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock protected void setUp() throws Exception @@ -116,16 +112,10 @@ public class MessageListenerMultiConsumerTest extends TestCase producerConnection.close(); - _testAsync = false; } protected void tearDown() throws Exception { - //Should have recieved all async messages - if (_testAsync) - { - assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); - } _clientConnection.close(); super.tearDown(); @@ -161,8 +151,6 @@ public class MessageListenerMultiConsumerTest extends TestCase public void testAsynchronousRecieve() throws Exception { - _testAsync = true; - _consumer1.setMessageListener(new MessageListener() { public void onMessage(Message message) @@ -173,7 +161,7 @@ public class MessageListenerMultiConsumerTest extends TestCase if (receivedCount1 == MSG_COUNT / 2) { - _allMessagesSent.countDown(); + _allMessagesSent.countDown(); } } @@ -198,13 +186,14 @@ public class MessageListenerMultiConsumerTest extends TestCase try { - _allMessagesSent.await(2000, TimeUnit.MILLISECONDS); + _allMessagesSent.await(4000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { //do nothing } + assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); } diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java index 3e44888459..dc01005247 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -21,6 +21,8 @@ package org.apache.qpid.client; import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -38,18 +40,17 @@ import junit.framework.TestCase; import org.apache.log4j.Logger; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.AMQBindingURL; /** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue - * <p/> - * The message delivery process: - * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s - * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start - * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a - * session can run in any order and a synchronous put/poll will block the dispatcher). - * <p/> - * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered - * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery + * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread + * take()s from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at + * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple + * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting + * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining + * messages will be left on the queue and lost, subsequent messages on the session will arrive first. */ public class MessageListenerTest extends TestCase implements MessageListener { @@ -61,7 +62,7 @@ public class MessageListenerTest extends TestCase implements MessageListener private int receivedCount = 0; private MessageConsumer _consumer; private Connection _clientConnection; - private boolean _testAsync; + private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT); protected void setUp() throws Exception { @@ -71,9 +72,9 @@ public class MessageListenerTest extends TestCase implements MessageListener InitialContextFactory factory = new PropertiesFileInitialContextFactory(); Hashtable<String, String> env = new Hashtable<String, String>(); - + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); - env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + env.put("queue.queue", "MessageListenerTest"); _context = factory.getInitialContext(env); @@ -86,7 +87,6 @@ public class MessageListenerTest extends TestCase implements MessageListener Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _consumer = clientSession.createConsumer(queue); //Create Producer @@ -106,16 +106,10 @@ public class MessageListenerTest extends TestCase implements MessageListener producerConnection.close(); - _testAsync = false; } protected void tearDown() throws Exception { - //Should have recieved all async messages - if (_testAsync) - { - assertEquals(MSG_COUNT, receivedCount); - } _clientConnection.close(); super.tearDown(); @@ -125,7 +119,6 @@ public class MessageListenerTest extends TestCase implements MessageListener public void testSynchronousRecieve() throws Exception { - for (int msg = 0; msg < MSG_COUNT; msg++) { assertTrue(_consumer.receive(2000) != null); @@ -134,21 +127,20 @@ public class MessageListenerTest extends TestCase implements MessageListener public void testAsynchronousRecieve() throws Exception { - _testAsync = true; - _consumer.setMessageListener(this); - _logger.info("Waiting 3 seconds for messages"); try { - Thread.sleep(2000); + _awaitMessages.await(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { //do nothing } + //Should have recieved all async messages + assertEquals(MSG_COUNT, receivedCount); } @@ -157,6 +149,7 @@ public class MessageListenerTest extends TestCase implements MessageListener _logger.info("Received Message(" + receivedCount + "):" + message); receivedCount++; + _awaitMessages.countDown(); } public static junit.framework.Test suite() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java new file mode 100644 index 0000000000..e70196dff2 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.AMQShortString; + +public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandlerNoCloseOk.class); + + private static ChannelCloseMethodHandlerNoCloseOk _handler = new ChannelCloseMethodHandlerNoCloseOk(); + + public static ChannelCloseMethodHandlerNoCloseOk getInstance() + { + return _handler; + } + + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + { + _logger.debug("ChannelClose method received"); + ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); + + AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); + AMQShortString reason = method.replyText; + if (_logger.isDebugEnabled()) + { + _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); + } + + // For this test Method Handler .. don't send Close-OK +// // TODO: Be aware of possible changes to parameter order as versions change. +// AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor()); +// protocolSession.writeFrame(frame); + if (errorCode != AMQConstant.REPLY_SUCCESS) + { + _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); + if (errorCode == AMQConstant.NO_CONSUMERS) + { + throw new AMQNoConsumersException("Error: " + reason, null); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + throw new AMQNoRouteException("Error: " + reason, null); + } + else if (errorCode == AMQConstant.INVALID_SELECTOR) + { + _logger.debug("Broker responded with Invalid Selector."); + + throw new AMQInvalidSelectorException(String.valueOf(reason)); + } + else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) + { + _logger.debug("Broker responded with Invalid Routing Key."); + + throw new AMQInvalidRoutingKeyException(String.valueOf(reason)); + } + else + { + throw new AMQChannelClosedException(errorCode, "Error: " + reason); + } + + } + protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason)); + } +}
\ No newline at end of file diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java index 1ed9750338..d128f30727 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java @@ -27,7 +27,6 @@ import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; -import org.apache.qpid.client.handler.ChannelCloseMethodHandler; import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler; import org.apache.qpid.client.handler.BasicDeliverMethodHandler; import org.apache.qpid.client.handler.BasicReturnMethodHandler; @@ -91,7 +90,7 @@ public class NoCloseOKStateManager extends AMQStateManager // frame2handlerMap = new HashMap(); // Use Test Handler for Close methods to not send Close-OKs - frame2handlerMap.put(ChannelCloseBody.class, TestChannelCloseMethodHandlerNoCloseOk.getInstance()); + frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance()); frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance()); frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); |