diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
commit | 80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch) | |
tree | 13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/tests/BrokerChannelTest.cpp | |
parent | a9232d5a02a19f093f212cb0b76772a20b45cb1b (diff) | |
download | qpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz |
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses.
Some refactoring around message delivery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/BrokerChannelTest.cpp')
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 48 |
1 files changed, 21 insertions, 27 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 05bdb7b3f0..eb67601875 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -48,30 +48,21 @@ struct MockHandler : ConnectionOutputHandler{ void close() {}; }; -struct DeliveryRecorder +struct DeliveryRecorder : DeliveryAdapter { - typedef std::pair<Message::shared_ptr, RequestId> Delivery; + DeliveryId id; + typedef std::pair<Message::shared_ptr, DeliveryToken::shared_ptr> Delivery; std::vector<Delivery> delivered; - struct Adapter : DeliveryAdapter + DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { - RequestId id; - DeliveryRecorder& recorder; - - Adapter(DeliveryRecorder& r) : recorder(r) {} - - RequestId getNextDeliveryTag() { return id + 1; } - void deliver(Message::shared_ptr& msg, RequestId tag) - { - recorder.delivered.push_back(Delivery(msg, tag)); - id++; - } - - }; + delivered.push_back(Delivery(msg, token)); + return ++id; + } - std::auto_ptr<DeliveryAdapter> createAdapter() + void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId /*tag*/) { - return std::auto_ptr<DeliveryAdapter>(new Adapter(*this)); + delivered.push_back(Delivery(msg, token)); } }; @@ -166,6 +157,7 @@ class BrokerChannelTest : public CppUnit::TestCase } }; + DeliveryRecorder recorder; public: @@ -179,13 +171,13 @@ class BrokerChannelTest : public CppUnit::TestCase void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(connection, 0, 0); + Channel channel(connection, recorder, 0, 0); channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); ConnectionToken* owner = 0; string tag("my_consumer"); - std::auto_ptr<DeliveryAdapter> unused; + DeliveryToken::shared_ptr unused; channel.consume(unused, tag, queue, false, false, owner); string tagA; string tagB; @@ -205,24 +197,25 @@ class BrokerChannelTest : public CppUnit::TestCase } void testDeliveryNoAck(){ - Channel channel(connection, 7); + Channel channel(connection, recorder, 7); Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); Queue::shared_ptr queue(new Queue("my_queue")); - DeliveryRecorder recorder; string tag("test"); - channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); + DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + channel.consume(token, tag, queue, false, false, 0); queue->deliver(msg); sleep(2); CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); + CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } void testStaging(){ MockMessageStore store; connection.setFrameMax(1000); connection.setStagingThreshold(10); - Channel channel(connection, 1, &store); + Channel channel(connection, recorder, 1, &store); const string data[] = {"abcde", "fghij", "klmno"}; Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); @@ -314,7 +307,7 @@ class BrokerChannelTest : public CppUnit::TestCase } void testFlow(){ - Channel channel(connection, 7); + Channel channel(connection, recorder, 7); channel.open(); //there will always be a connection-start frame CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); @@ -327,9 +320,9 @@ class BrokerChannelTest : public CppUnit::TestCase Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); - DeliveryRecorder recorder; string tag("test"); - channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); + DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + channel.consume(token, tag, queue, false, false, 0); channel.flow(false); queue->deliver(msg); //ensure no messages have been delivered @@ -340,6 +333,7 @@ class BrokerChannelTest : public CppUnit::TestCase //ensure no messages have been delivered CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); + CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) |