/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "MessageUtils.h"
#include "unit_test.h"
#include "test_tools.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
//#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"

// NOTE(review): the targets of the next four includes were lost when this
// file was extracted (only the bare "#include" tokens survived). They have
// been rebuilt from the names this file actually uses -- confirm against the
// upstream revision.
#include <boost/lexical_cast.hpp>
#include <boost/shared_ptr.hpp>
#include <deque>
#include <string>

using namespace std;
using boost::intrusive_ptr;
using namespace qpid;
using namespace qpid::broker;
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;

namespace qpid {
namespace tests {

/**
 * Consumer stub that records the most recent delivery.
 *
 * Every call to deliver() stores the cursor and message it was handed, sets
 * `received`, and reports success. The remaining Consumer callbacks are
 * no-ops and getSession() returns a null pointer.
 */
class TestConsumer : public virtual Consumer{
public:
    typedef boost::shared_ptr<TestConsumer> shared_ptr;

    QueueCursor lastCursor;   ///< cursor passed to the latest deliver()
    Message lastMessage;      ///< message passed to the latest deliver()
    bool received;            ///< true once deliver() has been called

    /**
     * @param name    consumer name forwarded to the Consumer base
     * @param acquire true selects CONSUMER mode, false selects BROWSER mode
     */
    TestConsumer(std::string name="test", bool acquire = true)
        : Consumer(name, acquire ? CONSUMER : BROWSER), received(false) {}

    /** Record the delivery and always accept it. */
    virtual bool deliver(const QueueCursor& cursor, const Message& message){
        lastCursor = cursor;
        lastMessage = message;
        received = true;
        return true;
    }
    void notify() {}
    void cancel() {}
    void acknowledged(const DeliveryRecord&) {}
    OwnershipToken* getSession() { return 0; }
};

/**
 * Deliverable whose deliverTo() always throws.
 *
 * Routing one of these through an exchange therefore raises an exception if
 * the exchange attempts to deliver it to any queue.
 */
class FailOnDeliver : public Deliverable
{
    Message msg;
public:
    FailOnDeliver() : msg(MessageUtils::createMessage()) {}
    void deliverTo(const boost::shared_ptr<Queue>& queue)
    {
        throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
    }
    Message& getMessage() { return msg; }
};

QPID_AUTO_TEST_SUITE(QueueTestSuite)

QPID_AUTO_TEST_CASE(testBound){
    //test the recording of bindings, and use of those to allow a queue to be unbound
    string key("my-key");
    FieldTable args;

    Queue::shared_ptr queue(new Queue("my-queue"));
    ExchangeRegistry exchanges;
    //establish bindings from exchange->queue and notify the queue as it is bound:
    Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first;
    exchange1->bind(queue, key, &args, 0);
    queue->bound(exchange1->getName(), key, args);

    Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first;
    exchange2->bind(queue, key, &args, 0);
    queue->bound(exchange2->getName(), key, args);

    Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first;
    exchange3->bind(queue, key, &args, 0);
    queue->bound(exchange3->getName(), key, args);

    //delete one of the exchanges:
    exchanges.destroy(exchange2->getName());
    exchange2.reset();

    //unbind the queue from all exchanges it knows it has been bound to:
    queue->unbind(exchanges);

    //ensure the remaining exchanges don't still have the queue bound to them:
    //(FailOnDeliver throws if either exchange tries to deliver to a queue)
    FailOnDeliver deliverable;
    exchange1->route(deliverable);
    exchange3->route(deliverable);
}

// Delivers messages with repeated LVQ key values and checks both the
// resulting message count and the content handed to an acquiring consumer.
QPID_AUTO_TEST_CASE(testLVQ){

    QueueSettings settings;
    string key="key";
    settings.lvqKey = key;
    QueueFactory factory;
    Queue::shared_ptr q(factory.create("my-queue", settings));

    // Message content is the 1-based delivery index ("1".."4").
    const char* values[] = { "a", "b", "c", "a"};
    for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
        qpid::types::Variant::Map properties;
        properties[key] = values[i];
        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<std::string>(i+1)));
    }
    BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);

    TestConsumer::shared_ptr c(new TestConsumer("test", true));
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("2"), c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("3"), c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("4"), c->lastMessage.getContent());

    // Second batch: content continues the numbering at "5".
    const char* values2[] = { "a", "b", "c"};
    for (size_t i = 0; i < sizeof(values2)/sizeof(values2[0]); ++i) {
        qpid::types::Variant::Map properties;
        // Index the array that bounds this loop (previously read values[i]).
        properties[key] = values2[i];
        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<std::string>(i+5)));
    }
    BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);

    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("5"), c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("6"), c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("7"), c->lastMessage.getContent());
}

