summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-10-11 16:00:17 +0000
committerKeith Wall <kwall@apache.org>2011-10-11 16:00:17 +0000
commitec94396bd9c3e5e05376dbbc0254d3030e0b1728 (patch)
tree67694ba6d2ee37879a839b3420b279a9e8b26512
parent355e17c342fc45932d3ed05bf0500f78f9e93be3 (diff)
downloadqpid-python-ec94396bd9c3e5e05376dbbc0254d3030e0b1728.tar.gz
QPID-3542: Java client does not ack non-matching messages when using client side selectors (CPP Broker)
Applied patch from Andrew MacBean <andymacbean@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1181861 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java102
-rw-r--r--java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java10
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java67
-rwxr-xr-xjava/test-profiles/CPPExcludes3
4 files changed, 76 insertions, 106 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 3c24c67f9b..5fba351d8a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -150,13 +150,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (isMessageListenerSet() && capacity == 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
+ else
+ {
+ // if we are synchronously waiting for a message
+ // and messages are not pre-fetched we then need to request another one
+ if(capacity == 0)
+ {
+ messageFlow();
+ }
+ }
}
catch (AMQException e)
{
@@ -245,6 +252,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
}
+
if (!messageOk)
{
if (_preAcquire)
@@ -261,19 +269,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Message not OK, releasing");
+ _logger.debug("filterMessage - not ack'ing messaage as not aquired");
}
- releaseMessage(message);
- }
- // if we are syncrhonously waiting for a message
- // and messages are not prefetched we then need to request another one
- if(capacity == 0)
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
}
}
+
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
if (!_preAcquire && messageOk && !isNoConsume())
@@ -285,6 +285,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
messageOk = acquireMessage(message);
_logger.debug("filterMessage - message acquire status : " + messageOk);
}
+
return messageOk;
}
@@ -295,38 +296,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
* @param message The message to be acknowledged
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
+ private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
{
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.messageAcknowledge
- (ranges,
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
-
- AMQException amqe = _0_10session.getCurrentException();
- if (amqe != null)
- {
- throw amqe;
- }
- }
- }
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.messageAcknowledge
+ (ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- /**
- * Release a message
- *
- * @param message The message to be released
- * @throws AMQException If the message cannot be released due to some internal error.
- */
- private void releaseMessage(AbstractJMSMessage message) throws AMQException
- {
- if (_preAcquire)
+ final AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
{
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.getQpidSession().messageRelease(ranges);
- _0_10session.sync();
+ throw amqe;
}
}
@@ -337,25 +318,28 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
* @return true if the message has been acquired, false otherwise.
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
+ private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
{
boolean result = false;
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
- Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+ final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
- RangeSet acquired = acq.getTransfers();
- if (acquired != null && acquired.size() > 0)
- {
- result = true;
- }
+ final RangeSet acquired = acq.getTransfers();
+ if (acquired != null && acquired.size() > 0)
+ {
+ result = true;
}
return result;
}
+ private void messageFlow()
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
@@ -364,9 +348,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (messageListener != null && capacity == 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
if (messageListener != null && !_synchronousQueue.isEmpty())
{
@@ -389,9 +371,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (_0_10session.isStarted() && _syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow
- (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
}
@@ -412,9 +392,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())
diff --git a/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java b/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
index 4159986090..40718c6435 100644
--- a/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
+++ b/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
@@ -37,9 +37,9 @@ public class JMSSelectorFilter implements MessageFilter
public JMSSelectorFilter(String selector) throws AMQInternalException
{
_selector = selector;
- if (JMSSelectorFilter._logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector);
+ _logger.debug("Created JMSSelectorFilter with selector:" + _selector);
}
_matcher = new SelectorParser().parse(selector);
}
@@ -49,16 +49,16 @@ public class JMSSelectorFilter implements MessageFilter
try
{
boolean match = _matcher.matches(message);
- if (JMSSelectorFilter._logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System
+ _logger.debug(message + " match(" + match + ") selector(" + System
.identityHashCode(_selector) + "):" + _selector);
}
return match;
}
catch (AMQInternalException e)
{
- JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message " + message, e);
+ _logger.warn("Caught exception when evaluating message selector for message " + message, e);
}
return false;
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 0b1aeef8e9..a7a06a357a 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.test.unit.topic;
+import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
@@ -30,7 +35,6 @@ import javax.jms.TopicSubscriber;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQTopicSessionAdaptor;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -306,51 +310,42 @@ public class TopicSessionTest extends QpidBrokerTestCase
}
/**
- * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
- * due to a selector can be leaked.
- * @throws Exception
+ * This tests was added to demonstrate QPID-3542. The Java Client when used with the CPP Broker was failing to
+ * ack messages received that did not match the selector. This meant the messages remained indefinitely on the Broker.
*/
- public void testNonMatchingMessagesDoNotFillQueue() throws Exception
+ public void testNonMatchingMessagesHandledCorrectly() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- // Setup Topic
- AMQTopic topic = new AMQTopic(con, "testNoLocal");
-
- TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
+ final String topicName = getName();
+ final String clientId = "clientId" + topicName;
+ final Connection con1 = getConnection();
+ final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Topic topic1 = session1.createTopic(topicName);
// Setup subscriber with selector
- TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false);
- TopicPublisher publisher = session.createPublisher(topic);
+ final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
+ final MessageProducer publisher = session1.createProducer(topic1);
- con.start();
- TextMessage m;
- TextMessage message;
+ con1.start();
// Send non-matching message
- message = session.createTextMessage("non-matching 1");
- publisher.publish(message);
- session.commit();
-
- // Send and consume matching message
- message = session.createTextMessage("hello");
- message.setStringProperty("Selector", "select");
+ final Message sentMessage = session1.createTextMessage("hello");
+ sentMessage.setStringProperty("Selector", "nonMatch");
+ publisher.send(sentMessage);
- publisher.publish(message);
- session.commit();
+ // Try to consume non-message, expect this to fail.
+ final Message message1 = subscriberWithSelector.receive(1000);
+ assertNull("should not have received message", message1);
+ subscriberWithSelector.close();
- m = (TextMessage) selector.receive(1000);
- assertNotNull("should have received message", m);
- assertEquals("Message contents were wrong", "hello", m.getText());
-
- // Send non-matching message
- message = session.createTextMessage("non-matching 2");
- publisher.publish(message);
- session.commit();
+ session1.close();
- // Assert queue count is 0
- long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
- assertEquals("Queue depth was wrong", 0, depth);
+ // Now recreate the session and subscriber (same clientid) but without selector and check that the message still
+ // is not received. This defect meant that such a message would be received.
+ final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Topic topic2 = session2.createTopic(topicName);
+ final TopicSubscriber sameSubscriberWithoutSelector = session2.createDurableSubscriber(topic2, clientId, null, false);
+ final Message message2 = sameSubscriberWithoutSelector.receive(1000);
+ assertNull("still should not have received message", message2);
}
}
diff --git a/java/test-profiles/CPPExcludes b/java/test-profiles/CPPExcludes
index d05f42e4b7..39b4d542db 100755
--- a/java/test-profiles/CPPExcludes
+++ b/java/test-profiles/CPPExcludes
@@ -64,9 +64,6 @@ org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges
// 0-10 c++ broker in cpp.testprofile is started with no auth so won't pass this test
org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
-// c++ broker doesn't do selectors, so this will fail
-org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue
-
// InVM Broker tests
org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*