diff options
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 48 | ||||
-rw-r--r-- | cpp/src/tests/FramingTest.cpp | 38 |
2 files changed, 40 insertions, 46 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 05bdb7b3f0..eb67601875 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -48,30 +48,21 @@ struct MockHandler : ConnectionOutputHandler{ void close() {}; }; -struct DeliveryRecorder +struct DeliveryRecorder : DeliveryAdapter { - typedef std::pair<Message::shared_ptr, RequestId> Delivery; + DeliveryId id; + typedef std::pair<Message::shared_ptr, DeliveryToken::shared_ptr> Delivery; std::vector<Delivery> delivered; - struct Adapter : DeliveryAdapter + DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { - RequestId id; - DeliveryRecorder& recorder; - - Adapter(DeliveryRecorder& r) : recorder(r) {} - - RequestId getNextDeliveryTag() { return id + 1; } - void deliver(Message::shared_ptr& msg, RequestId tag) - { - recorder.delivered.push_back(Delivery(msg, tag)); - id++; - } - - }; + delivered.push_back(Delivery(msg, token)); + return ++id; + } - std::auto_ptr<DeliveryAdapter> createAdapter() + void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId /*tag*/) { - return std::auto_ptr<DeliveryAdapter>(new Adapter(*this)); + delivered.push_back(Delivery(msg, token)); } }; @@ -166,6 +157,7 @@ class BrokerChannelTest : public CppUnit::TestCase } }; + DeliveryRecorder recorder; public: @@ -179,13 +171,13 @@ class BrokerChannelTest : public CppUnit::TestCase void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(connection, 0, 0); + Channel channel(connection, recorder, 0, 0); channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); ConnectionToken* owner = 0; string tag("my_consumer"); - std::auto_ptr<DeliveryAdapter> unused; + DeliveryToken::shared_ptr unused; channel.consume(unused, tag, queue, false, false, owner); string tagA; string tagB; @@ -205,24 +197,25 @@ class BrokerChannelTest : public CppUnit::TestCase } void testDeliveryNoAck(){ - Channel channel(connection, 7); + Channel channel(connection, recorder, 7); Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); Queue::shared_ptr queue(new Queue("my_queue")); - DeliveryRecorder recorder; string tag("test"); - channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); + DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + channel.consume(token, tag, queue, false, false, 0); queue->deliver(msg); sleep(2); CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); + CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } void testStaging(){ MockMessageStore store; connection.setFrameMax(1000); connection.setStagingThreshold(10); - Channel channel(connection, 1, &store); + Channel channel(connection, recorder, 1, &store); const string data[] = {"abcde", "fghij", "klmno"}; Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); @@ -314,7 +307,7 @@ class BrokerChannelTest : public CppUnit::TestCase } void testFlow(){ - Channel channel(connection, 7); + Channel channel(connection, recorder, 7); channel.open(); //there will always be a connection-start frame CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); @@ -327,9 +320,9 @@ class BrokerChannelTest : public CppUnit::TestCase Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); - DeliveryRecorder recorder; string tag("test"); - channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); + DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + channel.consume(token, tag, queue, false, false, 0); channel.flow(false); queue->deliver(msg); //ensure no messages have been delivered @@ -340,6 +333,7 @@ class BrokerChannelTest : public CppUnit::TestCase //ensure no messages have been delivered CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); + CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp index 134acb94f9..98f89b59be 100644 --- a/cpp/src/tests/FramingTest.cpp +++ b/cpp/src/tests/FramingTest.cpp @@ -108,7 +108,7 @@ class FramingTest : public CppUnit::TestCase { std::string a = "hostA"; std::string b = "hostB"; - ConnectionRedirectBody in(version, 0, a, b); + ConnectionRedirectBody in(version, a, b); in.encodeContent(buffer); buffer.flip(); ConnectionRedirectBody out(version); @@ -146,7 +146,7 @@ class FramingTest : public CppUnit::TestCase std::string a = "hostA"; std::string b = "hostB"; AMQFrame in(version, 999, - new ConnectionRedirectBody(version, 0, a, b)); + new ConnectionRedirectBody(version, a, b)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -157,7 +157,7 @@ class FramingTest : public CppUnit::TestCase void testBasicConsumeOkBodyFrame() { std::string s = "hostA"; - AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s)); + AMQFrame in(version, 999, new BasicConsumeOkBody(version, s)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -400,22 +400,22 @@ class FramingTest : public CppUnit::TestCase c.declareQueue(queue); c.bind(exchange, queue, "MyTopic", framing::FieldTable()); broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin(); - ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: ]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=1): ExecutionFlush: ]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; request(id=1,mark=0): ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=1): QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=4,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=5,mark=2): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=6,mark=2): ExecutionFlush: ]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; request(id=2,mark=0): ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet=0]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionOpenOk: knownHosts=]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; ChannelOpen: outOfBand=]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet={}]", *i++); } }; |