summaryrefslogtreecommitdiff
path: root/cpp/src/tests/BrokerChannelTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/BrokerChannelTest.cpp')
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp124
1 files changed, 47 insertions, 77 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 929105f6e3..251ac624ab 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -48,13 +48,38 @@ struct MockHandler : ConnectionOutputHandler{
void close() {};
};
+struct DeliveryRecorder
+{
+ typedef std::pair<Message::shared_ptr, RequestId> Delivery;
+ std::vector<Delivery> delivered;
+
+ struct Adapter : DeliveryAdapter
+ {
+ RequestId id;
+ DeliveryRecorder& recorder;
+
+ Adapter(DeliveryRecorder& r) : recorder(r) {}
+
+ RequestId getNextDeliveryTag() { return id + 1; }
+ void deliver(Message::shared_ptr& msg, RequestId tag)
+ {
+ recorder.delivered.push_back(Delivery(msg, tag));
+ id++;
+ }
+
+ };
+
+ std::auto_ptr<DeliveryAdapter> createAdapter()
+ {
+ return std::auto_ptr<DeliveryAdapter>(new Adapter(*this));
+ }
+};
class BrokerChannelTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(BrokerChannelTest);
- CPPUNIT_TEST(testConsumerMgmt);
+ CPPUNIT_TEST(testConsumerMgmt);;
CPPUNIT_TEST(testDeliveryNoAck);
- CPPUNIT_TEST(testDeliveryAndRecovery);
CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST(testFlow);
@@ -160,11 +185,12 @@ class BrokerChannelTest : public CppUnit::TestCase
ConnectionToken* owner = 0;
string tag("my_consumer");
- channel.consume(tag, queue, false, false, owner);
+ std::auto_ptr<DeliveryAdapter> unused;
+ channel.consume(unused, tag, queue, false, false, owner);
string tagA;
string tagB;
- channel.consume(tagA, queue, false, false, owner);
- channel.consume(tagB, queue, false, false, owner);
+ channel.consume(unused, tagA, queue, false, false, owner);
+ channel.consume(unused, tagB, queue, false, false, owner);
CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount());
CPPUNIT_ASSERT(channel.exists("my_consumer"));
CPPUNIT_ASSERT(channel.exists(tagA));
@@ -178,65 +204,17 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount());
}
- void testDeliveryNoAck(){
+ void testDeliveryNoAck(){
Channel channel(connection, 7);
- channel.open();
- const string data("abcdefghijklmn");
- Message::shared_ptr msg(
- createMessage("test", "my_routing_key", "my_message_id", 14));
- addContent(msg, data);
- Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner(0);
- string tag("no_ack");
- channel.consume(tag, queue, false, false, owner);
-
- queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel());
- CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
- handler.frames[0].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
- handler.frames[1].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
- handler.frames[2].getBody().get()));
- AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
- handler.frames[3].getBody().get());
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
- }
-
- void testDeliveryAndRecovery(){
- Channel channel(connection, 7);
- channel.open();
- const string data("abcdefghijklmn");
-
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
- addContent(msg, data);
-
Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner(0);
- string tag("ack");
- channel.consume(tag, queue, true, false, owner);
-
+ DeliveryRecorder recorder;
+ string tag("test");
+ channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel());
- CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
- handler.frames[0].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
- handler.frames[1].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
- handler.frames[2].getBody().get()));
- AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
- handler.frames[3].getBody().get());
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
+ CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
}
void testStaging(){
@@ -349,26 +327,18 @@ class BrokerChannelTest : public CppUnit::TestCase
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner(0);
- string tag("no_ack");
- channel.consume(tag, queue, false, false, owner);
+ DeliveryRecorder recorder;
+ string tag("test");
+ channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
channel.flow(false);
queue->deliver(msg);
- //ensure no more frames have been delivered
- CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());
+ //ensure no messages have been delivered
+ CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
+
channel.flow(true);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+ //ensure no messages have been delivered
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
+ CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
}
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)