// A message carrying the LVQ key and one without it must both remain queued.
QPID_AUTO_TEST_CASE(testLVQEmptyKey){

    QueueSettings settings;
    string key="key";
    settings.lvqKey = key;
    QueueFactory factory;
    Queue::shared_ptr q(factory.create("my-queue", settings));

    qpid::types::Variant::Map properties;
    properties["key"] = "a";
    q->deliver(MessageUtils::createMessage(properties, "one"));
    properties.clear();
    q->deliver(MessageUtils::createMessage(properties, "two"));
    BOOST_CHECK_EQUAL(q->getMessageCount(), 2u);
}

/**
 * Deliver `count` messages to `queue`, alternating the TTL by index parity.
 *
 * @param count   number of messages to deliver
 * @param queue   destination queue
 * @param oddTtl  TTL passed to MessageUtils::createMessage for odd indices
 * @param evenTtl TTL passed to MessageUtils::createMessage for even indices
 *
 * NOTE(review): the TTL unit is defined by MessageUtils::createMessage; the
 * callers below pair these values with 300ms sleeps, so it is presumably
 * milliseconds -- confirm.
 */
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
    for (uint i = 0; i < count; i++) {
        Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
        m.computeExpiration(new broker::ExpiryPolicy);
        queue.deliver(m);
    }
}

// After sleeping 300ms, purgeExpired() must leave 5 of the 10 messages
// (odd-indexed messages were created with TTL 200, even-indexed with TTL 0).
QPID_AUTO_TEST_CASE(testPurgeExpired) {
    Queue queue("my-queue");
    addMessagesToQueue(10, queue);
    BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
    ::usleep(300*1000);
    queue.purgeExpired(0);
    BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
}

// With a QueueCleaner started at a 100ms period, the message count must drop
// to 5 after the first 300ms sleep and to 0 after the second.
QPID_AUTO_TEST_CASE(testQueueCleaner) {
    Timer timer;
    QueueRegistry queues;
    Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first;
    addMessagesToQueue(10, *queue, 200, 400);
    BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);

    QueueCleaner cleaner(queues, &timer);
    cleaner.start(100 * qpid::sys::TIME_MSEC);
    ::usleep(300*1000);
    BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
    ::usleep(300*1000);
    BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
}

namespace {
    /** Read property `key` of `message` as an int; yields 0 when it is void. */
    int getIntProperty(const Message& message, const std::string& key)
    {
        qpid::types::Variant v = message.getProperty(key);
        int i(0);
        if (!v.isVoid()) i = v;
        return i;
    }

    /**
     * Helper for group tests: dispatch one message from `queue` to `c` and
     * check which message arrived.
     *
     * The dispatch itself is asserted to succeed. On success the consumer's
     * cursor is appended to `results` and the message's "GROUP-ID" and
     * "MY-ID" properties are compared with `expectedGroup` / `expectedId`.
     */
    void verifyAcquire( Queue::shared_ptr queue,
                        TestConsumer::shared_ptr c,
                        std::deque<QueueCursor>& results,
                        const std::string& expectedGroup,
                        const int expectedId )
    {
        bool success = queue->dispatch(c);
        BOOST_CHECK(success);
        if (success) {
            results.push_back(c->lastCursor);
            std::string group = c->lastMessage.getPropertyAsString("GROUP-ID");
            int id = getIntProperty(c->lastMessage, "MY-ID");
            BOOST_CHECK_EQUAL( group, expectedGroup );
            BOOST_CHECK_EQUAL( id, expectedId );
        }
    }

