summaryrefslogtreecommitdiff
path: root/cpp/tests/ChannelTest.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-05-09 17:00:32 +0000
committerGordon Sim <gsim@apache.org>2007-05-09 17:00:32 +0000
commit3a87c67be419a3ae74ea456ae67be5d0f2d2ec92 (patch)
tree82f646b4394a31a6baa669f699a775454afadf36 /cpp/tests/ChannelTest.cpp
parente6fd98ab0f78c0b91c4b12075ffdb93bce2c4c0f (diff)
downloadqpid-python-3a87c67be419a3ae74ea456ae67be5d0f2d2ec92.tar.gz
* Added support for channel.flow:
cpp/tests/ChannelTest.cpp cpp/lib/broker/SessionHandlerImpl.cpp cpp/lib/broker/BrokerChannel.h cpp/lib/broker/BrokerChannel.cpp * Fixed client connection closing process: cpp/lib/common/sys/apr/Socket.cpp cpp/lib/client/Connector.h cpp/lib/client/Connector.cpp cpp/lib/client/Connection.h cpp/lib/client/Connection.cpp * Use amq.direct rather than default exchange in P2P test (to interop with java) cpp/tests/BasicP2Ptest.h git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@536584 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/tests/ChannelTest.cpp')
-rw-r--r--cpp/tests/ChannelTest.cpp32
1 files changed, 32 insertions, 0 deletions
diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp
index f0860b8a28..cc0a90bad9 100644
--- a/cpp/tests/ChannelTest.cpp
+++ b/cpp/tests/ChannelTest.cpp
@@ -53,6 +53,7 @@ class ChannelTest : public CppUnit::TestCase
CPPUNIT_TEST(testDeliveryAndRecovery);
CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
+ CPPUNIT_TEST(testFlow);
CPPUNIT_TEST_SUITE_END();
class MockMessageStore : public NullMessageStore
@@ -303,6 +304,37 @@ class ChannelTest : public CppUnit::TestCase
store.check();
}
+
+ void testFlow(){
+ DummyHandler handler;
+ Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000);
+
+ const string data("abcdefghijklmn");
+
+ Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ addContent(msg, data);
+ Queue::shared_ptr queue(new Queue("my_queue"));
+ ConnectionToken* owner(0);
+ string tag("no_ack");
+ channel.consume(tag, queue, false, false, owner);
+ channel.flow(false);
+ queue->deliver(msg);
+ CPPUNIT_ASSERT_EQUAL((size_t) 0, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());
+ channel.flow(true);
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
+ BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
+ AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+ CPPUNIT_ASSERT(deliver);
+ CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT(contentBody);
+ CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+ }
+
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize)
{
Message* msg = new Message(0, exchange, routingKey, false, false);