diff options
author | Gordon Sim <gsim@apache.org> | 2007-05-09 17:00:32 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-05-09 17:00:32 +0000 |
commit | 3a87c67be419a3ae74ea456ae67be5d0f2d2ec92 (patch) | |
tree | 82f646b4394a31a6baa669f699a775454afadf36 /cpp/tests/ChannelTest.cpp | |
parent | e6fd98ab0f78c0b91c4b12075ffdb93bce2c4c0f (diff) | |
download | qpid-python-3a87c67be419a3ae74ea456ae67be5d0f2d2ec92.tar.gz |
* Added support for channel.flow:
cpp/tests/ChannelTest.cpp
cpp/lib/broker/SessionHandlerImpl.cpp
cpp/lib/broker/BrokerChannel.h
cpp/lib/broker/BrokerChannel.cpp
* Fixed client connection closing process:
cpp/lib/common/sys/apr/Socket.cpp
cpp/lib/client/Connector.h
cpp/lib/client/Connector.cpp
cpp/lib/client/Connection.h
cpp/lib/client/Connection.cpp
* Use amq.direct rather than default exchange in P2P test
(to interop with java)
cpp/tests/BasicP2Ptest.h
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@536584 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/tests/ChannelTest.cpp')
-rw-r--r-- | cpp/tests/ChannelTest.cpp | 32 |
1 files changed, 32 insertions, 0 deletions
diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index f0860b8a28..cc0a90bad9 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -53,6 +53,7 @@ class ChannelTest : public CppUnit::TestCase CPPUNIT_TEST(testDeliveryAndRecovery); CPPUNIT_TEST(testStaging); CPPUNIT_TEST(testQueuePolicy); + CPPUNIT_TEST(testFlow); CPPUNIT_TEST_SUITE_END(); class MockMessageStore : public NullMessageStore @@ -303,6 +304,37 @@ class ChannelTest : public CppUnit::TestCase store.check(); } + + void testFlow(){ + DummyHandler handler; + Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); + + const string data("abcdefghijklmn"); + + Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + addContent(msg, data); + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner(0); + string tag("no_ack"); + channel.consume(tag, queue, false, false, owner); + channel.flow(false); + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 0, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount()); + channel.flow(true); + CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); + BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); + AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); + CPPUNIT_ASSERT(deliver); + CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + } + Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize) { Message* msg = new Message(0, exchange, routingKey, false, false); |