summaryrefslogtreecommitdiff
path: root/cpp/src/tests/QueueTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/QueueTest.cpp')
-rw-r--r--cpp/src/tests/QueueTest.cpp981
1 files changed, 921 insertions, 60 deletions
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 20b3d90eb6..6c2adf5c87 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,29 +18,44 @@
* under the License.
*
*/
+#include "MessageUtils.h"
#include "unit_test.h"
+#include "test_tools.h"
#include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/client/QueueOptions.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include <iostream>
#include "boost/format.hpp"
using boost::intrusive_ptr;
using namespace qpid;
using namespace qpid::broker;
+using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
+namespace qpid {
+namespace tests {
+
class TestConsumer : public virtual Consumer{
public:
- typedef boost::shared_ptr<TestConsumer> shared_ptr;
+ typedef boost::shared_ptr<TestConsumer> shared_ptr;
intrusive_ptr<Message> last;
bool received;
- TestConsumer(): received(false) {};
+ TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
virtual bool deliver(QueuedMessage& msg){
last = msg.payload;
@@ -53,19 +68,20 @@ public:
class FailOnDeliver : public Deliverable
{
- Message msg;
+ boost::intrusive_ptr<Message> msg;
public:
- void deliverTo(Queue::shared_ptr& queue)
+ FailOnDeliver() : msg(MessageUtils::createMessage()) {}
+ void deliverTo(const boost::shared_ptr<Queue>& queue)
{
throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
}
- Message& getMessage() { return msg; }
+ Message& getMessage() { return *(msg.get()); }
};
-intrusive_ptr<Message> message(std::string exchange, std::string routingKey) {
+intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) {
intrusive_ptr<Message> msg(new Message());
- AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0));
- AMQFrame header(in_place<AMQHeaderBody>());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
msg->getFrames().append(method);
msg->getFrames().append(header);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
@@ -77,68 +93,69 @@ QPID_AUTO_TEST_SUITE(QueueTestSuite)
QPID_AUTO_TEST_CASE(testAsyncMessage) {
Queue::shared_ptr queue(new Queue("my_test_queue", true));
intrusive_ptr<Message> received;
-
- TestConsumer c1;
+
+ TestConsumer::shared_ptr c1(new TestConsumer());
queue->consume(c1);
-
-
+
+
//Test basic delivery:
- intrusive_ptr<Message> msg1 = message("e", "A");
- msg1->enqueueAsync();//this is done on enqueue which is not called from process
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);
-
- BOOST_CHECK(!c1.received);
+
+ BOOST_CHECK(!c1->received);
msg1->enqueueComplete();
-
+
received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg1.get(), received.get());
+ BOOST_CHECK_EQUAL(msg1.get(), received.get());
}
-
-
+
+
QPID_AUTO_TEST_CASE(testAsyncMessageCount){
Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> msg1 = message("e", "A");
- msg1->enqueueAsync();//this is done on enqueue which is not called from process
-
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
+
queue->process(msg1);
sleep(2);
uint32_t compval=0;
- BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
+ BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
msg1->enqueueComplete();
compval=1;
- BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
+ BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
+ BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
}
QPID_AUTO_TEST_CASE(testConsumers){
Queue::shared_ptr queue(new Queue("my_queue", true));
-
+
//Test adding consumers:
- TestConsumer c1;
- TestConsumer c2;
+ TestConsumer::shared_ptr c1(new TestConsumer());
+ TestConsumer::shared_ptr c2(new TestConsumer());
queue->consume(c1);
queue->consume(c2);
-
+
BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
-
+
//Test basic delivery:
- intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "B");
- intrusive_ptr<Message> msg3 = message("e", "C");
-
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
+
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg1.get(), c1.last.get());
-
+ BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
+
queue->deliver(msg2);
BOOST_CHECK(queue->dispatch(c2));
- BOOST_CHECK_EQUAL(msg2.get(), c2.last.get());
-
- c1.received = false;
+ BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
+
+ c1->received = false;
queue->deliver(msg3);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg3.get(), c1.last.get());
-
+ BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
+
//Test cancellation:
queue->cancel(c1);
BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount());
@@ -152,15 +169,15 @@ QPID_AUTO_TEST_CASE(testRegistry){
registry.declare("queue1", true, true);
registry.declare("queue2", true, true);
registry.declare("queue3", true, true);
-
+
BOOST_CHECK(registry.find("queue1"));
BOOST_CHECK(registry.find("queue2"));
BOOST_CHECK(registry.find("queue3"));
-
+
registry.destroy("queue1");
registry.destroy("queue2");
registry.destroy("queue3");
-
+
BOOST_CHECK(!registry.find("queue1"));
BOOST_CHECK(!registry.find("queue2"));
BOOST_CHECK(!registry.find("queue3"));
@@ -168,17 +185,17 @@ QPID_AUTO_TEST_CASE(testRegistry){
QPID_AUTO_TEST_CASE(testDequeue){
Queue::shared_ptr queue(new Queue("my_queue", true));
- intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "B");
- intrusive_ptr<Message> msg3 = message("e", "C");
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
intrusive_ptr<Message> received;
-
+
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
-
+
BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
-
+
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg1.get(), received.get());
BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount());
@@ -187,23 +204,22 @@ QPID_AUTO_TEST_CASE(testDequeue){
BOOST_CHECK_EQUAL(msg2.get(), received.get());
BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount());
- TestConsumer consumer;
+ TestConsumer::shared_ptr consumer(new TestConsumer());
queue->consume(consumer);
queue->dispatch(consumer);
- if (!consumer.received)
+ if (!consumer->received)
sleep(2);
- BOOST_CHECK_EQUAL(msg3.get(), consumer.last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
received = queue->get().payload;
BOOST_CHECK(!received);
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
-
+
}
-QPID_AUTO_TEST_CASE(testBound)
-{
+QPID_AUTO_TEST_CASE(testBound){
//test the recording of bindings, and use of those to allow a queue to be unbound
string key("my-key");
FieldTable args;
@@ -231,11 +247,856 @@ QPID_AUTO_TEST_CASE(testBound)
queue->unbind(exchanges, queue);
//ensure the remaining exchanges don't still have the queue bound to them:
- FailOnDeliver deliverable;
+ FailOnDeliver deliverable;
exchange1->route(deliverable, key, &args);
exchange3->route(deliverable, key, &args);
}
-QPID_AUTO_TEST_SUITE_END()
+QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
+ client::QueueOptions args;
+ args.setPersistLastNode();
+
+ Queue::shared_ptr queue(new Queue("my-queue", true));
+ queue->configure(args);
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
+
+ //enqueue 2 messages
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+
+ //change mode
+ queue->setLastNodeFailure();
+
+ //enqueue 1 message
+ queue->deliver(msg3);
+
+ //check all have persistent ids.
+ BOOST_CHECK(msg1->isPersistent());
+ BOOST_CHECK(msg2->isPersistent());
+ BOOST_CHECK(msg3->isPersistent());
+
+}
+
+
+QPID_AUTO_TEST_CASE(testSeek){
+
+ Queue::shared_ptr queue(new Queue("my-queue", true));
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
+
+ //enqueue 2 messages
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+ queue->deliver(msg3);
+
+ TestConsumer::shared_ptr consumer(new TestConsumer(false));
+ SequenceNumber seq(2);
+ consumer->position = seq;
+
+ QueuedMessage qm;
+ queue->dispatch(consumer);
+
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+ queue->dispatch(consumer);
+ queue->dispatch(consumer); // make sure over-run is safe
+
+}
+
+QPID_AUTO_TEST_CASE(testSearch){
+
+ Queue::shared_ptr queue(new Queue("my-queue", true));
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
+
+ //enqueue 2 messages
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+ queue->deliver(msg3);
+
+ SequenceNumber seq(2);
+ QueuedMessage qm = queue->find(seq);
+
+ BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
+
+ queue->acquire(qm);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
+ SequenceNumber seq1(3);
+ QueuedMessage qm1 = queue->find(seq1);
+ BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
+
+}
+const std::string nullxid = "";
+
+class SimpleDummyCtxt : public TransactionContext {};
+
+class DummyCtxt : public TPCTransactionContext
+{
+ const std::string xid;
+ public:
+ DummyCtxt(const std::string& _xid) : xid(_xid) {}
+ static std::string getXid(TransactionContext& ctxt)
+ {
+ DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
+ return c ? c->xid : nullxid;
+ }
+};
+
+class TestMessageStoreOC : public MessageStore
+{
+ std::set<std::string> prepared;
+ uint64_t nextPersistenceId;
+ public:
+
+ uint enqCnt;
+ uint deqCnt;
+ bool error;
+
+ TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {}
+ ~TestMessageStoreOC(){}
+
+ virtual void dequeue(TransactionContext*,
+ const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
+ const PersistableQueue& /*queue*/)
+ {
+ if (error) throw Exception("Dequeue error test");
+ deqCnt++;
+ }
+
+ virtual void enqueue(TransactionContext*,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& /* queue */)
+ {
+ if (error) throw Exception("Enqueue error test");
+ enqCnt++;
+ msg->enqueueComplete();
+ }
+
+ void createError()
+ {
+ error=true;
+ }
+
+ bool init(const Options*) { return true; }
+ void truncateInit(const bool) {}
+ void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); }
+ void destroy(PersistableQueue&) {}
+ void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); }
+ void destroy(const PersistableExchange&) {}
+ void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+ void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+ void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); }
+ void destroy(const PersistableConfig&) {}
+ void stage(const boost::intrusive_ptr<PersistableMessage>&) {}
+ void destroy(PersistableMessage&) {}
+ void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {}
+ void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&,
+ std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); }
+ void flush(const qpid::broker::PersistableQueue&) {}
+ uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; }
+
+ std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); }
+ std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); }
+ void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); }
+ void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+ void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+ void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); }
+
+ void recover(RecoveryManager&) {}
+};
+
+
+QPID_AUTO_TEST_CASE(testLVQOrdering){
+
+ client::QueueOptions args;
+ // set queue mode
+ args.setOrdering(client::LVQ);
+
+ Queue::shared_ptr queue(new Queue("my-queue", true ));
+ queue->configure(args);
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg4 = create_message("e", "D");
+ intrusive_ptr<Message> received;
+
+ //set deliever match for LVQ a,b,c,a
+
+ string key;
+ args.getLVQKey(key);
+ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+
+
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+ msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+
+ //enqueue 4 message
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+ queue->deliver(msg3);
+ queue->deliver(msg4);
+
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+
+ received = queue->get().payload;
+ BOOST_CHECK_EQUAL(msg4.get(), received.get());
+
+ received = queue->get().payload;
+ BOOST_CHECK_EQUAL(msg2.get(), received.get());
+
+ received = queue->get().payload;
+ BOOST_CHECK_EQUAL(msg3.get(), received.get());
+
+ intrusive_ptr<Message> msg5 = create_message("e", "A");
+ intrusive_ptr<Message> msg6 = create_message("e", "B");
+ intrusive_ptr<Message> msg7 = create_message("e", "C");
+ msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+ msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ queue->deliver(msg5);
+ queue->deliver(msg6);
+ queue->deliver(msg7);
+
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+
+ received = queue->get().payload;
+ BOOST_CHECK_EQUAL(msg5.get(), received.get());
+
+ received = queue->get().payload;
+ BOOST_CHECK_EQUAL(msg6.get(), received.get());
+
+ received = queue->get().payload;
+ BOOST_CHECK_EQUAL(msg7.get(), received.get());
+
+}
+
+QPID_AUTO_TEST_CASE(testLVQEmptyKey){
+
+ client::QueueOptions args;
+ // set queue mode
+ args.setOrdering(client::LVQ);
+ Queue::shared_ptr queue(new Queue("my-queue", true ));
+ queue->configure(args);
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+
+ string key;
+ args.getLVQKey(key);
+ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+
+
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
+
+}
+
+QPID_AUTO_TEST_CASE(testLVQAcquire){
+
+ client::QueueOptions args;
+ // set queue mode
+ args.setOrdering(client::LVQ);
+
+ Queue::shared_ptr queue(new Queue("my-queue", true ));
+ queue->configure(args);
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg4 = create_message("e", "D");
+ intrusive_ptr<Message> msg5 = create_message("e", "F");
+ intrusive_ptr<Message> msg6 = create_message("e", "G");
+
+ //set deliever match for LVQ a,b,c,a
+
+ string key;
+ args.getLVQKey(key);
+ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+
+
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+ msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+ msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+
+ //enqueue 4 message
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+ queue->deliver(msg3);
+ queue->deliver(msg4);
+
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+
+ framing::SequenceNumber sequence(1);
+ QueuedMessage qmsg(queue.get(), msg1, sequence);
+ QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
+
+ BOOST_CHECK(!queue->acquire(qmsg));
+ BOOST_CHECK(queue->acquire(qmsg2));
+
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
+
+ queue->deliver(msg5);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+
+ // set mode to no browse and check
+ args.setOrdering(client::LVQ_NO_BROWSE);
+ queue->configure(args);
+ TestConsumer::shared_ptr c1(new TestConsumer(false));
+
+ queue->dispatch(c1);
+ queue->dispatch(c1);
+ queue->dispatch(c1);
+
+ queue->deliver(msg6);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+
+ intrusive_ptr<Message> received;
+ received = queue->get().payload;
+ BOOST_CHECK_EQUAL(msg4.get(), received.get());
+
+}
+
+QPID_AUTO_TEST_CASE(testLVQMultiQueue){
+
+ client::QueueOptions args;
+ // set queue mode
+ args.setOrdering(client::LVQ);
+
+ Queue::shared_ptr queue1(new Queue("my-queue", true ));
+ Queue::shared_ptr queue2(new Queue("my-queue", true ));
+ intrusive_ptr<Message> received;
+ queue1->configure(args);
+ queue2->configure(args);
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "A");
+
+ string key;
+ args.getLVQKey(key);
+ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+
+ queue1->deliver(msg1);
+ queue2->deliver(msg1);
+ queue1->deliver(msg2);
+
+ received = queue1->get().payload;
+ BOOST_CHECK_EQUAL(msg2.get(), received.get());
+
+ received = queue2->get().payload;
+ BOOST_CHECK_EQUAL(msg1.get(), received.get());
+
+}
+
+QPID_AUTO_TEST_CASE(testLVQRecover){
+
+/* simulate this
+ 1. start 2 nodes
+ 2. create cluster durable lvq
+ 3. send a transient message to the queue
+ 4. kill one of the nodes (to trigger force persistent behaviour)...
+ 5. then restart it (to turn off force persistent behaviour)
+ 6. send another transient message with same lvq key as in 3
+ 7. kill the second node again (retrigger force persistent)
+ 8. stop and recover the first node
+*/
+ TestMessageStoreOC testStore;
+ client::QueueOptions args;
+ // set queue mode
+ args.setOrdering(client::LVQ);
+ args.setPersistLastNode();
+
+ Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
+ intrusive_ptr<Message> received;
+ queue1->configure(args);
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "A");
+ // 2
+ string key;
+ args.getLVQKey(key);
+ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ // 3
+ queue1->deliver(msg1);
+ // 4
+ queue1->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
+ // 5
+ queue1->clearLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
+ // 6
+ queue1->deliver(msg2);
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
+ queue1->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
+ BOOST_CHECK_EQUAL(testStore.deqCnt, 1u);
+}
+
+void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
+{
+ for (uint i = 0; i < count; i++) {
+ intrusive_ptr<Message> m = create_message("exchange", "key");
+ if (i % 2) {
+ if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
+ } else {
+ if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
+ }
+ m->setTimestamp(new broker::ExpiryPolicy);
+ queue.deliver(m);
+ }
+}
+
+QPID_AUTO_TEST_CASE(testPurgeExpired) {
+ Queue queue("my-queue");
+ addMessagesToQueue(10, queue);
+ BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
+ ::usleep(300*1000);
+ queue.purgeExpired();
+ BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
+}
+
+QPID_AUTO_TEST_CASE(testQueueCleaner) {
+ Timer timer;
+ QueueRegistry queues;
+ Queue::shared_ptr queue = queues.declare("my-queue").first;
+ addMessagesToQueue(10, *queue, 200, 400);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
+
+ QueueCleaner cleaner(queues, timer);
+ cleaner.start(100 * qpid::sys::TIME_MSEC);
+ ::usleep(300*1000);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
+ ::usleep(300*1000);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
+
+ TestMessageStoreOC testStore;
+ client::QueueOptions args;
+ args.setPersistLastNode();
+
+ Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
+ queue1->configure(args);
+ Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
+ queue2->configure(args);
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+
+ queue1->deliver(msg1);
+ queue2->deliver(msg1);
+
+ //change mode
+ queue1->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
+ queue2->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
+
+ // check they don't get stored twice
+ queue1->setLastNodeFailure();
+ queue2->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
+
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ queue1->deliver(msg2);
+ queue2->deliver(msg2);
+
+ queue1->clearLastNodeFailure();
+ queue2->clearLastNodeFailure();
+ // check only new messages get forced
+ queue1->setLastNodeFailure();
+ queue2->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
+
+ // check no failure messages are stored
+ queue1->clearLastNodeFailure();
+ queue2->clearLastNodeFailure();
+
+ intrusive_ptr<Message> msg3 = create_message("e", "B");
+ queue1->deliver(msg3);
+ queue2->deliver(msg3);
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
+ queue1->setLastNodeFailure();
+ queue2->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 6u);
+
+ // check requeue 1
+ intrusive_ptr<Message> msg4 = create_message("e", "C");
+ intrusive_ptr<Message> msg5 = create_message("e", "D");
+
+ framing::SequenceNumber sequence(1);
+ QueuedMessage qmsg1(queue1.get(), msg4, sequence);
+ QueuedMessage qmsg2(queue2.get(), msg5, ++sequence);
+
+ queue1->requeue(qmsg1);
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
+
+ // check requeue 2
+ queue2->clearLastNodeFailure();
+ queue2->requeue(qmsg2);
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
+ queue2->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
+
+ queue2->clearLastNodeFailure();
+ queue2->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
+}
+
+QPID_AUTO_TEST_CASE(testLastNodeRecoverAndFail){
+/*
+simulate this:
+ 1. start two nodes
+ 2. create cluster durable queue and add some messages
+ 3. kill one node (trigger force-persistent behaviour)
+ 4. stop and recover remaining node
+ 5. add another node
+ 6. kill that new node again
+make sure that an attempt to re-enqueue a message does not happen which will
+result in the last man standing exiting with an error.
+
+we need to make sure that recover is safe, i.e. messages are
+not requeued to the store.
+*/
+ TestMessageStoreOC testStore;
+ client::QueueOptions args;
+ // set queue mode
+ args.setPersistLastNode();
+
+ Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
+ intrusive_ptr<Message> received;
+ queue1->configure(args);
+
+ // check requeue 1
+ intrusive_ptr<Message> msg1 = create_message("e", "C");
+ intrusive_ptr<Message> msg2 = create_message("e", "D");
+
+ queue1->recover(msg1);
+
+ queue1->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
+
+ queue1->clearLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
+
+ queue1->deliver(msg2);
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
+ queue1->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
+
+}
+
+QPID_AUTO_TEST_CASE(testLastNodeJournalError){
+/*
+simulate store exception going into last node standing
+
+*/
+ TestMessageStoreOC testStore;
+ client::QueueOptions args;
+ // set queue mode
+ args.setPersistLastNode();
+
+ Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
+ intrusive_ptr<Message> received;
+ queue1->configure(args);
+
+ // check requeue 1
+ intrusive_ptr<Message> msg1 = create_message("e", "C");
+
+ queue1->deliver(msg1);
+ testStore.createError();
+
+ ScopedSuppressLogging sl; // Suppress messages for expected errors.
+ queue1->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
+
+}
+
+intrusive_ptr<Message> mkMsg(MessageStore& store, std::string content = "", bool durable = false)
+{
+ intrusive_ptr<Message> msg = MessageUtils::createMessage("", "", durable);
+ if (content.size()) MessageUtils::addContent(msg, content);
+ msg->setStore(&store);
+ return msg;
+}
+
+QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
+
+ TestMessageStoreOC testStore;
+ client::QueueOptions args0; // No size policy
+ client::QueueOptions args1;
+ args1.setSizePolicy(FLOW_TO_DISK, 0, 1);
+ client::QueueOptions args2;
+ args2.setSizePolicy(FLOW_TO_DISK, 0, 2);
+
+ // --- Fanout exchange bound to single transient queue -------------------------------------------------------------
+
+ FanOutExchange sbtFanout1("sbtFanout1", false, args0); // single binding to transient queue
+ Queue::shared_ptr tq1(new Queue("tq1", true)); // transient w/ limit
+ tq1->configure(args1);
+ sbtFanout1.bind(tq1, "", 0);
+
+ intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg01(msg01);
+ sbtFanout1.route(dmsg01, "", 0); // Brings queue 1 to capacity limit
+ msg01->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg01->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
+
+ intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg02(msg02);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+ msg02->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
+
+ intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
+ DeliverableMessage dmsg03(msg03);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+ msg03->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
+
+ intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
+ DeliverableMessage dmsg04(msg04);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+ msg04->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
+
+ intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
+ DeliverableMessage dmsg05(msg05);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+ msg05->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
+
+ // --- Fanout exchange bound to single durable queue ---------------------------------------------------------------
+
+ FanOutExchange sbdFanout2("sbdFanout2", false, args0); // single binding to durable queue
+ Queue::shared_ptr dq2(new Queue("dq2", true, &testStore)); // durable w/ limit
+ dq2->configure(args1);
+ sbdFanout2.bind(dq2, "", 0);
+
+ intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg06(msg06);
+ sbdFanout2.route(dmsg06, "", 0); // Brings queue 2 to capacity limit
+ msg06->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg06->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, dq2->getMessageCount());
+
+ intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg07(msg07);
+ sbdFanout2.route(dmsg07, "", 0);
+ msg07->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg07->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(2u, dq2->getMessageCount());
+
+ intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
+ DeliverableMessage dmsg08(msg08);
+ sbdFanout2.route(dmsg08, "", 0);
+ msg08->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg08->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(3u, dq2->getMessageCount());
+
+ intrusive_ptr<Message> msg09 = mkMsg(testStore); // transient no content
+ DeliverableMessage dmsg09(msg09);
+ sbdFanout2.route(dmsg09, "", 0);
+ msg09->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg09->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(4u, dq2->getMessageCount());
+
+ intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true); // durable no content
+ DeliverableMessage dmsg10(msg10);
+ sbdFanout2.route(dmsg10, "", 0);
+ msg10->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg10->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(5u, dq2->getMessageCount());
+
+ // --- Fanout exchange bound to multiple durable queues ------------------------------------------------------------
+
+ FanOutExchange mbdFanout3("mbdFanout3", false, args0); // multiple bindings to durable queues
+ Queue::shared_ptr dq3(new Queue("dq3", true, &testStore)); // durable w/ limit 2
+ dq3->configure(args2);
+ mbdFanout3.bind(dq3, "", 0);
+ Queue::shared_ptr dq4(new Queue("dq4", true, &testStore)); // durable w/ limit 1
+ dq4->configure(args1);
+ mbdFanout3.bind(dq4, "", 0);
+ Queue::shared_ptr dq5(new Queue("dq5", true, &testStore)); // durable no limit
+ dq5->configure(args0);
+ mbdFanout3.bind(dq5, "", 0);
+
+ intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg11(msg11);
+ mbdFanout3.route(dmsg11, "", 0); // Brings queues 3 and 4 to capacity limit
+ msg11->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg11->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(1u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(1u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg12(msg12);
+ mbdFanout3.route(dmsg12, "", 0);
+ msg12->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
+ BOOST_CHECK_EQUAL(2u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(2u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(2u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
+ DeliverableMessage dmsg13(msg13);
+ mbdFanout3.route(dmsg13, "", 0);
+ msg13->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg13->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(3u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(3u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(3u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg14 = mkMsg(testStore); // transient no content
+ DeliverableMessage dmsg14(msg14);
+ mbdFanout3.route(dmsg14, "", 0);
+ msg14->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
+ BOOST_CHECK_EQUAL(4u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(4u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(4u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true); // durable no content
+ DeliverableMessage dmsg15(msg15);
+ mbdFanout3.route(dmsg15, "", 0);
+ msg15->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg15->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(5u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(5u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(5u, dq5->getMessageCount());
+
+ // Bind a transient queue, this should block the release of any further messages.
+ // Note: this will result in a violation of the count policy of dq3 and dq4 - but this
+ // is expected until a better overall multi-queue design is implemented. Similarly
+ // for the other tests in this section.
+
+ Queue::shared_ptr tq6(new Queue("tq6", true)); // transient no limit
+ tq6->configure(args0);
+ mbdFanout3.bind(tq6, "", 0);
+
+ intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg16(msg16);
+ mbdFanout3.route(dmsg16, "", 0);
+ msg16->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg16->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(6u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(6u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(6u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
+ DeliverableMessage dmsg17(msg17);
+ mbdFanout3.route(dmsg17, "", 0);
+ msg17->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg17->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(7u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(7u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(7u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg18 = mkMsg(testStore); // transient no content
+ DeliverableMessage dmsg18(msg18);
+ mbdFanout3.route(dmsg18, "", 0);
+ msg18->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg18->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(8u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(8u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(8u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true); // durable no content
+ DeliverableMessage dmsg19(msg19);
+ mbdFanout3.route(dmsg19, "", 0);
+ msg19->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg19->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(9u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(9u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(9u, dq5->getMessageCount());
+
+
+ // --- Fanout exchange bound to multiple durable and transient queues ----------------------------------------------
+
+ FanOutExchange mbmFanout4("mbmFanout4", false, args0); // multiple bindings to durable/transient queues
+ Queue::shared_ptr dq7(new Queue("dq7", true, &testStore)); // durable no limit
+ dq7->configure(args0);
+ mbmFanout4.bind(dq7, "", 0);
+ Queue::shared_ptr dq8(new Queue("dq8", true, &testStore)); // durable w/ limit
+ dq8->configure(args1);
+ mbmFanout4.bind(dq8, "", 0);
+ Queue::shared_ptr tq9(new Queue("tq9", true)); // transient no limit
+ tq9->configure(args0);
+ mbmFanout4.bind(tq9, "", 0);
+
+ intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg20(msg20);
+ mbmFanout4.route(dmsg20, "", 0); // Brings queue 7 to capacity limit
+ msg20->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg20->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, dq7->getMessageCount());
+ BOOST_CHECK_EQUAL(1u, dq8->getMessageCount());
+ BOOST_CHECK_EQUAL(1u, tq9->getMessageCount());
+
+ intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg21(msg21);
+ mbmFanout4.route(dmsg21, "", 0);
+ msg21->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg21->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(2u, dq8->getMessageCount());
+ BOOST_CHECK_EQUAL(2u, tq9->getMessageCount());
+
+ intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
+ DeliverableMessage dmsg22(msg22);
+ mbmFanout4.route(dmsg22, "", 0);
+ msg22->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg22->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(3u, dq8->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(3u, tq9->getMessageCount());
+
+ intrusive_ptr<Message> msg23 = mkMsg(testStore); // transient no content
+ DeliverableMessage dmsg23(msg23);
+ mbmFanout4.route(dmsg23, "", 0);
+ msg23->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg23->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(4u, dq8->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(4u, tq9->getMessageCount());
+
+ intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true); // durable no content
+ DeliverableMessage dmsg24(msg24);
+ mbmFanout4.route(dmsg24, "", 0);
+ msg24->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg24->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(5u, dq8->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
+}
+
+
+QPID_AUTO_TEST_SUITE_END()
+}} // namespace qpid::tests