diff options
author | Alan Conway <aconway@apache.org> | 2007-02-02 22:03:10 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-02-02 22:03:10 +0000 |
commit | b5c270f10496f522ef6a03a8fa60f85d55c9187d (patch) | |
tree | 714e7abf7ba591d00232d821440e51461175cb9e /cpp/tests/ChannelTest.cpp | |
parent | 750f272ac99e8c830807affb3ae68ab0beeca63f (diff) | |
download | qpid-python-b5c270f10496f522ef6a03a8fa60f85d55c9187d.tar.gz |
* cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to
ChannelAdapter and Method Body. Request ID comes from body,
ChannelAdapter is used to send frames, not OutputHandler.
* cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member.
Context is per-method not per-channel.
* cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId
with MethodContext (for responses) or ChannelAdapter (for requests.)
Use context request-ID to construct responses, send all bodies via
ChannelAdapter.
* cpp/lib/broker/BrokerAdapter.cpp: Link broker::Channel to BrokerAdapter.
* cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters.
Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion&
* Cosmetic changes, many files:
- fixed indentation, broke long lines.
- removed unnecessary qpid:: prefixes.
* broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into
broker::channel.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502767 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/tests/ChannelTest.cpp')
-rw-r--r-- | cpp/tests/ChannelTest.cpp | 92 |
1 files changed, 58 insertions, 34 deletions
diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index 760a4d3344..a3dabe6408 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -28,6 +28,9 @@ #include <memory> #include <AMQP_HighestVersion.h> #include "AMQFrame.h" +#include "DummyChannel.h" +#include "broker/Connection.h" +#include "ProtocolInitiation.h" using namespace boost; using namespace qpid::broker; @@ -36,12 +39,12 @@ using namespace qpid::sys; using std::string; using std::queue; -struct DummyHandler : OutputHandler{ +struct DummyHandler : ConnectionOutputHandler{ std::vector<AMQFrame*> frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); - } + void send(AMQFrame* frame){ frames.push_back(frame); } + + void close() {}; }; @@ -55,6 +58,10 @@ class ChannelTest : public CppUnit::TestCase CPPUNIT_TEST(testQueuePolicy); CPPUNIT_TEST_SUITE_END(); + Broker::shared_ptr broker; + Connection connection; + DummyHandler handler; + class MockMessageStore : public NullMessageStore { struct MethodCall @@ -135,9 +142,17 @@ class ChannelTest : public CppUnit::TestCase public: + ChannelTest() : + broker(Broker::create()), + connection(&handler, *broker) + { + connection.initiated(new ProtocolInitiation()); + } + + void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(qpid::framing::highestProtocolVersion, 0, 0, 0); + Channel channel(connection, 0, 0, 0); channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); @@ -162,12 +177,10 @@ class ChannelTest : public CppUnit::TestCase } void testDeliveryNoAck(){ - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); - + Channel channel(connection, 7, 10000); const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + Message::shared_ptr msg( + createMessage("test", "my_routing_key", "my_message_id", 14)); addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); ConnectionToken* owner(0); @@ -175,22 +188,25 @@ class ChannelTest : public CppUnit::TestCase channel.consume(tag, queue, false, false, owner); queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel()); + CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( + handler.frames[0]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( + handler.frames[1]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( + handler.frames[2]->getBody().get())); + AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( + handler.frames[3]->getBody().get()); CPPUNIT_ASSERT(contentBody); CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); } void testDeliveryAndRecovery(){ - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); + Channel channel(connection, 7, 10000); const string data("abcdefghijklmn"); Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); @@ -202,26 +218,32 @@ class ChannelTest : public CppUnit::TestCase channel.consume(tag, queue, true, false, owner); queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel()); + CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( + handler.frames[0]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( + handler.frames[1]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( + handler.frames[2]->getBody().get())); + AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( + handler.frames[3]->getBody().get()); CPPUNIT_ASSERT(contentBody); CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); } void testStaging(){ MockMessageStore store; - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); + Channel channel( + connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); const string data[] = {"abcde", "fghij", "klmno"}; - Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); + Message* msg = new BasicMessage( + 0, "my_exchange", "my_routing_key", false, false, + DummyChannel::basicGetBody()); store.expect(); store.stage(msg); @@ -309,7 +331,9 @@ class ChannelTest : public CppUnit::TestCase Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize) { - BasicMessage* msg = new BasicMessage(0, exchange, routingKey, false, false); + BasicMessage* msg = new BasicMessage( + 0, exchange, routingKey, false, false, + DummyChannel::basicGetBody()); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(contentSize); msg->setHeader(header); |