diff options
author | Gordon Sim <gsim@apache.org> | 2006-10-10 10:06:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-10-10 10:06:36 +0000 |
commit | 3a0eda85d85b3185d3fec6dec6700c6ca1fe3818 (patch) | |
tree | d4671a2fbb8ad69a0cc734134b43b28152c3e5b4 /cpp/broker/test/ChannelTest.cpp | |
parent | 14654e5360b72adf1704838b3820c7d1fc860e8e (diff) | |
download | qpid-python-3a0eda85d85b3185d3fec6dec6700c6ca1fe3818.tar.gz |
Implementation and tests for basic_qos (i.e. prefetching)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@454677 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/test/ChannelTest.cpp')
-rw-r--r-- | cpp/broker/test/ChannelTest.cpp | 120 |
1 files changed, 102 insertions, 18 deletions
diff --git a/cpp/broker/test/ChannelTest.cpp b/cpp/broker/test/ChannelTest.cpp index 73a1f97b46..c96d17379e 100644 --- a/cpp/broker/test/ChannelTest.cpp +++ b/cpp/broker/test/ChannelTest.cpp @@ -24,23 +24,24 @@ #include <iostream> #include <memory> +using namespace std::tr1; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::concurrent; -struct MessageHolder{ +struct DummyRouter{ Message::shared_ptr last; -}; - -class DummyRouter{ - MessageHolder& holder; -public: - DummyRouter(MessageHolder& _holder) : holder(_holder){ + void operator()(Message::shared_ptr& msg){ + last = msg; } +}; - void operator()(Message::shared_ptr& msg){ - holder.last = msg; +struct DummyHandler : OutputHandler{ + std::vector<AMQFrame*> frames; + + virtual void send(AMQFrame* frame){ + frames.push_back(frame); } }; @@ -49,6 +50,8 @@ class ChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(ChannelTest); CPPUNIT_TEST(testIncoming); + CPPUNIT_TEST(testConsumerMgmt); + CPPUNIT_TEST(testDeliveryNoAck); CPPUNIT_TEST_SUITE_END(); public: @@ -64,18 +67,99 @@ class ChannelTest : public CppUnit::TestCase AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); - MessageHolder holder; - channel.handleHeader(header, DummyRouter(holder)); - CPPUNIT_ASSERT(!holder.last); - channel.handleContent(part1, DummyRouter(holder)); - CPPUNIT_ASSERT(!holder.last); - channel.handleContent(part2, DummyRouter(holder)); - CPPUNIT_ASSERT(holder.last); - CPPUNIT_ASSERT_EQUAL(routingKey, holder.last->getRoutingKey()); + CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last); + CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last); + DummyRouter router = channel.handleContent(part2, DummyRouter()); + CPPUNIT_ASSERT(router.last); + CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey()); + } + + void testConsumerMgmt(){ + Queue::shared_ptr queue(new Queue("my_queue")); + Channel channel(0, 0, 0); + CPPUNIT_ASSERT(!channel.exists("my_consumer")); + + ConnectionToken* owner; + string tag("my_consumer"); + channel.consume(tag, queue, false, false, owner); + string tagA; + string tagB; + channel.consume(tagA, queue, false, false, owner); + channel.consume(tagB, queue, false, false, owner); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount()); + CPPUNIT_ASSERT(channel.exists("my_consumer")); + CPPUNIT_ASSERT(channel.exists(tagA)); + CPPUNIT_ASSERT(channel.exists(tagB)); + channel.cancel(tagA); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount()); + CPPUNIT_ASSERT(channel.exists("my_consumer")); + CPPUNIT_ASSERT(!channel.exists(tagA)); + CPPUNIT_ASSERT(channel.exists(tagB)); + channel.close(); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount()); + } + + void testDeliveryNoAck(){ + DummyHandler handler; + Channel channel(&handler, 7, 10000); + + Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + msg->setHeader(header); + AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn")); + msg->addContent(body); + + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner; + string tag("no_ack"); + channel.consume(tag, queue, false, false, owner); + + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); + BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); + AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); + CPPUNIT_ASSERT(deliver); + CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData()); + } + + void testDeliveryAndRecovery(){ + DummyHandler handler; + Channel channel(&handler, 7, 10000); + + Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + msg->setHeader(header); + AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn")); + msg->addContent(body); + + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner; + string tag("ack"); + channel.consume(tag, queue, true, false, owner); + + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); + BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); + AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); + CPPUNIT_ASSERT(deliver); + CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData()); } }; // Make this test suite a plugin. CPPUNIT_PLUGIN_IMPLEMENT(); CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest); - |