summaryrefslogtreecommitdiff
path: root/cpp/src/tests/BrokerChannelTest.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-06-06 16:39:03 +0000
committerGordon Sim <gsim@apache.org>2007-06-06 16:39:03 +0000
commit70a3cdf33b3e38ee26ee2840a55f83ebd26589b4 (patch)
tree07c3dab5cb7d97158737c36efa1caa8d9254c266 /cpp/src/tests/BrokerChannelTest.cpp
parent480e99cfc6071f15bc7135895cf2b60d0dd9c981 (diff)
downloadqpid-python-70a3cdf33b3e38ee26ee2840a55f83ebd26589b4.tar.gz
Merged in channel.flow implementation and interoperability tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@544879 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/BrokerChannelTest.cpp')
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp37
1 files changed, 37 insertions, 0 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 3b2119be13..19841dc18d 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -57,6 +57,7 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_TEST(testDeliveryAndRecovery);
CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
+ CPPUNIT_TEST(testFlow);
CPPUNIT_TEST_SUITE_END();
Broker::shared_ptr broker;
@@ -333,6 +334,42 @@ class BrokerChannelTest : public CppUnit::TestCase
store.check();
}
+ void testFlow(){
+ Channel channel(connection, 7, 10000);
+ channel.open();
+ //there will always be a connection-start frame
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
+ CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
+ handler.frames[0].getBody().get()));
+
+ const string data("abcdefghijklmn");
+
+ Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ addContent(msg, data);
+ Queue::shared_ptr queue(new Queue("my_queue"));
+ ConnectionToken* owner(0);
+ string tag("no_ack");
+ channel.consume(tag, queue, false, false, owner);
+ channel.flow(false);
+ queue->deliver(msg);
+ //ensure no more frames have been delivered
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());
+ channel.flow(true);
+ CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel());
+ BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody()));
+ AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody()));
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody()));
+ CPPUNIT_ASSERT(deliver);
+ CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT(contentBody);
+ CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+ }
+
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
{
BasicMessage* msg = new BasicMessage(