/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "MessageUtils.h" #include "unit_test.h" #include "test_tools.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueFlowLimit.h" #include #include "boost/format.hpp" using boost::intrusive_ptr; using namespace qpid; using namespace qpid::broker; using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; namespace qpid { namespace tests { class TestConsumer : public virtual Consumer{ public: typedef boost::shared_ptr shared_ptr; QueuedMessage last; bool received; TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {}; virtual bool deliver(QueuedMessage& msg){ last = msg; received = true; return true; }; void notify() {} void cancel() {} OwnershipToken* getSession() { return 0; } }; class FailOnDeliver : public Deliverable { boost::intrusive_ptr msg; public: FailOnDeliver() : msg(MessageUtils::createMessage()) {} void deliverTo(const boost::shared_ptr& queue) { throw Exception(QPID_MSG("Invalid delivery to " << queue->getName())); } Message& getMessage() { return *(msg.get()); } }; intrusive_ptr create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) { intrusive_ptr msg(new Message()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); msg->getFrames().append(method); msg->getFrames().append(header); msg->getFrames().getHeaders()->get(true)->setRoutingKey(routingKey); if (ttl) msg->getFrames().getHeaders()->get(true)->setTtl(ttl); return msg; } QPID_AUTO_TEST_SUITE(QueueTestSuite) QPID_AUTO_TEST_CASE(testAsyncMessage) { Queue::shared_ptr queue(new Queue("my_test_queue", true)); intrusive_ptr received; TestConsumer::shared_ptr c1(new TestConsumer()); queue->consume(c1); //Test basic delivery: intrusive_ptr msg1 = create_message("e", "A"); msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); BOOST_CHECK(!c1->received); msg1->enqueueComplete(); received = queue->get().payload; BOOST_CHECK_EQUAL(msg1.get(), received.get()); } QPID_AUTO_TEST_CASE(testAsyncMessageCount){ Queue::shared_ptr queue(new Queue("my_test_queue", true)); intrusive_ptr msg1 = create_message("e", "A"); msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); uint32_t compval=0; BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount()); msg1->enqueueComplete(); compval=1; BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount()); BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); } QPID_AUTO_TEST_CASE(testConsumers){ Queue::shared_ptr queue(new Queue("my_queue", true)); //Test adding consumers: TestConsumer::shared_ptr c1(new TestConsumer()); TestConsumer::shared_ptr c2(new TestConsumer()); queue->consume(c1); queue->consume(c2); BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount()); //Test basic delivery: intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get()); queue->deliver(msg2); BOOST_CHECK(queue->dispatch(c2)); BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get()); c1->received = false; queue->deliver(msg3); BOOST_CHECK(queue->dispatch(c1)); BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get()); //Test cancellation: queue->cancel(c1); BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount()); queue->cancel(c2); BOOST_CHECK_EQUAL(uint32_t(0), queue->getConsumerCount()); } QPID_AUTO_TEST_CASE(testRegistry){ //Test use of queues in registry: QueueRegistry registry; registry.declare("queue1", true, true); registry.declare("queue2", true, true); registry.declare("queue3", true, true); BOOST_CHECK(registry.find("queue1")); BOOST_CHECK(registry.find("queue2")); BOOST_CHECK(registry.find("queue3")); registry.destroy("queue1"); registry.destroy("queue2"); registry.destroy("queue3"); BOOST_CHECK(!registry.find("queue1")); BOOST_CHECK(!registry.find("queue2")); BOOST_CHECK(!registry.find("queue3")); } QPID_AUTO_TEST_CASE(testDequeue){ Queue::shared_ptr queue(new Queue("my_queue", true)); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); intrusive_ptr received; queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg1.get(), received.get()); BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg2.get(), received.get()); BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount()); TestConsumer::shared_ptr consumer(new TestConsumer()); queue->consume(consumer); queue->dispatch(consumer); if (!consumer->received) sleep(2); BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); received = queue->get().payload; BOOST_CHECK(!received); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); } QPID_AUTO_TEST_CASE(testBound){ //test the recording of bindings, and use of those to allow a queue to be unbound string key("my-key"); FieldTable args; Queue::shared_ptr queue(new Queue("my-queue", true)); ExchangeRegistry exchanges; //establish bindings from exchange->queue and notify the queue as it is bound: Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first; exchange1->bind(queue, key, &args); queue->bound(exchange1->getName(), key, args); Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first; exchange2->bind(queue, key, &args); queue->bound(exchange2->getName(), key, args); Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first; exchange3->bind(queue, key, &args); queue->bound(exchange3->getName(), key, args); //delete one of the exchanges: exchanges.destroy(exchange2->getName()); exchange2.reset(); //unbind the queue from all exchanges it knows it has been bound to: queue->unbind(exchanges); //ensure the remaining exchanges don't still have the queue bound to them: FailOnDeliver deliverable; exchange1->route(deliverable, key, &args); exchange3->route(deliverable, key, &args); } QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ client::QueueOptions args; args.setPersistLastNode(); Queue::shared_ptr queue(new Queue("my-queue", true)); queue->configure(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); //change mode queue->setLastNodeFailure(); //enqueue 1 message queue->deliver(msg3); //check all have persistent ids. BOOST_CHECK(msg1->isPersistent()); BOOST_CHECK(msg2->isPersistent()); BOOST_CHECK(msg3->isPersistent()); } QPID_AUTO_TEST_CASE(testSeek){ Queue::shared_ptr queue(new Queue("my-queue", true)); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); TestConsumer::shared_ptr consumer(new TestConsumer("test", false)); SequenceNumber seq(2); consumer->setPosition(seq); QueuedMessage qm; queue->dispatch(consumer); BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); queue->dispatch(consumer); queue->dispatch(consumer); // make sure over-run is safe } QPID_AUTO_TEST_CASE(testSearch){ Queue::shared_ptr queue(new Queue("my-queue", true)); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); SequenceNumber seq(2); QueuedMessage qm; TestConsumer::shared_ptr c1(new TestConsumer()); BOOST_CHECK(queue->find(seq, qm)); BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue()); queue->acquire(qm, c1->getName()); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); SequenceNumber seq1(3); QueuedMessage qm1; BOOST_CHECK(queue->find(seq1, qm1)); BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue()); } const std::string nullxid = ""; class SimpleDummyCtxt : public TransactionContext {}; class DummyCtxt : public TPCTransactionContext { const std::string xid; public: DummyCtxt(const std::string& _xid) : xid(_xid) {} static std::string getXid(TransactionContext& ctxt) { DummyCtxt* c(dynamic_cast(&ctxt)); return c ? c->xid : nullxid; } }; class TestMessageStoreOC : public MessageStore { std::set prepared; uint64_t nextPersistenceId; public: uint enqCnt; uint deqCnt; bool error; TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {} ~TestMessageStoreOC(){} virtual void dequeue(TransactionContext*, const boost::intrusive_ptr& /*msg*/, const PersistableQueue& /*queue*/) { if (error) throw Exception("Dequeue error test"); deqCnt++; } virtual void enqueue(TransactionContext*, const boost::intrusive_ptr& msg, const PersistableQueue& /* queue */) { if (error) throw Exception("Enqueue error test"); enqCnt++; msg->enqueueComplete(); } void createError() { error=true; } bool init(const Options*) { return true; } void truncateInit(const bool) {} void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); } void destroy(PersistableQueue&) {} void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); } void destroy(const PersistableExchange&) {} void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); } void destroy(const PersistableConfig&) {} void stage(const boost::intrusive_ptr&) {} void destroy(PersistableMessage&) {} void appendContent(const boost::intrusive_ptr&, const std::string&) {} void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr&, std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); } void flush(const qpid::broker::PersistableQueue&) {} uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; } std::auto_ptr begin() { return std::auto_ptr(new SimpleDummyCtxt()); } std::auto_ptr begin(const std::string& xid) { return std::auto_ptr(new DummyCtxt(xid)); } void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); } void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } void collectPreparedXids(std::set& out) { out.insert(prepared.begin(), prepared.end()); } void recover(RecoveryManager&) {} }; QPID_AUTO_TEST_CASE(testLVQOrdering){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); intrusive_ptr msg4 = create_message("e", "D"); intrusive_ptr received; //set deliever match for LVQ a,b,c,a string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); msg1->insertCustomProperty(key,"a"); msg2->insertCustomProperty(key,"b"); msg3->insertCustomProperty(key,"c"); msg4->insertCustomProperty(key,"a"); //enqueue 4 message queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); queue->deliver(msg4); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); received = queue->get().payload; BOOST_CHECK_EQUAL(msg4.get(), received.get()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg2.get(), received.get()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg3.get(), received.get()); intrusive_ptr msg5 = create_message("e", "A"); intrusive_ptr msg6 = create_message("e", "B"); intrusive_ptr msg7 = create_message("e", "C"); msg5->insertCustomProperty(key,"a"); msg6->insertCustomProperty(key,"b"); msg7->insertCustomProperty(key,"c"); queue->deliver(msg5); queue->deliver(msg6); queue->deliver(msg7); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); received = queue->get().payload; BOOST_CHECK_EQUAL(msg5.get(), received.get()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg6.get(), received.get()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg7.get(), received.get()); } QPID_AUTO_TEST_CASE(testLVQEmptyKey){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); msg1->insertCustomProperty(key,"a"); queue->deliver(msg1); queue->deliver(msg2); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); } QPID_AUTO_TEST_CASE(testLVQAcquire){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); // disable flow control, as this test violates the enqueue/dequeue sequence. args.setInt(QueueFlowLimit::flowStopCountKey, 0); Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); intrusive_ptr msg4 = create_message("e", "D"); intrusive_ptr msg5 = create_message("e", "F"); intrusive_ptr msg6 = create_message("e", "G"); //set deliever match for LVQ a,b,c,a string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); msg1->insertCustomProperty(key,"a"); msg2->insertCustomProperty(key,"b"); msg3->insertCustomProperty(key,"c"); msg4->insertCustomProperty(key,"a"); msg5->insertCustomProperty(key,"b"); msg6->insertCustomProperty(key,"c"); //enqueue 4 message queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); queue->deliver(msg4); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); framing::SequenceNumber sequence(1); QueuedMessage qmsg(queue.get(), msg1, sequence); QueuedMessage qmsg2(queue.get(), msg2, ++sequence); framing::SequenceNumber sequence1(10); QueuedMessage qmsg3(queue.get(), 0, sequence1); TestConsumer::shared_ptr dummy(new TestConsumer()); BOOST_CHECK(!queue->acquire(qmsg, dummy->getName())); BOOST_CHECK(queue->acquire(qmsg2, dummy->getName())); // Acquire the massage again to test failure case. BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName())); BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName())); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); queue->deliver(msg5); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); // set mode to no browse and check args.setOrdering(client::LVQ_NO_BROWSE); queue->configure(args); TestConsumer::shared_ptr c1(new TestConsumer("test", false)); queue->dispatch(c1); queue->dispatch(c1); queue->dispatch(c1); queue->deliver(msg6); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); intrusive_ptr received; received = queue->get().payload; BOOST_CHECK_EQUAL(msg4.get(), received.get()); } QPID_AUTO_TEST_CASE(testLVQMultiQueue){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); Queue::shared_ptr queue1(new Queue("my-queue", true )); Queue::shared_ptr queue2(new Queue("my-queue", true )); intrusive_ptr received; queue1->configure(args); queue2->configure(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "A"); string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); msg1->insertCustomProperty(key,"a"); msg2->insertCustomProperty(key,"a"); queue1->deliver(msg1); queue2->deliver(msg1); queue1->deliver(msg2); received = queue1->get().payload; BOOST_CHECK_EQUAL(msg2.get(), received.get()); received = queue2->get().payload; BOOST_CHECK_EQUAL(msg1.get(), received.get()); } QPID_AUTO_TEST_CASE(testLVQRecover){ /* simulate this 1. start 2 nodes 2. create cluster durable lvq 3. send a transient message to the queue 4. kill one of the nodes (to trigger force persistent behaviour)... 5. then restart it (to turn off force persistent behaviour) 6. send another transient message with same lvq key as in 3 7. kill the second node again (retrigger force persistent) 8. stop and recover the first node */ TestMessageStoreOC testStore; client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); args.setPersistLastNode(); Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); intrusive_ptr received; queue1->create(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "A"); // 2 string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); msg1->insertCustomProperty(key,"a"); msg2->insertCustomProperty(key,"a"); // 3 queue1->deliver(msg1); // 4 queue1->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); // 5 queue1->clearLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); // 6 queue1->deliver(msg2); BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); queue1->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); BOOST_CHECK_EQUAL(testStore.deqCnt, 1u); } void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { intrusive_ptr m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl); m->computeExpiration(new broker::ExpiryPolicy); queue.deliver(m); } } QPID_AUTO_TEST_CASE(testPurgeExpired) { Queue queue("my-queue"); addMessagesToQueue(10, queue); BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u); ::usleep(300*1000); queue.purgeExpired(0); BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); } QPID_AUTO_TEST_CASE(testQueueCleaner) { Timer timer; QueueRegistry queues; Queue::shared_ptr queue = queues.declare("my-queue").first; addMessagesToQueue(10, *queue, 200, 400); BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); QueueCleaner cleaner(queues, &timer); cleaner.start(100 * qpid::sys::TIME_MSEC); ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); } namespace { // helper for group tests void verifyAcquire( Queue::shared_ptr queue, TestConsumer::shared_ptr c, std::deque& results, const std::string& expectedGroup, const int expectedId ) { queue->dispatch(c); results.push_back(c->last); std::string group = c->last.payload->getProperties()->getApplicationHeaders().getAsString("GROUP-ID"); int id = c->last.payload->getProperties()->getApplicationHeaders().getAsInt("MY-ID"); BOOST_CHECK_EQUAL( group, expectedGroup ); BOOST_CHECK_EQUAL( id, expectedId ); } } QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // // Verify that consumers of grouped messages own the groups once a message is acquired, // and release the groups once all acquired messages have been dequeued or requeued // FieldTable args; Queue::shared_ptr queue(new Queue("my_queue", true)); args.setString("qpid.group_header_key", "GROUP-ID"); args.setInt("qpid.shared_msg_group", 1); queue->configure(args); std::string groups[] = { std::string("a"), std::string("a"), std::string("a"), std::string("b"), std::string("b"), std::string("b"), std::string("c"), std::string("c"), std::string("c") }; for (int i = 0; i < 9; ++i) { intrusive_ptr msg = create_message("e", "A"); msg->insertCustomProperty("GROUP-ID", groups[i]); msg->insertCustomProperty("MY-ID", i); queue->deliver(msg); } // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---, BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount()); TestConsumer::shared_ptr c1(new TestConsumer("C1")); TestConsumer::shared_ptr c2(new TestConsumer("C2")); queue->consume(c1); queue->consume(c2); std::deque dequeMeC1; std::deque dequeMeC2; verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0) verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3) // now let c1 complete the 'a-0' message - this should free the 'a' group queue->dequeue( 0, dequeMeC1.front() ); dequeMeC1.pop_front(); // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, --- // now c2 should pick up the next 'a-1', since it is oldest free verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b" // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, --- // c1 should only be able to snarf up the first "c" message now... verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c" // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1 // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired) queue->dequeue( 0, dequeMeC2.front() ); dequeMeC2.pop_front(); // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1 // b group is free, c is owned by c1 - c1's next get should grab 'b-4' verifyAcquire(queue, c1, dequeMeC1, "b", 4 ); // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1 // c2 can now only grab a-2, and that's all verifyAcquire(queue, c2, dequeMeC2, "a", 2 ); // now C2 can't get any more, since C1 owns "b" and "c" group... bool gotOne = queue->dispatch(c2); BOOST_CHECK( !gotOne ); // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4) queue->dequeue( 0, dequeMeC1.front() ); dequeMeC1.pop_front(); // Queue = a-1, a-2, b-4, b-5, c-7, c-8... // Owners= ^C2, ^C2, ^C1, ^C1, ---, --- // c2 can now grab c-7 verifyAcquire(queue, c2, dequeMeC2, "c", 7 ); // Queue = a-1, a-2, b-4, b-5, c-7, c-8... // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2 // what happens if C-2 "requeues" a-1 and a-2? queue->requeue( dequeMeC2.front() ); dequeMeC2.pop_front(); queue->requeue( dequeMeC2.front() ); dequeMeC2.pop_front(); // now just has c-7 acquired // Queue = a-1, a-2, b-4, b-5, c-7, c-8... // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2 // now c1 will grab a-1 and a-2... verifyAcquire(queue, c1, dequeMeC1, "a", 1 ); verifyAcquire(queue, c1, dequeMeC1, "a", 2 ); // Queue = a-1, a-2, b-4, b-5, c-7, c-8... // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2 // c2 can now acquire c-8 only verifyAcquire(queue, c2, dequeMeC2, "c", 8 ); // and c1 can get b-5 verifyAcquire(queue, c1, dequeMeC1, "b", 5 ); // should be no more acquire-able for anyone now: gotOne = queue->dispatch(c1); BOOST_CHECK( !gotOne ); gotOne = queue->dispatch(c2); BOOST_CHECK( !gotOne ); // requeue all of C1's acquired messages, then cancel C1 while (!dequeMeC1.empty()) { queue->requeue(dequeMeC1.front()); dequeMeC1.pop_front(); } queue->cancel(c1); // Queue = a-1, a-2, b-4, b-5, c-7, c-8... // Owners= ---, ---, ---, ---, ^C2, ^C2 // b-4, a-1, a-2, b-5 all should be available, right? verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); while (!dequeMeC2.empty()) { queue->dequeue(0, dequeMeC2.front()); dequeMeC2.pop_front(); } // Queue = a-2, b-4, b-5 // Owners= ---, ---, --- TestConsumer::shared_ptr c3(new TestConsumer("C3")); std::deque dequeMeC3; verifyAcquire(queue, c3, dequeMeC3, "a", 2 ); verifyAcquire(queue, c2, dequeMeC2, "b", 4 ); // Queue = a-2, b-4, b-5 // Owners= ^C3, ^C2, ^C2 gotOne = queue->dispatch(c3); BOOST_CHECK( !gotOne ); verifyAcquire(queue, c2, dequeMeC2, "b", 5 ); while (!dequeMeC2.empty()) { queue->dequeue(0, dequeMeC2.front()); dequeMeC2.pop_front(); } // Queue = a-2, // Owners= ^C3, intrusive_ptr msg = create_message("e", "A"); msg->insertCustomProperty("GROUP-ID", "a"); msg->insertCustomProperty("MY-ID", 9); queue->deliver(msg); // Queue = a-2, a-9 // Owners= ^C3, ^C3 gotOne = queue->dispatch(c2); BOOST_CHECK( !gotOne ); msg = create_message("e", "A"); msg->insertCustomProperty("GROUP-ID", "b"); msg->insertCustomProperty("MY-ID", 10); queue->deliver(msg); // Queue = a-2, a-9, b-10 // Owners= ^C3, ^C3, ---- verifyAcquire(queue, c2, dequeMeC2, "b", 10 ); verifyAcquire(queue, c3, dequeMeC3, "a", 9 ); gotOne = queue->dispatch(c3); BOOST_CHECK( !gotOne ); queue->cancel(c2); queue->cancel(c3); } QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { // // Verify that the same default group name is automatically applied to messages that // do not specify a group name. // FieldTable args; Queue::shared_ptr queue(new Queue("my_queue", true)); args.setString("qpid.group_header_key", "GROUP-ID"); args.setInt("qpid.shared_msg_group", 1); queue->configure(args); for (int i = 0; i < 3; ++i) { intrusive_ptr msg = create_message("e", "A"); // no "GROUP-ID" header msg->insertCustomProperty("MY-ID", i); queue->deliver(msg); } // Queue = 0, 1, 2 BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); TestConsumer::shared_ptr c1(new TestConsumer("C1")); TestConsumer::shared_ptr c2(new TestConsumer("C2")); queue->consume(c1); queue->consume(c2); std::deque dequeMeC1; std::deque dequeMeC2; queue->dispatch(c1); // c1 now owns default group (acquired 0) dequeMeC1.push_back(c1->last); int id = c1->last.payload->getProperties()->getApplicationHeaders().getAsInt("MY-ID"); BOOST_CHECK_EQUAL( id, 0 ); bool gotOne = queue->dispatch(c2); // c2 should get nothing BOOST_CHECK( !gotOne ); queue->dispatch(c1); // c1 now acquires 1 dequeMeC1.push_back(c1->last); id = c1->last.payload->getProperties()->getApplicationHeaders().getAsInt("MY-ID"); BOOST_CHECK_EQUAL( id, 1 ); gotOne = queue->dispatch(c2); // c2 should still get nothing BOOST_CHECK( !gotOne ); while (!dequeMeC1.empty()) { queue->dequeue(0, dequeMeC1.front()); dequeMeC1.pop_front(); } // now default group should be available... queue->dispatch(c2); // c2 now owns default group (acquired 2) id = c2->last.payload->getProperties()->getApplicationHeaders().getAsInt("MY-ID"); BOOST_CHECK_EQUAL( id, 2 ); gotOne = queue->dispatch(c1); // c1 should get nothing BOOST_CHECK( !gotOne ); queue->cancel(c1); queue->cancel(c2); } QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ TestMessageStoreOC testStore; client::QueueOptions args; args.setPersistLastNode(); Queue::shared_ptr queue1(new Queue("queue1", true, &testStore )); queue1->create(args); Queue::shared_ptr queue2(new Queue("queue2", true, &testStore )); queue2->create(args); intrusive_ptr msg1 = create_message("e", "A"); queue1->deliver(msg1); queue2->deliver(msg1); //change mode queue1->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); // check they don't get stored twice queue1->setLastNodeFailure(); queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); intrusive_ptr msg2 = create_message("e", "B"); queue1->deliver(msg2); queue2->deliver(msg2); queue1->clearLastNodeFailure(); queue2->clearLastNodeFailure(); // check only new messages get forced queue1->setLastNodeFailure(); queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); // check no failure messages are stored queue1->clearLastNodeFailure(); queue2->clearLastNodeFailure(); intrusive_ptr msg3 = create_message("e", "B"); queue1->deliver(msg3); queue2->deliver(msg3); BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); queue1->setLastNodeFailure(); queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 6u); // check requeue 1 intrusive_ptr msg4 = create_message("e", "C"); intrusive_ptr msg5 = create_message("e", "D"); framing::SequenceNumber sequence(1); QueuedMessage qmsg1(queue1.get(), msg4, sequence); QueuedMessage qmsg2(queue2.get(), msg5, ++sequence); queue1->requeue(qmsg1); BOOST_CHECK_EQUAL(testStore.enqCnt, 7u); // check requeue 2 queue2->clearLastNodeFailure(); queue2->requeue(qmsg2); BOOST_CHECK_EQUAL(testStore.enqCnt, 7u); queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 8u); queue2->clearLastNodeFailure(); queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 8u); } QPID_AUTO_TEST_CASE(testLastNodeRecoverAndFail){ /* simulate this: 1. start two nodes 2. create cluster durable queue and add some messages 3. kill one node (trigger force-persistent behaviour) 4. stop and recover remaining node 5. add another node 6. kill that new node again make sure that an attempt to re-enqueue a message does not happen which will result in the last man standing exiting with an error. we need to make sure that recover is safe, i.e. messages are not requeued to the store. */ TestMessageStoreOC testStore; client::QueueOptions args; // set queue mode args.setPersistLastNode(); Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); intrusive_ptr received; queue1->create(args); // check requeue 1 intrusive_ptr msg1 = create_message("e", "C"); intrusive_ptr msg2 = create_message("e", "D"); queue1->recover(msg1); queue1->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); queue1->clearLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); queue1->deliver(msg2); BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); queue1->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); } QPID_AUTO_TEST_CASE(testLastNodeJournalError){ /* simulate store exception going into last node standing */ TestMessageStoreOC testStore; client::QueueOptions args; // set queue mode args.setPersistLastNode(); Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); intrusive_ptr received; queue1->configure(args); // check requeue 1 intrusive_ptr msg1 = create_message("e", "C"); queue1->deliver(msg1); testStore.createError(); ScopedSuppressLogging sl; // Suppress messages for expected errors. queue1->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); } intrusive_ptr mkMsg(MessageStore& store, std::string content = "", bool durable = false) { intrusive_ptr msg = MessageUtils::createMessage("", "", durable); if (content.size()) MessageUtils::addContent(msg, content); msg->setStore(&store); return msg; } QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ TestMessageStoreOC testStore; client::QueueOptions args0; // No size policy client::QueueOptions args1; args1.setSizePolicy(FLOW_TO_DISK, 0, 1); client::QueueOptions args2; args2.setSizePolicy(FLOW_TO_DISK, 0, 2); // --- Fanout exchange bound to single transient queue ------------------------------------------------------------- FanOutExchange sbtFanout1("sbtFanout1", false, args0); // single binding to transient queue Queue::shared_ptr tq1(new Queue("tq1", true)); // transient w/ limit tq1->configure(args1); sbtFanout1.bind(tq1, "", 0); intrusive_ptr msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg01(msg01); sbtFanout1.route(dmsg01, "", 0); // Brings queue 1 to capacity limit msg01->tryReleaseContent(); BOOST_CHECK_EQUAL(msg01->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); intrusive_ptr msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg02(msg02); { ScopedSuppressLogging sl; // suppress expected error messages. BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException); } msg02->tryReleaseContent(); BOOST_CHECK_EQUAL(msg02->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); intrusive_ptr msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg03(msg03); { ScopedSuppressLogging sl; // suppress expected error messages. BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException); } msg03->tryReleaseContent(); BOOST_CHECK_EQUAL(msg03->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); intrusive_ptr msg04 = mkMsg(testStore); // transient no content DeliverableMessage dmsg04(msg04); { ScopedSuppressLogging sl; // suppress expected error messages. BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException); } msg04->tryReleaseContent(); BOOST_CHECK_EQUAL(msg04->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); intrusive_ptr msg05 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg05(msg05); { ScopedSuppressLogging sl; // suppress expected error messages. BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException); } msg05->tryReleaseContent(); BOOST_CHECK_EQUAL(msg05->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); // --- Fanout exchange bound to single durable queue --------------------------------------------------------------- FanOutExchange sbdFanout2("sbdFanout2", false, args0); // single binding to durable queue Queue::shared_ptr dq2(new Queue("dq2", true, &testStore)); // durable w/ limit dq2->configure(args1); sbdFanout2.bind(dq2, "", 0); intrusive_ptr msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg06(msg06); sbdFanout2.route(dmsg06, "", 0); // Brings queue 2 to capacity limit msg06->tryReleaseContent(); BOOST_CHECK_EQUAL(msg06->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, dq2->getMessageCount()); intrusive_ptr msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg07(msg07); sbdFanout2.route(dmsg07, "", 0); msg07->tryReleaseContent(); BOOST_CHECK_EQUAL(msg07->isContentReleased(), true); BOOST_CHECK_EQUAL(2u, dq2->getMessageCount()); intrusive_ptr msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg08(msg08); sbdFanout2.route(dmsg08, "", 0); msg08->tryReleaseContent(); BOOST_CHECK_EQUAL(msg08->isContentReleased(), true); BOOST_CHECK_EQUAL(3u, dq2->getMessageCount()); intrusive_ptr msg09 = mkMsg(testStore); // transient no content DeliverableMessage dmsg09(msg09); sbdFanout2.route(dmsg09, "", 0); msg09->tryReleaseContent(); BOOST_CHECK_EQUAL(msg09->isContentReleased(), true); BOOST_CHECK_EQUAL(4u, dq2->getMessageCount()); intrusive_ptr msg10 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg10(msg10); sbdFanout2.route(dmsg10, "", 0); msg10->tryReleaseContent(); BOOST_CHECK_EQUAL(msg10->isContentReleased(), true); BOOST_CHECK_EQUAL(5u, dq2->getMessageCount()); // --- Fanout exchange bound to multiple durable queues ------------------------------------------------------------ FanOutExchange mbdFanout3("mbdFanout3", false, args0); // multiple bindings to durable queues Queue::shared_ptr dq3(new Queue("dq3", true, &testStore)); // durable w/ limit 2 dq3->configure(args2); mbdFanout3.bind(dq3, "", 0); Queue::shared_ptr dq4(new Queue("dq4", true, &testStore)); // durable w/ limit 1 dq4->configure(args1); mbdFanout3.bind(dq4, "", 0); Queue::shared_ptr dq5(new Queue("dq5", true, &testStore)); // durable no limit dq5->configure(args0); mbdFanout3.bind(dq5, "", 0); intrusive_ptr msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg11(msg11); mbdFanout3.route(dmsg11, "", 0); // Brings queues 3 and 4 to capacity limit msg11->tryReleaseContent(); BOOST_CHECK_EQUAL(msg11->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, dq3->getMessageCount()); BOOST_CHECK_EQUAL(1u, dq4->getMessageCount()); BOOST_CHECK_EQUAL(1u, dq5->getMessageCount()); intrusive_ptr msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg12(msg12); mbdFanout3.route(dmsg12, "", 0); msg12->tryReleaseContent(); BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point! BOOST_CHECK_EQUAL(2u, dq3->getMessageCount()); BOOST_CHECK_EQUAL(2u, dq4->getMessageCount()); BOOST_CHECK_EQUAL(2u, dq5->getMessageCount()); intrusive_ptr msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg13(msg13); mbdFanout3.route(dmsg13, "", 0); msg13->tryReleaseContent(); BOOST_CHECK_EQUAL(msg13->isContentReleased(), true); BOOST_CHECK_EQUAL(3u, dq3->getMessageCount()); BOOST_CHECK_EQUAL(3u, dq4->getMessageCount()); BOOST_CHECK_EQUAL(3u, dq5->getMessageCount()); intrusive_ptr msg14 = mkMsg(testStore); // transient no content DeliverableMessage dmsg14(msg14); mbdFanout3.route(dmsg14, "", 0); msg14->tryReleaseContent(); BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point! BOOST_CHECK_EQUAL(4u, dq3->getMessageCount()); BOOST_CHECK_EQUAL(4u, dq4->getMessageCount()); BOOST_CHECK_EQUAL(4u, dq5->getMessageCount()); intrusive_ptr msg15 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg15(msg15); mbdFanout3.route(dmsg15, "", 0); msg15->tryReleaseContent(); BOOST_CHECK_EQUAL(msg15->isContentReleased(), true); BOOST_CHECK_EQUAL(5u, dq3->getMessageCount()); BOOST_CHECK_EQUAL(5u, dq4->getMessageCount()); BOOST_CHECK_EQUAL(5u, dq5->getMessageCount()); // Bind a transient queue, this should block the release of any further messages. // Note: this will result in a violation of the count policy of dq3 and dq4 - but this // is expected until a better overall multi-queue design is implemented. Similarly // for the other tests in this section. Queue::shared_ptr tq6(new Queue("tq6", true)); // transient no limit tq6->configure(args0); mbdFanout3.bind(tq6, "", 0); intrusive_ptr msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg16(msg16); mbdFanout3.route(dmsg16, "", 0); msg16->tryReleaseContent(); BOOST_CHECK_EQUAL(msg16->isContentReleased(), false); BOOST_CHECK_EQUAL(6u, dq3->getMessageCount()); BOOST_CHECK_EQUAL(6u, dq4->getMessageCount()); BOOST_CHECK_EQUAL(6u, dq5->getMessageCount()); intrusive_ptr msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg17(msg17); mbdFanout3.route(dmsg17, "", 0); msg17->tryReleaseContent(); BOOST_CHECK_EQUAL(msg17->isContentReleased(), false); BOOST_CHECK_EQUAL(7u, dq3->getMessageCount()); BOOST_CHECK_EQUAL(7u, dq4->getMessageCount()); BOOST_CHECK_EQUAL(7u, dq5->getMessageCount()); intrusive_ptr msg18 = mkMsg(testStore); // transient no content DeliverableMessage dmsg18(msg18); mbdFanout3.route(dmsg18, "", 0); msg18->tryReleaseContent(); BOOST_CHECK_EQUAL(msg18->isContentReleased(), false); BOOST_CHECK_EQUAL(8u, dq3->getMessageCount()); BOOST_CHECK_EQUAL(8u, dq4->getMessageCount()); BOOST_CHECK_EQUAL(8u, dq5->getMessageCount()); intrusive_ptr msg19 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg19(msg19); mbdFanout3.route(dmsg19, "", 0); msg19->tryReleaseContent(); BOOST_CHECK_EQUAL(msg19->isContentReleased(), false); BOOST_CHECK_EQUAL(9u, dq3->getMessageCount()); BOOST_CHECK_EQUAL(9u, dq4->getMessageCount()); BOOST_CHECK_EQUAL(9u, dq5->getMessageCount()); // --- Fanout exchange bound to multiple durable and transient queues ---------------------------------------------- FanOutExchange mbmFanout4("mbmFanout4", false, args0); // multiple bindings to durable/transient queues Queue::shared_ptr dq7(new Queue("dq7", true, &testStore)); // durable no limit dq7->configure(args0); mbmFanout4.bind(dq7, "", 0); Queue::shared_ptr dq8(new Queue("dq8", true, &testStore)); // durable w/ limit dq8->configure(args1); mbmFanout4.bind(dq8, "", 0); Queue::shared_ptr tq9(new Queue("tq9", true)); // transient no limit tq9->configure(args0); mbmFanout4.bind(tq9, "", 0); intrusive_ptr msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg20(msg20); mbmFanout4.route(dmsg20, "", 0); // Brings queue 7 to capacity limit msg20->tryReleaseContent(); BOOST_CHECK_EQUAL(msg20->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, dq7->getMessageCount()); BOOST_CHECK_EQUAL(1u, dq8->getMessageCount()); BOOST_CHECK_EQUAL(1u, tq9->getMessageCount()); intrusive_ptr msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg21(msg21); mbmFanout4.route(dmsg21, "", 0); msg21->tryReleaseContent(); BOOST_CHECK_EQUAL(msg21->isContentReleased(), false); BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit BOOST_CHECK_EQUAL(2u, dq8->getMessageCount()); BOOST_CHECK_EQUAL(2u, tq9->getMessageCount()); intrusive_ptr msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg22(msg22); mbmFanout4.route(dmsg22, "", 0); msg22->tryReleaseContent(); BOOST_CHECK_EQUAL(msg22->isContentReleased(), false); BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit BOOST_CHECK_EQUAL(3u, dq8->getMessageCount()); // over limit BOOST_CHECK_EQUAL(3u, tq9->getMessageCount()); intrusive_ptr msg23 = mkMsg(testStore); // transient no content DeliverableMessage dmsg23(msg23); mbmFanout4.route(dmsg23, "", 0); msg23->tryReleaseContent(); BOOST_CHECK_EQUAL(msg23->isContentReleased(), false); BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit BOOST_CHECK_EQUAL(4u, dq8->getMessageCount()); // over limit BOOST_CHECK_EQUAL(4u, tq9->getMessageCount()); intrusive_ptr msg24 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg24(msg24); mbmFanout4.route(dmsg24, "", 0); msg24->tryReleaseContent(); BOOST_CHECK_EQUAL(msg24->isContentReleased(), false); BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit BOOST_CHECK_EQUAL(5u, dq8->getMessageCount()); // over limit BOOST_CHECK_EQUAL(5u, tq9->getMessageCount()); } QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests