summaryrefslogtreecommitdiff
path: root/cpp/tests/ChannelTest.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-02 22:03:10 +0000
committerAlan Conway <aconway@apache.org>2007-02-02 22:03:10 +0000
commitb5c270f10496f522ef6a03a8fa60f85d55c9187d (patch)
tree714e7abf7ba591d00232d821440e51461175cb9e /cpp/tests/ChannelTest.cpp
parent750f272ac99e8c830807affb3ae68ab0beeca63f (diff)
downloadqpid-python-b5c270f10496f522ef6a03a8fa60f85d55c9187d.tar.gz
* cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to
ChannelAdapter and Method Body. Request ID comes from body, ChannelAdapter is used to send frames, not OutputHandler. * cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member. Context is per-method not per-channel. * cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId with MethodContext (for responses) or ChannelAdapter (for requests.) Use context request-ID to construct responses, send all bodies via ChannelAdapter. * cpp/lib/broker/BrokerAdapter.cpp: Link broker::Channel to BrokerAdapter. * cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters. Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion& * Cosmetic changes, many files: - fixed indentation, broke long lines. - removed unnecessary qpid:: prefixes. * broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into broker::channel. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502767 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/tests/ChannelTest.cpp')
-rw-r--r--cpp/tests/ChannelTest.cpp92
1 files changed, 58 insertions, 34 deletions
diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp
index 760a4d3344..a3dabe6408 100644
--- a/cpp/tests/ChannelTest.cpp
+++ b/cpp/tests/ChannelTest.cpp
@@ -28,6 +28,9 @@
#include <memory>
#include <AMQP_HighestVersion.h>
#include "AMQFrame.h"
+#include "DummyChannel.h"
+#include "broker/Connection.h"
+#include "ProtocolInitiation.h"
using namespace boost;
using namespace qpid::broker;
@@ -36,12 +39,12 @@ using namespace qpid::sys;
using std::string;
using std::queue;
-struct DummyHandler : OutputHandler{
+struct DummyHandler : ConnectionOutputHandler{
std::vector<AMQFrame*> frames;
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
- }
+ void send(AMQFrame* frame){ frames.push_back(frame); }
+
+ void close() {};
};
@@ -55,6 +58,10 @@ class ChannelTest : public CppUnit::TestCase
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST_SUITE_END();
+ Broker::shared_ptr broker;
+ Connection connection;
+ DummyHandler handler;
+
class MockMessageStore : public NullMessageStore
{
struct MethodCall
@@ -135,9 +142,17 @@ class ChannelTest : public CppUnit::TestCase
public:
+ ChannelTest() :
+ broker(Broker::create()),
+ connection(&handler, *broker)
+ {
+ connection.initiated(new ProtocolInitiation());
+ }
+
+
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(qpid::framing::highestProtocolVersion, 0, 0, 0);
+ Channel channel(connection, 0, 0, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
@@ -162,12 +177,10 @@ class ChannelTest : public CppUnit::TestCase
}
void testDeliveryNoAck(){
- DummyHandler handler;
- Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000);
-
+ Channel channel(connection, 7, 10000);
const string data("abcdefghijklmn");
-
- Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ Message::shared_ptr msg(
+ createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
ConnectionToken* owner(0);
@@ -175,22 +188,25 @@ class ChannelTest : public CppUnit::TestCase
channel.consume(tag, queue, false, false, owner);
queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel());
+ CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
+ handler.frames[0]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
+ handler.frames[1]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
+ handler.frames[2]->getBody().get()));
+ AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
+ handler.frames[3]->getBody().get());
CPPUNIT_ASSERT(contentBody);
CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
}
void testDeliveryAndRecovery(){
- DummyHandler handler;
- Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000);
+ Channel channel(connection, 7, 10000);
const string data("abcdefghijklmn");
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
@@ -202,26 +218,32 @@ class ChannelTest : public CppUnit::TestCase
channel.consume(tag, queue, true, false, owner);
queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel());
+ CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
+ handler.frames[0]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
+ handler.frames[1]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
+ handler.frames[2]->getBody().get()));
+ AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
+ handler.frames[3]->getBody().get());
CPPUNIT_ASSERT(contentBody);
CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
}
void testStaging(){
MockMessageStore store;
- DummyHandler handler;
- Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
+ Channel channel(
+ connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
const string data[] = {"abcde", "fghij", "klmno"};
- Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
+ Message* msg = new BasicMessage(
+ 0, "my_exchange", "my_routing_key", false, false,
+ DummyChannel::basicGetBody());
store.expect();
store.stage(msg);
@@ -309,7 +331,9 @@ class ChannelTest : public CppUnit::TestCase
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize)
{
- BasicMessage* msg = new BasicMessage(0, exchange, routingKey, false, false);
+ BasicMessage* msg = new BasicMessage(
+ 0, exchange, routingKey, false, false,
+ DummyChannel::basicGetBody());
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(contentSize);
msg->setHeader(header);