    /** Build a message whose "GROUP-ID" is `group` and whose "MY-ID" is `id`. */
    Message createGroupMessage(int id, const std::string& group)
    {
        qpid::types::Variant::Map properties;
        properties["GROUP-ID"] = group;
        properties["MY-ID"] = id;
        return MessageUtils::createMessage(properties);
    }
}

QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
    //
    // Verify that consumers of grouped messages own the groups once a message is acquired,
    // and release the groups once all acquired messages have been dequeued or requeued
    //
    QueueSettings settings;
    settings.shareGroups = 1;
    settings.groupKey = "GROUP-ID";
    QueueFactory factory;
    Queue::shared_ptr queue(factory.create("my_queue", settings));

    std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
                             std::string("b"), std::string("b"), std::string("b"),
                             std::string("c"), std::string("c"), std::string("c") };
    for (int i = 0; i < 9; ++i) {
        queue->deliver(createGroupMessage(i, groups[i]));
    }

    // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
    // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,

    BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());

    TestConsumer::shared_ptr c1(new TestConsumer("C1"));
    TestConsumer::shared_ptr c2(new TestConsumer("C2"));

    queue->consume(c1);
    queue->consume(c2);

    std::deque<QueueCursor> dequeMeC1;
    std::deque<QueueCursor> dequeMeC2;

    verifyAcquire(queue, c1, dequeMeC1, "a", 0 );  // c1 now owns group "a" (acquire a-0)
    verifyAcquire(queue, c2, dequeMeC2, "b", 3 );  // c2 should now own group "b" (acquire b-3)

    // now let c1 complete the 'a-0' message - this should free the 'a' group
    queue->dequeue( 0, dequeMeC1.front() );
    dequeMeC1.pop_front();

    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
    // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---

    // now c2 should pick up the next 'a-1', since it is oldest free
    verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"

    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
    // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---

    // c1 should only be able to snarf up the first "c" message now...
    verifyAcquire(queue, c1, dequeMeC1, "c", 6 );    // should skip to the first "c"

    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
    // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1

    // hmmm... what if c2 now dequeues "b-3"?  (now only has a-1 acquired)
    queue->dequeue( 0, dequeMeC2.front() );
    dequeMeC2.pop_front();

    // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
    // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1

    // b group is free, c is owned by c1 - c1's next get should grab 'b-4'
    verifyAcquire(queue, c1, dequeMeC1, "b", 4 );

    // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
    // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1

    // c2 can now only grab a-2, and that's all
    verifyAcquire(queue, c2, dequeMeC2, "a", 2 );

    // now C2 can't get any more, since C1 owns "b" and "c" group...
    bool gotOne = queue->dispatch(c2);
    BOOST_CHECK( !gotOne );

    // hmmm... what if c1 now dequeues "c-6"?  (now only own's b-4)
    queue->dequeue( 0, dequeMeC1.front() );
    dequeMeC1.pop_front();

    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
    // Owners= ^C2, ^C2, ^C1, ^C1, ---, ---

    // c2 can now grab c-7
    verifyAcquire(queue, c2, dequeMeC2, "c", 7 );

    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
    // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2

    // what happens if C-2 "requeues" a-1 and a-2?
    queue->release( dequeMeC2.front() );
    dequeMeC2.pop_front();
    queue->release( dequeMeC2.front() );
    dequeMeC2.pop_front();  // now just has c-7 acquired

    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
    // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2

    // now c1 will grab a-1 and a-2...
    verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
    verifyAcquire(queue, c1, dequeMeC1, "a", 2 );

    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
    // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2

    // c2 can now acquire c-8 only
    verifyAcquire(queue, c2, dequeMeC2, "c", 8 );

    // and c1 can get b-5
    verifyAcquire(queue, c1, dequeMeC1, "b", 5 );

    // should be no more acquire-able for anyone now:
    gotOne = queue->dispatch(c1);
    BOOST_CHECK( !gotOne );
    gotOne = queue->dispatch(c2);
    BOOST_CHECK( !gotOne );

    // release all of C1's acquired messages, then cancel C1
    while (!dequeMeC1.empty()) {
        queue->release(dequeMeC1.front());
        dequeMeC1.pop_front();
    }
    queue->cancel(c1);

    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
    // Owners= ---, ---, ---, ---, ^C2, ^C2

    // b-4, a-1, a-2, b-5 all should be available, right?
    verifyAcquire(queue, c2, dequeMeC2, "a", 1 );

    while (!dequeMeC2.empty()) {
        queue->dequeue(0, dequeMeC2.front());
        dequeMeC2.pop_front();
    }

    // Queue = a-2, b-4, b-5
    // Owners= ---, ---, ---

    // NOTE(review): unlike c1 and c2, c3 is never passed to queue->consume()
    // before it is dispatched to and later cancelled -- confirm intentional.
    TestConsumer::shared_ptr c3(new TestConsumer("C3"));
    std::deque<QueueCursor> dequeMeC3;

    verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
    verifyAcquire(queue, c2, dequeMeC2, "b", 4 );

    // Queue = a-2, b-4, b-5
    // Owners= ^C3, ^C2, ^C2

    gotOne = queue->dispatch(c3);
    BOOST_CHECK( !gotOne );

    verifyAcquire(queue, c2, dequeMeC2, "b", 5 );

    while (!dequeMeC2.empty()) {
        queue->dequeue(0, dequeMeC2.front());
        dequeMeC2.pop_front();
    }

    // Queue = a-2,
    // Owners= ^C3,

    queue->deliver(createGroupMessage(9, "a"));

    // Queue = a-2, a-9
    // Owners= ^C3, ^C3

    gotOne = queue->dispatch(c2);
    BOOST_CHECK( !gotOne );

    queue->deliver(createGroupMessage(10, "b"));

    // Queue = a-2, a-9, b-10
    // Owners= ^C3, ^C3, ----

    verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
    verifyAcquire(queue, c3, dequeMeC3, "a", 9 );

    gotOne = queue->dispatch(c3);
    BOOST_CHECK( !gotOne );

    queue->cancel(c2);
    queue->cancel(c3);
}

QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
    //
    // Verify that the same default group name is automatically applied to messages that
    // do not specify a group name.
    //
    QueueSettings settings;
    settings.shareGroups = 1;
    settings.groupKey = "GROUP-ID";
    QueueFactory factory;
    Queue::shared_ptr queue(factory.create("my_queue", settings));

    for (int i = 0; i < 3; ++i) {
        qpid::types::Variant::Map properties;
        // no "GROUP-ID" header
        properties["MY-ID"] = i;
        queue->deliver(MessageUtils::createMessage(properties));
    }

    // Queue = 0, 1, 2

    BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());

    TestConsumer::shared_ptr c1(new TestConsumer("C1"));
    TestConsumer::shared_ptr c2(new TestConsumer("C2"));

    queue->consume(c1);
    queue->consume(c2);

    std::deque<QueueCursor> dequeMeC1;
    std::deque<QueueCursor> dequeMeC2;

    // Each expected-success dispatch is asserted, so a failed dispatch is
    // reported directly instead of surfacing as a stale lastMessage below.
    BOOST_CHECK(queue->dispatch(c1));    // c1 now owns default group (acquired 0)
    dequeMeC1.push_back(c1->lastCursor);
    int id = getIntProperty(c1->lastMessage, "MY-ID");
    BOOST_CHECK_EQUAL( id, 0 );

    bool gotOne = queue->dispatch(c2);  // c2 should get nothing
    BOOST_CHECK( !gotOne );

    BOOST_CHECK(queue->dispatch(c1));    // c1 now acquires 1
    dequeMeC1.push_back(c1->lastCursor);
    id = getIntProperty(c1->lastMessage, "MY-ID");
    BOOST_CHECK_EQUAL( id, 1 );

    gotOne = queue->dispatch(c2);  // c2 should still get nothing
    BOOST_CHECK( !gotOne );

    while (!dequeMeC1.empty()) {
        queue->dequeue(0, dequeMeC1.front());
        dequeMeC1.pop_front();
    }

    // now default group should be available...
    BOOST_CHECK(queue->dispatch(c2));    // c2 now owns default group (acquired 2)
    // Read through the shared helper, as the c1 checks above do; a void
    // property then fails the comparison rather than throwing on conversion.
    id = getIntProperty(c2->lastMessage, "MY-ID");
    BOOST_CHECK_EQUAL( id, 2 );

    gotOne = queue->dispatch(c1);  // c1 should get nothing
    BOOST_CHECK( !gotOne );

    queue->cancel(c1);
    queue->cancel(c2);
}

