summaryrefslogtreecommitdiff
path: root/cpp/tests/QueueTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/tests/QueueTest.cpp')
-rw-r--r--cpp/tests/QueueTest.cpp179
1 files changed, 179 insertions, 0 deletions
diff --git a/cpp/tests/QueueTest.cpp b/cpp/tests/QueueTest.cpp
new file mode 100644
index 0000000000..9d655781c1
--- /dev/null
+++ b/cpp/tests/QueueTest.cpp
@@ -0,0 +1,179 @@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BrokerQueue.h>
+#include <QueueRegistry.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+
+class TestBinding : public virtual Binding{
+ bool cancelled;
+
+public:
+ TestBinding();
+ virtual void cancel();
+ bool isCancelled();
+};
+
+class TestConsumer : public virtual Consumer{
+public:
+ Message::shared_ptr last;
+
+ virtual bool deliver(Message::shared_ptr& msg);
+};
+
+
+class QueueTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(QueueTest);
+ CPPUNIT_TEST(testConsumers);
+ CPPUNIT_TEST(testBinding);
+ CPPUNIT_TEST(testRegistry);
+ CPPUNIT_TEST(testDequeue);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+ void testConsumers(){
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+
+ //Test adding consumers:
+ TestConsumer c1;
+ TestConsumer c2;
+ queue->consume(&c1);
+ queue->consume(&c2);
+
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount());
+
+ //Test basic delivery:
+ Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
+ Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
+ Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+
+ queue->deliver(msg1);
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+
+ queue->deliver(msg2);
+ CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
+
+ queue->deliver(msg3);
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());
+
+ //Test cancellation:
+ queue->cancel(&c1);
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount());
+ queue->cancel(&c2);
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount());
+ }
+
+ void testBinding(){
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+ //Test bindings:
+ TestBinding a;
+ TestBinding b;
+ queue->bound(&a);
+ queue->bound(&b);
+
+ queue.reset();
+
+ CPPUNIT_ASSERT(a.isCancelled());
+ CPPUNIT_ASSERT(b.isCancelled());
+ }
+
+ void testRegistry(){
+ //Test use of queues in registry:
+ QueueRegistry registry;
+ registry.declare("queue1", true, true);
+ registry.declare("queue2", true, true);
+ registry.declare("queue3", true, true);
+
+ CPPUNIT_ASSERT(registry.find("queue1"));
+ CPPUNIT_ASSERT(registry.find("queue2"));
+ CPPUNIT_ASSERT(registry.find("queue3"));
+
+ registry.destroy("queue1");
+ registry.destroy("queue2");
+ registry.destroy("queue3");
+
+ CPPUNIT_ASSERT(!registry.find("queue1"));
+ CPPUNIT_ASSERT(!registry.find("queue2"));
+ CPPUNIT_ASSERT(!registry.find("queue3"));
+ }
+
+ void testDequeue(){
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+
+ Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
+ Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
+ Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+ Message::shared_ptr received;
+
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+ queue->deliver(msg3);
+
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount());
+
+ received = queue->dequeue();
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount());
+
+ received = queue->dequeue();
+ CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount());
+
+ TestConsumer consumer;
+ queue->consume(&consumer);
+ queue->dispatch();
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
+
+ received = queue->dequeue();
+ CPPUNIT_ASSERT(!received);
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
+
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest);
+
+//TestBinding
+TestBinding::TestBinding() : cancelled(false) {}
+
+void TestBinding::cancel(){
+ CPPUNIT_ASSERT(!cancelled);
+ cancelled = true;
+}
+
+bool TestBinding::isCancelled(){
+ return cancelled;
+}
+
+//TestConsumer
+bool TestConsumer::deliver(Message::shared_ptr& msg){
+ last = msg;
+ return true;
+}
+