summaryrefslogtreecommitdiff
path: root/cpp/src/tests/BrokerChannelTest.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
committerGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
commit80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch)
tree13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/tests/BrokerChannelTest.cpp
parenta9232d5a02a19f093f212cb0b76772a20b45cb1b (diff)
downloadqpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses. Some refactoring around message delivery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/BrokerChannelTest.cpp')
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp48
1 files changed, 21 insertions, 27 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 05bdb7b3f0..eb67601875 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -48,30 +48,21 @@ struct MockHandler : ConnectionOutputHandler{
void close() {};
};
-struct DeliveryRecorder
+struct DeliveryRecorder : DeliveryAdapter
{
- typedef std::pair<Message::shared_ptr, RequestId> Delivery;
+ DeliveryId id;
+ typedef std::pair<Message::shared_ptr, DeliveryToken::shared_ptr> Delivery;
std::vector<Delivery> delivered;
- struct Adapter : DeliveryAdapter
+ DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
- RequestId id;
- DeliveryRecorder& recorder;
-
- Adapter(DeliveryRecorder& r) : recorder(r) {}
-
- RequestId getNextDeliveryTag() { return id + 1; }
- void deliver(Message::shared_ptr& msg, RequestId tag)
- {
- recorder.delivered.push_back(Delivery(msg, tag));
- id++;
- }
-
- };
+ delivered.push_back(Delivery(msg, token));
+ return ++id;
+ }
- std::auto_ptr<DeliveryAdapter> createAdapter()
+ void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId /*tag*/)
{
- return std::auto_ptr<DeliveryAdapter>(new Adapter(*this));
+ delivered.push_back(Delivery(msg, token));
}
};
@@ -166,6 +157,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
};
+ DeliveryRecorder recorder;
public:
@@ -179,13 +171,13 @@ class BrokerChannelTest : public CppUnit::TestCase
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(connection, 0, 0);
+ Channel channel(connection, recorder, 0, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
ConnectionToken* owner = 0;
string tag("my_consumer");
- std::auto_ptr<DeliveryAdapter> unused;
+ DeliveryToken::shared_ptr unused;
channel.consume(unused, tag, queue, false, false, owner);
string tagA;
string tagB;
@@ -205,24 +197,25 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testDeliveryNoAck(){
- Channel channel(connection, 7);
+ Channel channel(connection, recorder, 7);
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
Queue::shared_ptr queue(new Queue("my_queue"));
- DeliveryRecorder recorder;
string tag("test");
- channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
+ DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ channel.consume(token, tag, queue, false, false, 0);
queue->deliver(msg);
sleep(2);
CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
+ CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
void testStaging(){
MockMessageStore store;
connection.setFrameMax(1000);
connection.setStagingThreshold(10);
- Channel channel(connection, 1, &store);
+ Channel channel(connection, recorder, 1, &store);
const string data[] = {"abcde", "fghij", "klmno"};
Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
@@ -314,7 +307,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testFlow(){
- Channel channel(connection, 7);
+ Channel channel(connection, recorder, 7);
channel.open();
//there will always be a connection-start frame
CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
@@ -327,9 +320,9 @@ class BrokerChannelTest : public CppUnit::TestCase
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
- DeliveryRecorder recorder;
string tag("test");
- channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
+ DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ channel.consume(token, tag, queue, false, false, 0);
channel.flow(false);
queue->deliver(msg);
//ensure no messages have been delivered
@@ -340,6 +333,7 @@ class BrokerChannelTest : public CppUnit::TestCase
//ensure no messages have been delivered
CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
+ CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)