diff options
Diffstat (limited to 'qpid/cpp/src/tests/QueueTest.cpp')
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 165 |
1 files changed, 90 insertions, 75 deletions
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index b70afa52a7..841a19f7c1 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,9 +39,12 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; +namespace qpid { +namespace tests { + class TestConsumer : public virtual Consumer{ public: - typedef boost::shared_ptr<TestConsumer> shared_ptr; + typedef boost::shared_ptr<TestConsumer> shared_ptr; intrusive_ptr<Message> last; bool received; @@ -82,68 +85,68 @@ QPID_AUTO_TEST_SUITE(QueueTestSuite) QPID_AUTO_TEST_CASE(testAsyncMessage) { Queue::shared_ptr queue(new Queue("my_test_queue", true)); intrusive_ptr<Message> received; - + TestConsumer::shared_ptr c1(new TestConsumer()); queue->consume(c1); - - + + //Test basic delivery: intrusive_ptr<Message> msg1 = create_message("e", "A"); msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); - + BOOST_CHECK(!c1->received); msg1->enqueueComplete(); - + received = queue->get().payload; - BOOST_CHECK_EQUAL(msg1.get(), received.get()); + BOOST_CHECK_EQUAL(msg1.get(), received.get()); } - - + + QPID_AUTO_TEST_CASE(testAsyncMessageCount){ Queue::shared_ptr queue(new Queue("my_test_queue", true)); intrusive_ptr<Message> msg1 = create_message("e", "A"); msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process - + queue->process(msg1); sleep(2); uint32_t compval=0; BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); msg1->enqueueComplete(); compval=1; - BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); + BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); } QPID_AUTO_TEST_CASE(testConsumers){ Queue::shared_ptr queue(new Queue("my_queue", true)); - + //Test adding consumers: TestConsumer::shared_ptr c1(new TestConsumer()); TestConsumer::shared_ptr c2(new TestConsumer()); queue->consume(c1); queue->consume(c2); - + BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount()); - + //Test basic delivery: intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); - + queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); BOOST_CHECK_EQUAL(msg1.get(), c1->last.get()); - + queue->deliver(msg2); BOOST_CHECK(queue->dispatch(c2)); BOOST_CHECK_EQUAL(msg2.get(), c2->last.get()); - + c1->received = false; queue->deliver(msg3); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg3.get(), c1->last.get()); - + BOOST_CHECK_EQUAL(msg3.get(), c1->last.get()); + //Test cancellation: queue->cancel(c1); BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount()); @@ -157,15 +160,15 @@ QPID_AUTO_TEST_CASE(testRegistry){ registry.declare("queue1", true, true); registry.declare("queue2", true, true); registry.declare("queue3", true, true); - + BOOST_CHECK(registry.find("queue1")); BOOST_CHECK(registry.find("queue2")); BOOST_CHECK(registry.find("queue3")); - + registry.destroy("queue1"); registry.destroy("queue2"); registry.destroy("queue3"); - + BOOST_CHECK(!registry.find("queue1")); BOOST_CHECK(!registry.find("queue2")); BOOST_CHECK(!registry.find("queue3")); @@ -177,13 +180,13 @@ QPID_AUTO_TEST_CASE(testDequeue){ intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); intrusive_ptr<Message> received; - + queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); - + BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); - + received = queue->get().payload; BOOST_CHECK_EQUAL(msg1.get(), received.get()); BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount()); @@ -204,7 +207,7 @@ QPID_AUTO_TEST_CASE(testDequeue){ received = queue->get().payload; BOOST_CHECK(!received); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); - + } QPID_AUTO_TEST_CASE(testBound) @@ -236,7 +239,7 @@ QPID_AUTO_TEST_CASE(testBound) queue->unbind(exchanges, queue); //ensure the remaining exchanges don't still have the queue bound to them: - FailOnDeliver deliverable; + FailOnDeliver deliverable; exchange1->route(deliverable, key, &args); exchange3->route(deliverable, key, &args); } @@ -245,10 +248,10 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ client::QueueOptions args; args.setPersistLastNode(); - + Queue::shared_ptr queue(new Queue("my-queue", true)); queue->configure(args); - + intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); @@ -256,13 +259,13 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); - + //change mode queue->setLastNodeFailure(); - + //enqueue 1 message queue->deliver(msg3); - + //check all have persistent ids. BOOST_CHECK(msg1->isPersistent()); BOOST_CHECK(msg2->isPersistent()); @@ -277,7 +280,7 @@ class TestMessageStoreOC : public NullMessageStore uint enqCnt; uint deqCnt; bool error; - + virtual void dequeue(TransactionContext*, const boost::intrusive_ptr<PersistableMessage>& /*msg*/, const PersistableQueue& /*queue*/) @@ -298,7 +301,7 @@ class TestMessageStoreOC : public NullMessageStore { error=true; } - + TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {} ~TestMessageStoreOC(){} }; @@ -312,7 +315,7 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - + intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); @@ -324,27 +327,27 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); - + //enqueue 4 message queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); queue->deliver(msg4); - + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); - + received = queue->get().payload; BOOST_CHECK_EQUAL(msg4.get(), received.get()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg2.get(), received.get()); - + received = queue->get().payload; BOOST_CHECK_EQUAL(msg3.get(), received.get()); @@ -357,18 +360,18 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ queue->deliver(msg5); queue->deliver(msg6); queue->deliver(msg7); - + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); - + received = queue->get().payload; BOOST_CHECK_EQUAL(msg5.get(), received.get()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg6.get(), received.get()); - + received = queue->get().payload; BOOST_CHECK_EQUAL(msg7.get(), received.get()); - + } QPID_AUTO_TEST_CASE(testLVQEmptyKey){ @@ -379,20 +382,20 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - + intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); queue->deliver(msg1); queue->deliver(msg2); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); - + } QPID_AUTO_TEST_CASE(testLVQAcquire){ @@ -403,7 +406,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - + intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); @@ -416,7 +419,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); @@ -424,13 +427,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); - + //enqueue 4 message queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); queue->deliver(msg4); - + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); framing::SequenceNumber sequence(1); @@ -439,9 +442,9 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ BOOST_CHECK(!queue->acquire(qmsg)); BOOST_CHECK(queue->acquire(qmsg2)); - + BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); - + queue->deliver(msg5); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); @@ -449,11 +452,11 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ args.setOrdering(client::LVQ_NO_BROWSE); queue->configure(args); TestConsumer::shared_ptr c1(new TestConsumer(false)); - + queue->dispatch(c1); queue->dispatch(c1); queue->dispatch(c1); - + queue->deliver(msg6); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); @@ -474,7 +477,7 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){ intrusive_ptr<Message> received; queue1->configure(args); queue2->configure(args); - + intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "A"); @@ -484,17 +487,17 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); - + queue1->deliver(msg1); queue2->deliver(msg1); queue1->deliver(msg2); - + received = queue1->get().payload; BOOST_CHECK_EQUAL(msg2.get(), received.get()); received = queue2->get().payload; BOOST_CHECK_EQUAL(msg1.get(), received.get()); - + } QPID_AUTO_TEST_CASE(testLVQRecover){ @@ -518,7 +521,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); intrusive_ptr<Message> received; queue1->configure(args); - + intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "A"); // 2 @@ -544,7 +547,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ BOOST_CHECK_EQUAL(testStore.deqCnt, 1u); } -void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) +void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { intrusive_ptr<Message> m = create_message("exchange", "key"); @@ -592,7 +595,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue1->configure(args); Queue::shared_ptr queue2(new Queue("queue2", true, &testStore )); queue2->configure(args); - + intrusive_ptr<Message> msg1 = create_message("e", "A"); queue1->deliver(msg1); @@ -623,7 +626,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ // check no failure messages are stored queue1->clearLastNodeFailure(); queue2->clearLastNodeFailure(); - + intrusive_ptr<Message> msg3 = create_message("e", "B"); queue1->deliver(msg3); queue2->deliver(msg3); @@ -631,7 +634,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue1->setLastNodeFailure(); queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 6u); - + // check requeue 1 intrusive_ptr<Message> msg4 = create_message("e", "C"); intrusive_ptr<Message> msg5 = create_message("e", "D"); @@ -639,17 +642,17 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ framing::SequenceNumber sequence(1); QueuedMessage qmsg1(queue1.get(), msg4, sequence); QueuedMessage qmsg2(queue2.get(), msg5, ++sequence); - + queue1->requeue(qmsg1); BOOST_CHECK_EQUAL(testStore.enqCnt, 7u); - + // check requeue 2 queue2->clearLastNodeFailure(); queue2->requeue(qmsg2); BOOST_CHECK_EQUAL(testStore.enqCnt, 7u); queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 8u); - + queue2->clearLastNodeFailure(); queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 8u); @@ -664,8 +667,8 @@ simulate this: 4. stop and recover remaining node 5. add another node 6. kill that new node again -make sure that an attempt to re-enqueue a message does not happen which will -result in the last man standing exiting with an error. +make sure that an attempt to re-enqueue a message does not happen which will +result in the last man standing exiting with an error. we need to make sure that recover is safe, i.e. messages are not requeued to the store. @@ -678,7 +681,7 @@ not requeued to the store. Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); intrusive_ptr<Message> received; queue1->configure(args); - + // check requeue 1 intrusive_ptr<Message> msg1 = create_message("e", "C"); intrusive_ptr<Message> msg2 = create_message("e", "D"); @@ -711,17 +714,29 @@ simulate store excption going into last node standing Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); intrusive_ptr<Message> received; queue1->configure(args); - + // check requeue 1 intrusive_ptr<Message> msg1 = create_message("e", "C"); queue1->deliver(msg1); testStore.createError(); - + ScopedSuppressLogging sl; // Suppress messages for expected errors. queue1->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); -}QPID_AUTO_TEST_SUITE_END() +} + +intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) { + intrusive_ptr<Message> msg(new Message()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); + AMQFrame header((AMQHeaderBody())); + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + return msg; +} +QPID_AUTO_TEST_SUITE_END() +}} // namespace qpid::tests |