summaryrefslogtreecommitdiff
path: root/cpp/broker/test/ChannelTest.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-10-10 10:06:36 +0000
committerGordon Sim <gsim@apache.org>2006-10-10 10:06:36 +0000
commit3a0eda85d85b3185d3fec6dec6700c6ca1fe3818 (patch)
treed4671a2fbb8ad69a0cc734134b43b28152c3e5b4 /cpp/broker/test/ChannelTest.cpp
parent14654e5360b72adf1704838b3820c7d1fc860e8e (diff)
downloadqpid-python-3a0eda85d85b3185d3fec6dec6700c6ca1fe3818.tar.gz
Implementation and tests for basic_qos (i.e. prefetching)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@454677 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/test/ChannelTest.cpp')
-rw-r--r--cpp/broker/test/ChannelTest.cpp120
1 files changed, 102 insertions, 18 deletions
diff --git a/cpp/broker/test/ChannelTest.cpp b/cpp/broker/test/ChannelTest.cpp
index 73a1f97b46..c96d17379e 100644
--- a/cpp/broker/test/ChannelTest.cpp
+++ b/cpp/broker/test/ChannelTest.cpp
@@ -24,23 +24,24 @@
#include <iostream>
#include <memory>
+using namespace std::tr1;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::concurrent;
-struct MessageHolder{
+struct DummyRouter{
Message::shared_ptr last;
-};
-
-class DummyRouter{
- MessageHolder& holder;
-public:
- DummyRouter(MessageHolder& _holder) : holder(_holder){
+ void operator()(Message::shared_ptr& msg){
+ last = msg;
}
+};
- void operator()(Message::shared_ptr& msg){
- holder.last = msg;
+struct DummyHandler : OutputHandler{
+ std::vector<AMQFrame*> frames;
+
+ virtual void send(AMQFrame* frame){
+ frames.push_back(frame);
}
};
@@ -49,6 +50,8 @@ class ChannelTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(ChannelTest);
CPPUNIT_TEST(testIncoming);
+ CPPUNIT_TEST(testConsumerMgmt);
+ CPPUNIT_TEST(testDeliveryNoAck);
CPPUNIT_TEST_SUITE_END();
public:
@@ -64,18 +67,99 @@ class ChannelTest : public CppUnit::TestCase
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
- MessageHolder holder;
- channel.handleHeader(header, DummyRouter(holder));
- CPPUNIT_ASSERT(!holder.last);
- channel.handleContent(part1, DummyRouter(holder));
- CPPUNIT_ASSERT(!holder.last);
- channel.handleContent(part2, DummyRouter(holder));
- CPPUNIT_ASSERT(holder.last);
- CPPUNIT_ASSERT_EQUAL(routingKey, holder.last->getRoutingKey());
+ CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last);
+ CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last);
+ DummyRouter router = channel.handleContent(part2, DummyRouter());
+ CPPUNIT_ASSERT(router.last);
+ CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey());
+ }
+
+ void testConsumerMgmt(){
+ Queue::shared_ptr queue(new Queue("my_queue"));
+ Channel channel(0, 0, 0);
+ CPPUNIT_ASSERT(!channel.exists("my_consumer"));
+
+ ConnectionToken* owner;
+ string tag("my_consumer");
+ channel.consume(tag, queue, false, false, owner);
+ string tagA;
+ string tagB;
+ channel.consume(tagA, queue, false, false, owner);
+ channel.consume(tagB, queue, false, false, owner);
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount());
+ CPPUNIT_ASSERT(channel.exists("my_consumer"));
+ CPPUNIT_ASSERT(channel.exists(tagA));
+ CPPUNIT_ASSERT(channel.exists(tagB));
+ channel.cancel(tagA);
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount());
+ CPPUNIT_ASSERT(channel.exists("my_consumer"));
+ CPPUNIT_ASSERT(!channel.exists(tagA));
+ CPPUNIT_ASSERT(channel.exists(tagB));
+ channel.close();
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount());
+ }
+
+ void testDeliveryNoAck(){
+ DummyHandler handler;
+ Channel channel(&handler, 7, 10000);
+
+ Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(14);
+ msg->setHeader(header);
+ AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn"));
+ msg->addContent(body);
+
+ Queue::shared_ptr queue(new Queue("my_queue"));
+ ConnectionToken* owner;
+ string tag("no_ack");
+ channel.consume(tag, queue, false, false, owner);
+
+ queue->deliver(msg);
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
+ BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
+ AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+ CPPUNIT_ASSERT(deliver);
+ CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT(contentBody);
+ CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData());
+ }
+
+ void testDeliveryAndRecovery(){
+ DummyHandler handler;
+ Channel channel(&handler, 7, 10000);
+
+ Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(14);
+ msg->setHeader(header);
+ AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn"));
+ msg->addContent(body);
+
+ Queue::shared_ptr queue(new Queue("my_queue"));
+ ConnectionToken* owner;
+ string tag("ack");
+ channel.consume(tag, queue, true, false, owner);
+
+ queue->deliver(msg);
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
+ BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
+ AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+ CPPUNIT_ASSERT(deliver);
+ CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT(contentBody);
+ CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData());
}
};
// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest);
-