diff options
author | Keith Wall <kwall@apache.org> | 2011-10-11 16:00:17 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2011-10-11 16:00:17 +0000 |
commit | ec94396bd9c3e5e05376dbbc0254d3030e0b1728 (patch) | |
tree | 67694ba6d2ee37879a839b3420b279a9e8b26512 | |
parent | 355e17c342fc45932d3ed05bf0500f78f9e93be3 (diff) | |
download | qpid-python-ec94396bd9c3e5e05376dbbc0254d3030e0b1728.tar.gz |
QPID-3542: Java client does not ack non-matching messages when using client side selectors (CPP Broker)
Applied patch from Andrew MacBean <andymacbean@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1181861 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 76 insertions, 106 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 3c24c67f9b..5fba351d8a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -150,13 +150,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (isMessageListenerSet() && capacity == 0) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); + messageFlow(); } _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage); } + else + { + // if we are synchronously waiting for a message + // and messages are not pre-fetched we then need to request another one + if(capacity == 0) + { + messageFlow(); + } + } } catch (AMQException e) { @@ -245,6 +252,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _logger.debug("messageOk " + messageOk); _logger.debug("_preAcquire " + _preAcquire); } + if (!messageOk) { if (_preAcquire) @@ -261,19 +269,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (_logger.isDebugEnabled()) { - _logger.debug("Message not OK, releasing"); + _logger.debug("filterMessage - not ack'ing messaage as not aquired"); } - releaseMessage(message); - } - // if we are syncrhonously waiting for a message - // and messages are not prefetched we then need to request another one - if(capacity == 0) - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); } } + // now we need to acquire this message if needed // this is the case of queue with a message selector set if (!_preAcquire && messageOk && !isNoConsume()) @@ -285,6 +285,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM messageOk = acquireMessage(message); _logger.debug("filterMessage - message acquire status : " + messageOk); } + return messageOk; } @@ -295,38 +296,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM * @param message The message to be acknowledged * @throws AMQException If the message cannot be acquired due to some internal error. */ - private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException + private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException { - if (!_preAcquire) - { - RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); - _0_10session.messageAcknowledge - (ranges, - _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - - AMQException amqe = _0_10session.getCurrentException(); - if (amqe != null) - { - throw amqe; - } - } - } + final RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); + _0_10session.messageAcknowledge + (ranges, + _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - /** - * Release a message - * - * @param message The message to be released - * @throws AMQException If the message cannot be released due to some internal error. - */ - private void releaseMessage(AbstractJMSMessage message) throws AMQException - { - if (_preAcquire) + final AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) { - RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); - _0_10session.getQpidSession().messageRelease(ranges); - _0_10session.sync(); + throw amqe; } } @@ -337,25 +318,28 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM * @return true if the message has been acquired, false otherwise. * @throws AMQException If the message cannot be acquired due to some internal error. */ - private boolean acquireMessage(AbstractJMSMessage message) throws AMQException + private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException { boolean result = false; - if (!_preAcquire) - { - RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); + final RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); - Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get(); + final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get(); - RangeSet acquired = acq.getTransfers(); - if (acquired != null && acquired.size() > 0) - { - result = true; - } + final RangeSet acquired = acq.getTransfers(); + if (acquired != null && acquired.size() > 0) + { + result = true; } return result; } + private void messageFlow() + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } public void setMessageListener(final MessageListener messageListener) throws JMSException { @@ -364,9 +348,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (messageListener != null && capacity == 0) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); + messageFlow(); } if (messageListener != null && !_synchronousQueue.isEmpty()) { @@ -389,9 +371,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (_0_10session.isStarted() && _syncReceive.get()) { - _0_10session.getQpidSession().messageFlow - (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); + messageFlow(); } } @@ -412,9 +392,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty()) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); + messageFlow(); } Object o = super.getMessageFromQueue(l); if (o == null && _0_10session.isStarted()) diff --git a/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java b/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java index 4159986090..40718c6435 100644 --- a/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java +++ b/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java @@ -37,9 +37,9 @@ public class JMSSelectorFilter implements MessageFilter public JMSSelectorFilter(String selector) throws AMQInternalException { _selector = selector; - if (JMSSelectorFilter._logger.isDebugEnabled()) + if (_logger.isDebugEnabled()) { - JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector); + _logger.debug("Created JMSSelectorFilter with selector:" + _selector); } _matcher = new SelectorParser().parse(selector); } @@ -49,16 +49,16 @@ public class JMSSelectorFilter implements MessageFilter try { boolean match = _matcher.matches(message); - if (JMSSelectorFilter._logger.isDebugEnabled()) + if (_logger.isDebugEnabled()) { - JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System + _logger.debug(message + " match(" + match + ") selector(" + System .identityHashCode(_selector) + "):" + _selector); } return match; } catch (AMQInternalException e) { - JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message " + message, e); + _logger.warn("Caught exception when evaluating message selector for message " + message, e); } return false; } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 0b1aeef8e9..a7a06a357a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.test.unit.topic; +import javax.jms.Connection; import javax.jms.InvalidDestinationException; +import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; @@ -30,7 +35,6 @@ import javax.jms.TopicSubscriber; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQTopicSessionAdaptor; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -306,51 +310,42 @@ public class TopicSessionTest extends QpidBrokerTestCase } /** - * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber - * due to a selector can be leaked. - * @throws Exception + * This tests was added to demonstrate QPID-3542. The Java Client when used with the CPP Broker was failing to + * ack messages received that did not match the selector. This meant the messages remained indefinitely on the Broker. */ - public void testNonMatchingMessagesDoNotFillQueue() throws Exception + public void testNonMatchingMessagesHandledCorrectly() throws Exception { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - // Setup Topic - AMQTopic topic = new AMQTopic(con, "testNoLocal"); - - TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); + final String topicName = getName(); + final String clientId = "clientId" + topicName; + final Connection con1 = getConnection(); + final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Topic topic1 = session1.createTopic(topicName); // Setup subscriber with selector - TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false); - TopicPublisher publisher = session.createPublisher(topic); + final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false); + final MessageProducer publisher = session1.createProducer(topic1); - con.start(); - TextMessage m; - TextMessage message; + con1.start(); // Send non-matching message - message = session.createTextMessage("non-matching 1"); - publisher.publish(message); - session.commit(); - - // Send and consume matching message - message = session.createTextMessage("hello"); - message.setStringProperty("Selector", "select"); + final Message sentMessage = session1.createTextMessage("hello"); + sentMessage.setStringProperty("Selector", "nonMatch"); + publisher.send(sentMessage); - publisher.publish(message); - session.commit(); + // Try to consume non-message, expect this to fail. + final Message message1 = subscriberWithSelector.receive(1000); + assertNull("should not have received message", message1); + subscriberWithSelector.close(); - m = (TextMessage) selector.receive(1000); - assertNotNull("should have received message", m); - assertEquals("Message contents were wrong", "hello", m.getText()); - - // Send non-matching message - message = session.createTextMessage("non-matching 2"); - publisher.publish(message); - session.commit(); + session1.close(); - // Assert queue count is 0 - long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic); - assertEquals("Queue depth was wrong", 0, depth); + // Now recreate the session and subscriber (same clientid) but without selector and check that the message still + // is not received. This defect meant that such a message would be received. + final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Topic topic2 = session2.createTopic(topicName); + final TopicSubscriber sameSubscriberWithoutSelector = session2.createDurableSubscriber(topic2, clientId, null, false); + final Message message2 = sameSubscriberWithoutSelector.receive(1000); + assertNull("still should not have received message", message2); } } diff --git a/java/test-profiles/CPPExcludes b/java/test-profiles/CPPExcludes index d05f42e4b7..39b4d542db 100755 --- a/java/test-profiles/CPPExcludes +++ b/java/test-profiles/CPPExcludes @@ -64,9 +64,6 @@ org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges // 0-10 c++ broker in cpp.testprofile is started with no auth so won't pass this test org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection -// c++ broker doesn't do selectors, so this will fail -org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue - // InVM Broker tests org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#* |