/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "unit_test.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" #include #include "boost/format.hpp" using boost::intrusive_ptr; using namespace qpid; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; class TestConsumer : public virtual Consumer{ public: typedef boost::shared_ptr shared_ptr; intrusive_ptr last; bool received; TestConsumer(bool acquire = true):Consumer(acquire), received(false) {}; virtual bool deliver(QueuedMessage& msg){ last = msg.payload; received = true; return true; }; void notify() {} OwnershipToken* getSession() { return 0; } }; class FailOnDeliver : public Deliverable { Message msg; public: void deliverTo(const boost::shared_ptr& queue) { throw Exception(QPID_MSG("Invalid delivery to " << queue->getName())); } Message& getMessage() { return msg; } }; intrusive_ptr create_message(std::string exchange, std::string routingKey) { intrusive_ptr msg(new Message()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); msg->getFrames().append(method); msg->getFrames().append(header); msg->getFrames().getHeaders()->get(true)->setRoutingKey(routingKey); return msg; } QPID_AUTO_TEST_SUITE(QueueTestSuite) QPID_AUTO_TEST_CASE(testAsyncMessage) { Queue::shared_ptr queue(new Queue("my_test_queue", true)); intrusive_ptr received; TestConsumer::shared_ptr c1(new TestConsumer()); queue->consume(c1); //Test basic delivery: intrusive_ptr msg1 = create_message("e", "A"); msg1->enqueueAsync();//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); BOOST_CHECK(!c1->received); msg1->enqueueComplete(); received = queue->get().payload; BOOST_CHECK_EQUAL(msg1.get(), received.get()); } QPID_AUTO_TEST_CASE(testAsyncMessageCount){ Queue::shared_ptr queue(new Queue("my_test_queue", true)); intrusive_ptr msg1 = create_message("e", "A"); msg1->enqueueAsync();//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); uint32_t compval=0; BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); msg1->enqueueComplete(); compval=1; BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); } QPID_AUTO_TEST_CASE(testConsumers){ Queue::shared_ptr queue(new Queue("my_queue", true)); //Test adding consumers: TestConsumer::shared_ptr c1(new TestConsumer()); TestConsumer::shared_ptr c2(new TestConsumer()); queue->consume(c1); queue->consume(c2); BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount()); //Test basic delivery: intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); BOOST_CHECK_EQUAL(msg1.get(), c1->last.get()); queue->deliver(msg2); BOOST_CHECK(queue->dispatch(c2)); BOOST_CHECK_EQUAL(msg2.get(), c2->last.get()); c1->received = false; queue->deliver(msg3); BOOST_CHECK(queue->dispatch(c1)); BOOST_CHECK_EQUAL(msg3.get(), c1->last.get()); //Test cancellation: queue->cancel(c1); BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount()); queue->cancel(c2); BOOST_CHECK_EQUAL(uint32_t(0), queue->getConsumerCount()); } QPID_AUTO_TEST_CASE(testRegistry){ //Test use of queues in registry: QueueRegistry registry; registry.declare("queue1", true, true); registry.declare("queue2", true, true); registry.declare("queue3", true, true); BOOST_CHECK(registry.find("queue1")); BOOST_CHECK(registry.find("queue2")); BOOST_CHECK(registry.find("queue3")); registry.destroy("queue1"); registry.destroy("queue2"); registry.destroy("queue3"); BOOST_CHECK(!registry.find("queue1")); BOOST_CHECK(!registry.find("queue2")); BOOST_CHECK(!registry.find("queue3")); } QPID_AUTO_TEST_CASE(testDequeue){ Queue::shared_ptr queue(new Queue("my_queue", true)); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); intrusive_ptr received; queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg1.get(), received.get()); BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg2.get(), received.get()); BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount()); TestConsumer::shared_ptr consumer(new TestConsumer()); queue->consume(consumer); queue->dispatch(consumer); if (!consumer->received) sleep(2); BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); received = queue->get().payload; BOOST_CHECK(!received); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); } QPID_AUTO_TEST_CASE(testBound) { //test the recording of bindings, and use of those to allow a queue to be unbound string key("my-key"); FieldTable args; Queue::shared_ptr queue(new Queue("my-queue", true)); ExchangeRegistry exchanges; //establish bindings from exchange->queue and notify the queue as it is bound: Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first; exchange1->bind(queue, key, &args); queue->bound(exchange1->getName(), key, args); Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first; exchange2->bind(queue, key, &args); queue->bound(exchange2->getName(), key, args); Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first; exchange3->bind(queue, key, &args); queue->bound(exchange3->getName(), key, args); //delete one of the exchanges: exchanges.destroy(exchange2->getName()); exchange2.reset(); //unbind the queue from all exchanges it knows it has been bound to: queue->unbind(exchanges, queue); //ensure the remaining exchanges don't still have the queue bound to them: FailOnDeliver deliverable; exchange1->route(deliverable, key, &args); exchange3->route(deliverable, key, &args); } QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ client::QueueOptions args; args.setPersistLastNode(); Queue::shared_ptr queue(new Queue("my-queue", true)); queue->configure(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); //change mode queue->setLastNodeFailure(); //enqueue 1 message queue->deliver(msg3); //check all have persistent ids. BOOST_CHECK(msg1->isPersistent()); BOOST_CHECK(msg2->isPersistent()); BOOST_CHECK(msg3->isPersistent()); } class TestMessageStoreOC : public NullMessageStore { public: virtual void dequeue(TransactionContext*, const boost::intrusive_ptr& /*msg*/, const PersistableQueue& /*queue*/) { } virtual void enqueue(TransactionContext*, const boost::intrusive_ptr& /*msg*/, const PersistableQueue& /* queue */) { } TestMessageStoreOC() : NullMessageStore() {} ~TestMessageStoreOC(){} }; QPID_AUTO_TEST_CASE(testLVQOrdering){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); intrusive_ptr msg4 = create_message("e", "D"); intrusive_ptr received; //set deliever match for LVQ a,b,c,a string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); msg1->getProperties()->getApplicationHeaders().setString(key,"a"); msg2->getProperties()->getApplicationHeaders().setString(key,"b"); msg3->getProperties()->getApplicationHeaders().setString(key,"c"); msg4->getProperties()->getApplicationHeaders().setString(key,"a"); //enqueue 4 message queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); queue->deliver(msg4); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); received = queue->get().payload; BOOST_CHECK_EQUAL(msg4.get(), received.get()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg2.get(), received.get()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg3.get(), received.get()); intrusive_ptr msg5 = create_message("e", "A"); intrusive_ptr msg6 = create_message("e", "B"); intrusive_ptr msg7 = create_message("e", "C"); msg5->getProperties()->getApplicationHeaders().setString(key,"a"); msg6->getProperties()->getApplicationHeaders().setString(key,"b"); msg7->getProperties()->getApplicationHeaders().setString(key,"c"); queue->deliver(msg5); queue->deliver(msg6); queue->deliver(msg7); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); received = queue->get().payload; BOOST_CHECK_EQUAL(msg5.get(), received.get()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg6.get(), received.get()); received = queue->get().payload; BOOST_CHECK_EQUAL(msg7.get(), received.get()); } QPID_AUTO_TEST_CASE(testLVQEmptyKey){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); msg1->getProperties()->getApplicationHeaders().setString(key,"a"); queue->deliver(msg1); queue->deliver(msg2); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); } QPID_AUTO_TEST_CASE(testLVQAcquire){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "B"); intrusive_ptr msg3 = create_message("e", "C"); intrusive_ptr msg4 = create_message("e", "D"); intrusive_ptr msg5 = create_message("e", "F"); intrusive_ptr msg6 = create_message("e", "G"); //set deliever match for LVQ a,b,c,a string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); msg1->getProperties()->getApplicationHeaders().setString(key,"a"); msg2->getProperties()->getApplicationHeaders().setString(key,"b"); msg3->getProperties()->getApplicationHeaders().setString(key,"c"); msg4->getProperties()->getApplicationHeaders().setString(key,"a"); msg5->getProperties()->getApplicationHeaders().setString(key,"b"); msg6->getProperties()->getApplicationHeaders().setString(key,"c"); //enqueue 4 message queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); queue->deliver(msg4); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); framing::SequenceNumber sequence(1); QueuedMessage qmsg(queue.get(), msg1, sequence); QueuedMessage qmsg2(queue.get(), msg2, ++sequence); BOOST_CHECK(!queue->acquire(qmsg)); BOOST_CHECK(queue->acquire(qmsg2)); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); queue->deliver(msg5); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); // set mode to no browse and check args.setOrdering(client::LVQ_NO_BROWSE); queue->configure(args); TestConsumer::shared_ptr c1(new TestConsumer(false)); queue->dispatch(c1); queue->dispatch(c1); queue->dispatch(c1); queue->deliver(msg6); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); intrusive_ptr received; received = queue->get().payload; BOOST_CHECK_EQUAL(msg4.get(), received.get()); } QPID_AUTO_TEST_CASE(testLVQMultiQueue){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); Queue::shared_ptr queue1(new Queue("my-queue", true )); Queue::shared_ptr queue2(new Queue("my-queue", true )); intrusive_ptr received; queue1->configure(args); queue2->configure(args); intrusive_ptr msg1 = create_message("e", "A"); intrusive_ptr msg2 = create_message("e", "A"); string key; args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); msg1->getProperties()->getApplicationHeaders().setString(key,"a"); msg2->getProperties()->getApplicationHeaders().setString(key,"a"); queue1->deliver(msg1); queue2->deliver(msg1); queue1->deliver(msg2); received = queue1->get().payload; BOOST_CHECK_EQUAL(msg2.get(), received.get()); received = queue2->get().payload; BOOST_CHECK_EQUAL(msg1.get(), received.get()); } void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { intrusive_ptr m = create_message("exchange", "key"); if (i % 2) { if (oddTtl) m->getProperties()->setTtl(oddTtl); } else { if (evenTtl) m->getProperties()->setTtl(evenTtl); } m->setTimestamp(new broker::ExpiryPolicy); queue.deliver(m); } } QPID_AUTO_TEST_CASE(testPurgeExpired) { Queue queue("my-queue"); addMessagesToQueue(10, queue); BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u); ::usleep(300*1000); queue.purgeExpired(); BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); } QPID_AUTO_TEST_CASE(testQueueCleaner) { Timer timer; QueueRegistry queues; Queue::shared_ptr queue = queues.declare("my-queue").first; addMessagesToQueue(10, *queue, 200, 400); BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); QueueCleaner cleaner(queues, timer); cleaner.start(100 * qpid::sys::TIME_MSEC); ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); } QPID_AUTO_TEST_SUITE_END()