summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-03-30 16:01:23 +0000
committerRobert Gemmell <robbie@apache.org>2011-03-30 16:01:23 +0000
commit34e71040bd3d94549d6ad100db2e7f481e840cbe (patch)
treebed48cfdf43b33108164d0fad33f085fbdf0f008 /java/broker/src
parentc6a894d5e9f86527443086ae67d7dff668f5d800 (diff)
downloadqpid-python-34e71040bd3d94549d6ad100db2e7f481e840cbe.tar.gz
QPID-3167: add a unit test of SimpleAMQQueue#processQueue to check delivery when subscriptions with unique selectors are in use
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1087000 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java79
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java55
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java125
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java20
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java2
20 files changed, 251 insertions, 74 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index dd3046cd01..08eb05680c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -346,7 +346,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
finally
{
long bodySize = _currentMessage.getSize();
- long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().properties).getTimestamp();
+ long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp();
_session.registerMessageReceived(bodySize, timestamp);
_currentMessage = null;
}
@@ -1079,8 +1079,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private boolean checkMessageUserId(ContentHeaderBody header)
{
AMQShortString userID =
- header.properties instanceof BasicContentHeaderProperties
- ? ((BasicContentHeaderProperties) header.properties).getUserId()
+ header.getProperties() instanceof BasicContentHeaderProperties
+ ? ((BasicContentHeaderProperties) header.getProperties()).getUserId()
: null;
return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString()));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
index 194835ac02..84a1642578 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
@@ -37,7 +37,7 @@ public class ContentHeaderBodyAdapter implements AMQMessageHeader
private BasicContentHeaderProperties getProperties()
{
- return (BasicContentHeaderProperties) _contentHeaderBody.properties;
+ return (BasicContentHeaderProperties) _contentHeaderBody.getProperties();
}
public String getCorrelationId()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
index 30bea7b6e6..66cb7ed83b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
@@ -161,7 +161,7 @@ public class MessageMetaData implements StorableMessageMetaData
public boolean isPersistent()
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.properties);
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.getProperties());
return properties.getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT;
}
@@ -229,7 +229,7 @@ public class MessageMetaData implements StorableMessageMetaData
{
private BasicContentHeaderProperties getProperties()
{
- return (BasicContentHeaderProperties) getContentHeaderBody().properties;
+ return (BasicContentHeaderProperties) getContentHeaderBody().getProperties();
}
public String getCorrelationId()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 77c4e8fc23..c8eb118b11 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -507,7 +507,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
{
List<String> list = new ArrayList<String>();
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties();
list.add("reply-to = " + headerProperties.getReplyToAsString());
list.add("propertyFlags = " + headerProperties.getPropertyFlags());
list.add("ApplicationID = " + headerProperties.getAppIdAsString());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 2d2fb3a214..3e3288404f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -96,9 +96,9 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
public void setExpiration()
{
long expiration =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
long timestamp =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
if (SYNCHED_CLOCKS)
{
@@ -193,8 +193,8 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
public boolean isPersistent()
{
- return getContentHeader().properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
+ return getContentHeader().getProperties() instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
new file mode 100644
index 0000000000..44b7c95535
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.queue.QueueRunner;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+
+
+public class QueueRunner implements ReadWriteRunnable
+{
+ private static final Logger _logger = Logger.getLogger(QueueRunner.class);
+
+ private String _name;
+ private SimpleAMQQueue _queue;
+
+ public QueueRunner(SimpleAMQQueue queue, long count)
+ {
+ _queue = queue;
+ _name = "QueueRunner-" + count + "-" + queue.getLogActor();
+ }
+
+ public void run()
+ {
+ String originalName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName(_name);
+ CurrentActor.set(_queue.getLogActor());
+
+ _queue.processQueue(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
+ }
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+
+ public String toString()
+ {
+ return _name;
+ }
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 0e3f7b2625..4890c00047 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1585,7 +1585,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void deliverAsync()
{
- Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+ QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1604,52 +1604,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery.execute(flusher);
}
-
- private class Runner implements ReadWriteRunnable
- {
- String _name;
- public Runner(long count)
- {
- _name = "QueueRunner-" + count + "-" + _logActor;
- }
-
- public void run()
- {
- String originalName = Thread.currentThread().getName();
- try
- {
- Thread.currentThread().setName(_name);
- CurrentActor.set(_logActor);
-
- processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
- finally
- {
- CurrentActor.remove();
- Thread.currentThread().setName(originalName);
- }
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
- public String toString()
- {
- return _name;
- }
- }
-
public void flushSubscription(Subscription sub) throws AMQException
{
// Access control
@@ -1834,7 +1788,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
* @param runner the Runner to schedule
* @throws AMQException
*/
- private void processQueue(Runnable runner) throws AMQException
+ public void processQueue(QueueRunner runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
@@ -2289,4 +2243,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
}
+
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index a20436f029..68e47fd86a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -441,7 +441,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
Struct[] headers = new Struct[] { deliveryProps, messageProps };
BasicContentHeaderProperties properties =
- (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().properties;
+ (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
if(exchange != null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index 4fd4999b19..806e161bbc 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -364,7 +364,7 @@ public class Show extends AbstractCommand
{
if(msg instanceof AMQMessage)
{
- headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties);
+ headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().getProperties());
}
}
catch (AMQException e)
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 7b58966a4c..9e831b2a8e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -276,7 +276,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
static ContentHeaderBody getContentHeader(FieldTable headers)
{
ContentHeaderBody header = new ContentHeaderBody();
- header.properties = getProperties(headers);
+ header.setProperties(getProperties(headers));
return header;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index f72961c03c..403a290a0f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -396,7 +396,7 @@ public class TopicExchangeTest extends InternalBrokerBaseCase
IncomingMessage message = new IncomingMessage(info);
final ContentHeaderBody chb = new ContentHeaderBody();
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- chb.properties = props;
+ chb.setProperties(props);
message.setContentHeaderBody(chb);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index d52f4c03f3..3961b3b355 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -96,7 +96,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
AMQMessage msg = super.createMessage(id);
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
props.setPriority(i);
- msg.getContentHeaderBody().properties = props;
+ msg.getContentHeaderBody().setProperties(props);
return msg;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 0707cab3d5..a8bddcf6bf 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- contentHeaderBody.properties = props;
+ contentHeaderBody.setProperties(props);
contentHeaderBody.bodySize = size; // in bytes
IncomingMessage message = new IncomingMessage(publish);
message.setContentHeaderBody(contentHeaderBody);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 5b72cfac40..365353e734 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -402,8 +402,8 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
- contentHeaderBody.properties = new BasicContentHeaderProperties();
- ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
+ contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+ ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1));
IncomingMessage msg = new IncomingMessage(publish);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 04608275a3..0f5374b3e5 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -126,7 +126,7 @@ public class AckTest extends InternalBrokerBaseCase
//IncomingMessage msg2 = null;
BasicContentHeaderProperties b = new BasicContentHeaderProperties();
ContentHeaderBody cb = new ContentHeaderBody();
- cb.properties = b;
+ cb.setProperties(b);
if (persistent)
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 67d093d00a..41ca751684 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -660,8 +660,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// Create IncomingMessage and nondurable queue
final IncomingMessage msg = new IncomingMessage(info);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.properties = new BasicContentHeaderProperties();
- ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+ ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
final ArrayList<BaseQueue> qs = new ArrayList<BaseQueue>();
@@ -707,6 +707,111 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
+ /**
+ * processQueue() is used when asynchronously delivering messages to
+ * subscriptions which could not be delivered immediately during the
+ * enqueue() operation.
+ *
+ * A defect within the method would mean that delivery of these messages may
+ * not occur should the Runner stop before all messages have been processed.
+ * Such a defect was discovered when Selectors were used such that one and
+ * only one subscription can/will accept any given messages, but multiple
+ * subscriptions are present, and one of the earlier subscriptions receives
+ * more messages than the others.
+ *
+ * This test is to validate that the processQueue() method is able to
+ * correctly deliver all of the messages present for asynchronous delivery
+ * to subscriptions in such a scenario.
+ */
+ public void testProcessQueueWithUniqueSelectors() throws Exception
+ {
+ TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
+ SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, "testOwner",false,
+ false, _virtualHost, factory, null)
+ {
+ @Override
+ public void deliverAsync(Subscription sub)
+ {
+ // do nothing, i.e prevent deliveries by the SubFlushRunner
+ // when registering the new subscriptions
+ }
+ };
+
+ // retrieve the QueueEntryList the queue creates and insert the test
+ // messages, thus avoiding straight-through delivery attempts during
+ //enqueue() process.
+ QueueEntryList list = factory.getQueueEntryList();
+ assertNotNull("QueueEntryList should have been created", list);
+
+ QueueEntry msg1 = list.add(createMessage(1L));
+ QueueEntry msg2 = list.add(createMessage(2L));
+ QueueEntry msg3 = list.add(createMessage(3L));
+ QueueEntry msg4 = list.add(createMessage(4L));
+ QueueEntry msg5 = list.add(createMessage(5L));
+
+ // Create lists of the entries each subscription should be interested
+ // in.Bias over 50% of the messages to the first subscription so that
+ // the later subscriptions reject them and report being done before
+ // the first subscription as the processQueue method proceeds.
+ List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3);
+ List<QueueEntry> msgListSub2 = createEntriesList(msg4);
+ List<QueueEntry> msgListSub3 = createEntriesList(msg5);
+
+ MockSubscription sub1 = new MockSubscription(msgListSub1);
+ MockSubscription sub2 = new MockSubscription(msgListSub2);
+ MockSubscription sub3 = new MockSubscription(msgListSub3);
+
+ // register the subscriptions
+ testQueue.registerSubscription(sub1, false);
+ testQueue.registerSubscription(sub2, false);
+ testQueue.registerSubscription(sub3, false);
+
+ //check that no messages have been delivered to the
+ //subscriptions during registration
+ assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
+ assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
+ assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
+
+ // call processQueue to deliver the messages
+ testQueue.processQueue(new QueueRunner(testQueue, 1)
+ {
+ @Override
+ public void run()
+ {
+ // we dont actually want/need this runner to do any work
+ // because we we are already doing it!
+ }
+ });
+
+ // check expected messages delivered to correct consumers
+ verifyRecievedMessages(msgListSub1, sub1.getMessages());
+ verifyRecievedMessages(msgListSub2, sub2.getMessages());
+ verifyRecievedMessages(msgListSub3, sub3.getMessages());
+ }
+
+ private List<QueueEntry> createEntriesList(QueueEntry... entries)
+ {
+ ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
+ for (QueueEntry entry : entries)
+ {
+ entriesList.add(entry);
+ }
+ return entriesList;
+ }
+
+ private void verifyRecievedMessages(List<QueueEntry> expected,
+ List<QueueEntry> delivered)
+ {
+ assertEquals("Consumer did not receive the expected number of messages",
+ expected.size(), delivered.size());
+
+ for (QueueEntry msg : expected)
+ {
+ assertTrue("Consumer did not recieve msg: "
+ + msg.getMessage().getMessageNumber(), delivered.contains(msg));
+ }
+ }
+
public class TestMessage extends AMQMessage
{
private final long _tag;
@@ -747,4 +852,20 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
AMQMessage messageA = new TestMessage(id, id, info);
return messageA;
}
+
+ class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
+ {
+ QueueEntryList _list;
+
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
+ {
+ _list = new SimpleQueueEntryList(queue);
+ return _list;
+ }
+
+ public QueueEntryList getQueueEntryList()
+ {
+ return _list;
+ }
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 3ebe631f62..62ceb68208 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -589,7 +589,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
headerBody.classId = BasicConsumeBodyImpl.CLASS_ID;
headerBody.bodySize = 0;
- headerBody.properties = properties;
+ headerBody.setProperties(properties);
try
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
index a75cbe8662..2d41eb9899 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
@@ -102,7 +102,7 @@ public class ReferenceCountingTest extends QpidTestCase
ContentHeaderBody chb = new ContentHeaderBody();
BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
bchp.setDeliveryMode((byte)2);
- chb.properties = bchp;
+ chb.setProperties(bchp);
return chb;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 1ec134e90e..6fbc627d8c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription;
*/
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -45,6 +46,7 @@ public class MockSubscription implements Subscription
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private List<QueueEntry> _acceptEntries = null;
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
@@ -54,6 +56,15 @@ public class MockSubscription implements Subscription
// Create a simple ID that increments for ever new Subscription
private final long _subscriptionID = idGenerator.getAndIncrement();
+ public MockSubscription()
+ {
+ }
+
+ public MockSubscription(List<QueueEntry> acceptEntries)
+ {
+ _acceptEntries = acceptEntries;
+ }
+
public void close()
{
_closed = true;
@@ -119,8 +130,15 @@ public class MockSubscription implements Subscription
_stateChangeLock.lock();
}
- public boolean hasInterest(QueueEntry msg)
+ public boolean hasInterest(QueueEntry entry)
{
+ if(_acceptEntries != null)
+ {
+ //simulate selector behaviour, only signal
+ //interest in the dictated queue entries
+ return _acceptEntries.contains(entry);
+ }
+
return true;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 925b161118..ff94942457 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -243,7 +243,7 @@ public class InternalBrokerBaseCase extends QpidTestCase
//Make Message Persistent
properties.setDeliveryMode((byte) 2);
- _headerBody.properties = properties;
+ _headerBody.setProperties(properties);
channel.publishContentHeader(_headerBody);
}