// Exercises Queue::setPosition() on a plain queue, both moving the position
// forward (leaving a gap in sequence numbers) and backward (truncating).
QPID_AUTO_TEST_CASE(testSetPositionFifo) {
    Queue::shared_ptr q(new Queue("my-queue", true));
    BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0));
    for (int i = 0; i < 10; ++i)
        q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), boost::lexical_cast<std::string>(i+1)));

    // Verify the front of the queue
    TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); // Numbered from 1
    BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());

    // Verify the back of the queue
    BOOST_CHECK_EQUAL(10u, q->getPosition());
    BOOST_CHECK_EQUAL(10u, q->getMessageCount());

    // Using setPosition to introduce a gap in sequence numbers.
    q->setPosition(15);
    BOOST_CHECK_EQUAL(10u, q->getMessageCount());
    BOOST_CHECK_EQUAL(15u, q->getPosition());
    q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "16"));

    q->seek(*c, Queue::MessagePredicate(), 9);
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(10u, c->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("10", c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(16u, c->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("16", c->lastMessage.getContent());

    // Using setPosition to truncate the queue
    q->setPosition(5);
    BOOST_CHECK_EQUAL(5u, q->getMessageCount());
    q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "6a"));
    c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
    q->seek(*c, Queue::MessagePredicate(), 4);
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("5", c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(6u, c->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("6a", c->lastMessage.getContent());
    BOOST_CHECK(!q->dispatch(c)); // No more messages.
}

// Exercises Queue::setPosition() on a queue created with an LVQ key.
QPID_AUTO_TEST_CASE(testSetPositionLvq) {
    QueueSettings settings;
    string key="key";
    settings.lvqKey = key;
    QueueFactory factory;
    Queue::shared_ptr q(factory.create("my-queue", settings));

    const char* values[] = { "a", "b", "c", "a", "b", "c" };
    for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
        qpid::types::Variant::Map properties;
        properties[key] = values[i];
        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<std::string>(i+1)));
    }
    BOOST_CHECK_EQUAL(3u, q->getMessageCount());

    // Verify the front of the queue
    TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); // Numbered from 1
    BOOST_CHECK_EQUAL("4", c->lastMessage.getContent());
    // Verify the back of the queue
    BOOST_CHECK_EQUAL(6u, q->getPosition());

    q->setPosition(5);

    c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
    q->seek(*c, Queue::MessagePredicate(), 4);
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence()); // Numbered from 1
    BOOST_CHECK(!q->dispatch(c));
}

// Exercises Queue::setPosition() on a queue created with 10 priority levels.
QPID_AUTO_TEST_CASE(testSetPositionPriority) {
    QueueSettings settings;
    settings.priorities = 10;
    QueueFactory factory;
    Queue::shared_ptr q(factory.create("my-queue", settings));

    const int priorities[] = { 1, 2, 3, 2, 1, 3 };
    for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) {
        qpid::types::Variant::Map properties;
        properties["priority"] = priorities[i];
        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<std::string>(i+1)));
    }

    // Truncation removes messages in fifo order, not priority order.
    q->setPosition(3);
    TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in priority order
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
    BOOST_CHECK(!q->dispatch(c));

    qpid::types::Variant::Map properties;
    properties["priority"] = 4;
    q->deliver(MessageUtils::createMessage(properties, "4a"));

    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());

    // But consumers see priority order
    c.reset(new TestConsumer("test", true));
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("3", c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("2", c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
}

QPID_AUTO_TEST_SUITE_END()

}} // namespace qpid::tests