summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/QueueTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/QueueTest.cpp')
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp257
1 files changed, 0 insertions, 257 deletions
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
deleted file mode 100644
index fae0d88b7c..0000000000
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ /dev/null
@@ -1,257 +0,0 @@
- /*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/Exception.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/Deliverable.h"
-#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid_test_plugin.h"
-#include <iostream>
-#include "boost/format.hpp"
-
-using namespace qpid;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-
-class TestConsumer : public virtual Consumer{
-public:
- typedef shared_ptr<TestConsumer> shared_ptr;
-
- intrusive_ptr<Message> last;
- bool received;
- TestConsumer(): received(false) {};
-
- virtual bool deliver(QueuedMessage& msg){
- last = msg.payload;
- received = true;
- return true;
- };
- void notify() {}
-};
-
-class FailOnDeliver : public Deliverable
-{
-public:
- void deliverTo(Queue::shared_ptr& queue)
- {
- throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
- }
-};
-
-class QueueTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(QueueTest);
- CPPUNIT_TEST(testConsumers);
- CPPUNIT_TEST(testRegistry);
- CPPUNIT_TEST(testDequeue);
- CPPUNIT_TEST(testBound);
- CPPUNIT_TEST(testAsyncMessage);
- CPPUNIT_TEST(testAsyncMessageCount);
- CPPUNIT_TEST_SUITE_END();
-
-
- public:
- intrusive_ptr<Message> message(std::string exchange, std::string routingKey) {
- intrusive_ptr<Message> msg(new Message());
- AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
- AMQFrame header(in_place<AMQHeaderBody>());
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
- return msg;
- }
-
-
- void testAsyncMessage(){
-
- Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> received;
-
- TestConsumer c1;
- queue->consume(c1);
-
-
- //Test basic delivery:
- intrusive_ptr<Message> msg1 = message("e", "A");
- msg1->enqueueAsync();//this is done on enqueue which is not called from process
- queue->process(msg1);
- sleep(2);
-
- CPPUNIT_ASSERT(!c1.received);
- msg1->enqueueComplete();
-
- received = queue->dequeue().payload;
- CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
-
-
- }
-
-
- void testAsyncMessageCount(){
- Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> msg1 = message("e", "A");
- msg1->enqueueAsync();//this is done on enqueue which is not called from process
-
- queue->process(msg1);
- sleep(2);
- uint32_t compval=0;
- CPPUNIT_ASSERT_EQUAL(compval, queue->getMessageCount());
- msg1->enqueueComplete();
- compval=1;
- CPPUNIT_ASSERT_EQUAL(compval, queue->getMessageCount());
-
- }
-
- void testConsumers(){
- Queue::shared_ptr queue(new Queue("my_queue", true));
-
- //Test adding consumers:
- TestConsumer c1;
- TestConsumer c2;
- queue->consume(c1);
- queue->consume(c2);
-
- CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getConsumerCount());
-
- //Test basic delivery:
- intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "B");
- intrusive_ptr<Message> msg3 = message("e", "C");
-
- queue->deliver(msg1);
- CPPUNIT_ASSERT(queue->dispatch(c1));
- CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
-
- queue->deliver(msg2);
- CPPUNIT_ASSERT(queue->dispatch(c2));
- CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
-
- c1.received = false;
- queue->deliver(msg3);
- CPPUNIT_ASSERT(queue->dispatch(c1));
- CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());
-
- //Test cancellation:
- queue->cancel(c1);
- CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getConsumerCount());
- queue->cancel(c2);
- CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getConsumerCount());
- }
-
- void testRegistry(){
- //Test use of queues in registry:
- QueueRegistry registry;
- registry.declare("queue1", true, true);
- registry.declare("queue2", true, true);
- registry.declare("queue3", true, true);
-
- CPPUNIT_ASSERT(registry.find("queue1"));
- CPPUNIT_ASSERT(registry.find("queue2"));
- CPPUNIT_ASSERT(registry.find("queue3"));
-
- registry.destroy("queue1");
- registry.destroy("queue2");
- registry.destroy("queue3");
-
- CPPUNIT_ASSERT(!registry.find("queue1"));
- CPPUNIT_ASSERT(!registry.find("queue2"));
- CPPUNIT_ASSERT(!registry.find("queue3"));
- }
-
- void testDequeue(){
- Queue::shared_ptr queue(new Queue("my_queue", true));
- intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "B");
- intrusive_ptr<Message> msg3 = message("e", "C");
- intrusive_ptr<Message> received;
-
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
-
- CPPUNIT_ASSERT_EQUAL(uint32_t(3), queue->getMessageCount());
-
- received = queue->dequeue().payload;
- CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
- CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getMessageCount());
-
- received = queue->dequeue().payload;
- CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
- CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount());
-
- TestConsumer consumer;
- queue->consume(consumer);
- queue->dispatch(consumer);
- if (!consumer.received)
- sleep(2);
-
- CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
- CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
-
- received = queue->dequeue().payload;
- CPPUNIT_ASSERT(!received);
- CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
-
- }
-
- void testBound()
- {
- //test the recording of bindings, and use of those to allow a queue to be unbound
- string key("my-key");
- FieldTable args;
-
- Queue::shared_ptr queue(new Queue("my-queue", true));
- ExchangeRegistry exchanges;
- //establish bindings from exchange->queue and notify the queue as it is bound:
- Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first;
- exchange1->bind(queue, key, &args);
- queue->bound(exchange1->getName(), key, args);
-
- Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first;
- exchange2->bind(queue, key, &args);
- queue->bound(exchange2->getName(), key, args);
-
- Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first;
- exchange3->bind(queue, key, &args);
- queue->bound(exchange3->getName(), key, args);
-
- //delete one of the exchanges:
- exchanges.destroy(exchange2->getName());
- exchange2.reset();
-
- //unbind the queue from all exchanges it knows it has been bound to:
- queue->unbind(exchanges, queue);
-
- //ensure the remaining exchanges don't still have the queue bound to them:
- FailOnDeliver deliverable;
- exchange1->route(deliverable, key, &args);
- exchange3->route(deliverable, key, &args);
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest);
-
-