diff options
Diffstat (limited to 'cpp/src/tests/QueueTest.cpp')
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 444 |
1 files changed, 76 insertions, 368 deletions
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index aaa2721021..80c69ac386 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -36,9 +36,6 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/broker/QueuePolicy.h" -#include "qpid/broker/QueueFlowLimit.h" - #include <iostream> #include "boost/format.hpp" @@ -56,12 +53,12 @@ class TestConsumer : public virtual Consumer{ public: typedef boost::shared_ptr<TestConsumer> shared_ptr; - QueuedMessage last; + intrusive_ptr<Message> last; bool received; - TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {}; + TestConsumer(bool acquire = true):Consumer(acquire), received(false) {}; virtual bool deliver(QueuedMessage& msg){ - last = msg; + last = msg.payload; received = true; return true; }; @@ -81,14 +78,13 @@ public: Message& getMessage() { return *(msg.get()); } }; -intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) { +intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) { intrusive_ptr<Message> msg(new Message()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); msg->getFrames().append(method); msg->getFrames().append(header); msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); - if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl); return msg; } @@ -149,16 +145,16 @@ QPID_AUTO_TEST_CASE(testConsumers){ queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get()); + BOOST_CHECK_EQUAL(msg1.get(), c1->last.get()); queue->deliver(msg2); BOOST_CHECK(queue->dispatch(c2)); - BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get()); + BOOST_CHECK_EQUAL(msg2.get(), c2->last.get()); c1->received = false; queue->deliver(msg3); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get()); + BOOST_CHECK_EQUAL(msg3.get(), c1->last.get()); //Test cancellation: queue->cancel(c1); @@ -214,7 +210,7 @@ QPID_AUTO_TEST_CASE(testDequeue){ if (!consumer->received) sleep(2); - BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); received = queue->get().payload; @@ -248,7 +244,7 @@ QPID_AUTO_TEST_CASE(testBound){ exchange2.reset(); //unbind the queue from all exchanges it knows it has been bound to: - queue->unbind(exchanges); + queue->unbind(exchanges, queue); //ensure the remaining exchanges don't still have the queue bound to them: FailOnDeliver deliverable; @@ -258,26 +254,26 @@ QPID_AUTO_TEST_CASE(testBound){ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ client::QueueOptions args; - args.setPersistLastNode(); + args.setPersistLastNode(); - Queue::shared_ptr queue(new Queue("my-queue", true)); + Queue::shared_ptr queue(new Queue("my-queue", true)); queue->configure(args); intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); - //enqueue 2 messages + //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); - //change mode - queue->setLastNodeFailure(); + //change mode + queue->setLastNodeFailure(); - //enqueue 1 message + //enqueue 1 message queue->deliver(msg3); - //check all have persistent ids. + //check all have persistent ids. BOOST_CHECK(msg1->isPersistent()); BOOST_CHECK(msg2->isPersistent()); BOOST_CHECK(msg3->isPersistent()); @@ -287,58 +283,54 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ QPID_AUTO_TEST_CASE(testSeek){ - Queue::shared_ptr queue(new Queue("my-queue", true)); + Queue::shared_ptr queue(new Queue("my-queue", true)); intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); - //enqueue 2 messages + //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); - TestConsumer::shared_ptr consumer(new TestConsumer("test", false)); + TestConsumer::shared_ptr consumer(new TestConsumer(false)); SequenceNumber seq(2); consumer->position = seq; QueuedMessage qm; queue->dispatch(consumer); - - BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); + + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); queue->dispatch(consumer); queue->dispatch(consumer); // make sure over-run is safe - + } QPID_AUTO_TEST_CASE(testSearch){ - Queue::shared_ptr queue(new Queue("my-queue", true)); + Queue::shared_ptr queue(new Queue("my-queue", true)); intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); - //enqueue 2 messages + //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); SequenceNumber seq(2); - QueuedMessage qm; - TestConsumer::shared_ptr c1(new TestConsumer()); - - BOOST_CHECK(queue->find(seq, qm)); - + QueuedMessage qm = queue->find(seq); + BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue()); - - queue->acquire(qm, c1->getName()); + + queue->acquire(qm); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); SequenceNumber seq1(3); - QueuedMessage qm1; - BOOST_CHECK(queue->find(seq1, qm1)); + QueuedMessage qm1 = queue->find(seq1); BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue()); - + } const std::string nullxid = ""; @@ -424,10 +416,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ client::QueueOptions args; // set queue mode - args.setOrdering(client::LVQ); + args.setOrdering(client::LVQ); - Queue::shared_ptr queue(new Queue("my-queue", true )); - queue->configure(args); + Queue::shared_ptr queue(new Queue("my-queue", true )); + queue->configure(args); intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); @@ -438,16 +430,16 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ //set deliever match for LVQ a,b,c,a string key; - args.getLVQKey(key); + args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->insertCustomProperty(key,"a"); - msg2->insertCustomProperty(key,"b"); - msg3->insertCustomProperty(key,"c"); - msg4->insertCustomProperty(key,"a"); + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); - //enqueue 4 message + //enqueue 4 message queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); @@ -467,9 +459,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ intrusive_ptr<Message> msg5 = create_message("e", "A"); intrusive_ptr<Message> msg6 = create_message("e", "B"); intrusive_ptr<Message> msg7 = create_message("e", "C"); - msg5->insertCustomProperty(key,"a"); - msg6->insertCustomProperty(key,"b"); - msg7->insertCustomProperty(key,"c"); + msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); queue->deliver(msg5); queue->deliver(msg6); queue->deliver(msg7); @@ -504,7 +496,7 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->insertCustomProperty(key,"a"); + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); queue->deliver(msg1); queue->deliver(msg2); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); @@ -516,8 +508,6 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); - // disable flow control, as this test violates the enqueue/dequeue sequence. - args.setInt(QueueFlowLimit::flowStopCountKey, 0); Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); @@ -536,12 +526,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->insertCustomProperty(key,"a"); - msg2->insertCustomProperty(key,"b"); - msg3->insertCustomProperty(key,"c"); - msg4->insertCustomProperty(key,"a"); - msg5->insertCustomProperty(key,"b"); - msg6->insertCustomProperty(key,"c"); + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); //enqueue 4 message queue->deliver(msg1); @@ -556,13 +546,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ QueuedMessage qmsg2(queue.get(), msg2, ++sequence); framing::SequenceNumber sequence1(10); QueuedMessage qmsg3(queue.get(), 0, sequence1); - TestConsumer::shared_ptr dummy(new TestConsumer()); - BOOST_CHECK(!queue->acquire(qmsg, dummy->getName())); - BOOST_CHECK(queue->acquire(qmsg2, dummy->getName())); + BOOST_CHECK(!queue->acquire(qmsg)); + BOOST_CHECK(queue->acquire(qmsg2)); // Acquire the massage again to test failure case. - BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName())); - BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName())); + BOOST_CHECK(!queue->acquire(qmsg2)); + BOOST_CHECK(!queue->acquire(qmsg3)); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); @@ -572,7 +561,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ // set mode to no browse and check args.setOrdering(client::LVQ_NO_BROWSE); queue->configure(args); - TestConsumer::shared_ptr c1(new TestConsumer("test", false)); + TestConsumer::shared_ptr c1(new TestConsumer(false)); queue->dispatch(c1); queue->dispatch(c1); @@ -606,8 +595,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){ args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->insertCustomProperty(key,"a"); - msg2->insertCustomProperty(key,"a"); + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); queue1->deliver(msg1); queue2->deliver(msg1); @@ -641,7 +630,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); intrusive_ptr<Message> received; - queue1->create(args); + queue1->configure(args); intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "A"); @@ -650,9 +639,9 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->insertCustomProperty(key,"a"); - msg2->insertCustomProperty(key,"a"); - // 3 + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + // 3 queue1->deliver(msg1); // 4 queue1->setLastNodeFailure(); @@ -671,8 +660,13 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { - intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl); - m->computeExpiration(new broker::ExpiryPolicy); + intrusive_ptr<Message> m = create_message("exchange", "key"); + if (i % 2) { + if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl); + } else { + if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl); + } + m->setTimestamp(new broker::ExpiryPolicy); queue.deliver(m); } } @@ -682,7 +676,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) { addMessagesToQueue(10, queue); BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u); ::usleep(300*1000); - queue.purgeExpired(0); + queue.purgeExpired(); BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); } @@ -693,7 +687,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) { addMessagesToQueue(10, *queue, 200, 400); BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); - QueueCleaner cleaner(queues, &timer); + QueueCleaner cleaner(queues, timer); cleaner.start(100 * qpid::sys::TIME_MSEC); ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); @@ -701,280 +695,6 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) { BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); } - -namespace { - // helper for group tests - void verifyAcquire( Queue::shared_ptr queue, - TestConsumer::shared_ptr c, - std::deque<QueuedMessage>& results, - const std::string& expectedGroup, - const int expectedId ) - { - queue->dispatch(c); - results.push_back(c->last); - std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); - int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); - BOOST_CHECK_EQUAL( group, expectedGroup ); - BOOST_CHECK_EQUAL( id, expectedId ); - } -} - -QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { - // - // Verify that consumers of grouped messages own the groups once a message is acquired, - // and release the groups once all acquired messages have been dequeued or requeued - // - FieldTable args; - Queue::shared_ptr queue(new Queue("my_queue", true)); - args.setString("qpid.group_header_key", "GROUP-ID"); - args.setInt("qpid.shared_msg_group", 1); - queue->configure(args); - - std::string groups[] = { std::string("a"), std::string("a"), std::string("a"), - std::string("b"), std::string("b"), std::string("b"), - std::string("c"), std::string("c"), std::string("c") }; - for (int i = 0; i < 9; ++i) { - intrusive_ptr<Message> msg = create_message("e", "A"); - msg->insertCustomProperty("GROUP-ID", groups[i]); - msg->insertCustomProperty("MY-ID", i); - queue->deliver(msg); - } - - // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... - // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---, - - BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount()); - - TestConsumer::shared_ptr c1(new TestConsumer("C1")); - TestConsumer::shared_ptr c2(new TestConsumer("C2")); - - queue->consume(c1); - queue->consume(c2); - - std::deque<QueuedMessage> dequeMeC1; - std::deque<QueuedMessage> dequeMeC2; - - - verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0) - verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3) - - // now let c1 complete the 'a-0' message - this should free the 'a' group - queue->dequeue( 0, dequeMeC1.front() ); - dequeMeC1.pop_front(); - - // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... - // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, --- - - // now c2 should pick up the next 'a-1', since it is oldest free - verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b" - - // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... - // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, --- - - // c1 should only be able to snarf up the first "c" message now... - verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c" - - // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... - // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1 - - // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired) - queue->dequeue( 0, dequeMeC2.front() ); - dequeMeC2.pop_front(); - - // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... - // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1 - - // b group is free, c is owned by c1 - c1's next get should grab 'b-4' - verifyAcquire(queue, c1, dequeMeC1, "b", 4 ); - - // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... - // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1 - - // c2 can now only grab a-2, and that's all - verifyAcquire(queue, c2, dequeMeC2, "a", 2 ); - - // now C2 can't get any more, since C1 owns "b" and "c" group... - bool gotOne = queue->dispatch(c2); - BOOST_CHECK( !gotOne ); - - // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4) - queue->dequeue( 0, dequeMeC1.front() ); - dequeMeC1.pop_front(); - - // Queue = a-1, a-2, b-4, b-5, c-7, c-8... - // Owners= ^C2, ^C2, ^C1, ^C1, ---, --- - - // c2 can now grab c-7 - verifyAcquire(queue, c2, dequeMeC2, "c", 7 ); - - // Queue = a-1, a-2, b-4, b-5, c-7, c-8... - // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2 - - // what happens if C-2 "requeues" a-1 and a-2? - queue->requeue( dequeMeC2.front() ); - dequeMeC2.pop_front(); - queue->requeue( dequeMeC2.front() ); - dequeMeC2.pop_front(); // now just has c-7 acquired - - // Queue = a-1, a-2, b-4, b-5, c-7, c-8... - // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2 - - // now c1 will grab a-1 and a-2... - verifyAcquire(queue, c1, dequeMeC1, "a", 1 ); - verifyAcquire(queue, c1, dequeMeC1, "a", 2 ); - - // Queue = a-1, a-2, b-4, b-5, c-7, c-8... - // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2 - - // c2 can now acquire c-8 only - verifyAcquire(queue, c2, dequeMeC2, "c", 8 ); - - // and c1 can get b-5 - verifyAcquire(queue, c1, dequeMeC1, "b", 5 ); - - // should be no more acquire-able for anyone now: - gotOne = queue->dispatch(c1); - BOOST_CHECK( !gotOne ); - gotOne = queue->dispatch(c2); - BOOST_CHECK( !gotOne ); - - // requeue all of C1's acquired messages, then cancel C1 - while (!dequeMeC1.empty()) { - queue->requeue(dequeMeC1.front()); - dequeMeC1.pop_front(); - } - queue->cancel(c1); - - // Queue = a-1, a-2, b-4, b-5, c-7, c-8... - // Owners= ---, ---, ---, ---, ^C2, ^C2 - - // b-4, a-1, a-2, b-5 all should be available, right? - verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); - - while (!dequeMeC2.empty()) { - queue->dequeue(0, dequeMeC2.front()); - dequeMeC2.pop_front(); - } - - // Queue = a-2, b-4, b-5 - // Owners= ---, ---, --- - - TestConsumer::shared_ptr c3(new TestConsumer("C3")); - std::deque<QueuedMessage> dequeMeC3; - - verifyAcquire(queue, c3, dequeMeC3, "a", 2 ); - verifyAcquire(queue, c2, dequeMeC2, "b", 4 ); - - // Queue = a-2, b-4, b-5 - // Owners= ^C3, ^C2, ^C2 - - gotOne = queue->dispatch(c3); - BOOST_CHECK( !gotOne ); - - verifyAcquire(queue, c2, dequeMeC2, "b", 5 ); - - while (!dequeMeC2.empty()) { - queue->dequeue(0, dequeMeC2.front()); - dequeMeC2.pop_front(); - } - - // Queue = a-2, - // Owners= ^C3, - - intrusive_ptr<Message> msg = create_message("e", "A"); - msg->insertCustomProperty("GROUP-ID", "a"); - msg->insertCustomProperty("MY-ID", 9); - queue->deliver(msg); - - // Queue = a-2, a-9 - // Owners= ^C3, ^C3 - - gotOne = queue->dispatch(c2); - BOOST_CHECK( !gotOne ); - - msg = create_message("e", "A"); - msg->insertCustomProperty("GROUP-ID", "b"); - msg->insertCustomProperty("MY-ID", 10); - queue->deliver(msg); - - // Queue = a-2, a-9, b-10 - // Owners= ^C3, ^C3, ---- - - verifyAcquire(queue, c2, dequeMeC2, "b", 10 ); - verifyAcquire(queue, c3, dequeMeC3, "a", 9 ); - - gotOne = queue->dispatch(c3); - BOOST_CHECK( !gotOne ); - - queue->cancel(c2); - queue->cancel(c3); -} - - -QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { - // - // Verify that the same default group name is automatically applied to messages that - // do not specify a group name. - // - FieldTable args; - Queue::shared_ptr queue(new Queue("my_queue", true)); - args.setString("qpid.group_header_key", "GROUP-ID"); - args.setInt("qpid.shared_msg_group", 1); - queue->configure(args); - - for (int i = 0; i < 3; ++i) { - intrusive_ptr<Message> msg = create_message("e", "A"); - // no "GROUP-ID" header - msg->insertCustomProperty("MY-ID", i); - queue->deliver(msg); - } - - // Queue = 0, 1, 2 - - BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); - - TestConsumer::shared_ptr c1(new TestConsumer("C1")); - TestConsumer::shared_ptr c2(new TestConsumer("C2")); - - queue->consume(c1); - queue->consume(c2); - - std::deque<QueuedMessage> dequeMeC1; - std::deque<QueuedMessage> dequeMeC2; - - queue->dispatch(c1); // c1 now owns default group (acquired 0) - dequeMeC1.push_back(c1->last); - int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); - BOOST_CHECK_EQUAL( id, 0 ); - - bool gotOne = queue->dispatch(c2); // c2 should get nothing - BOOST_CHECK( !gotOne ); - - queue->dispatch(c1); // c1 now acquires 1 - dequeMeC1.push_back(c1->last); - id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); - BOOST_CHECK_EQUAL( id, 1 ); - - gotOne = queue->dispatch(c2); // c2 should still get nothing - BOOST_CHECK( !gotOne ); - - while (!dequeMeC1.empty()) { - queue->dequeue(0, dequeMeC1.front()); - dequeMeC1.pop_front(); - } - - // now default group should be available... - queue->dispatch(c2); // c2 now owns default group (acquired 2) - id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); - BOOST_CHECK_EQUAL( id, 2 ); - - gotOne = queue->dispatch(c1); // c1 should get nothing - BOOST_CHECK( !gotOne ); - - queue->cancel(c1); - queue->cancel(c2); -} - QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ TestMessageStoreOC testStore; @@ -982,9 +702,9 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ args.setPersistLastNode(); Queue::shared_ptr queue1(new Queue("queue1", true, &testStore )); - queue1->create(args); + queue1->configure(args); Queue::shared_ptr queue2(new Queue("queue2", true, &testStore )); - queue2->create(args); + queue2->configure(args); intrusive_ptr<Message> msg1 = create_message("e", "A"); @@ -1070,7 +790,7 @@ not requeued to the store. Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); intrusive_ptr<Message> received; - queue1->create(args); + queue1->configure(args); // check requeue 1 intrusive_ptr<Message> msg1 = create_message("e", "C"); @@ -1150,40 +870,28 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg02(msg02); - { - ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException); - } + BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException); msg02->tryReleaseContent(); BOOST_CHECK_EQUAL(msg02->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg03(msg03); - { - ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException); - } + BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException); msg03->tryReleaseContent(); BOOST_CHECK_EQUAL(msg03->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content DeliverableMessage dmsg04(msg04); - { - ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException); - } + BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException); msg04->tryReleaseContent(); BOOST_CHECK_EQUAL(msg04->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg05(msg05); - { - ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException); - } + BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException); msg05->tryReleaseContent(); BOOST_CHECK_EQUAL(msg05->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); |