summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp48
-rw-r--r--cpp/src/tests/FramingTest.cpp38
2 files changed, 40 insertions, 46 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 05bdb7b3f0..eb67601875 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -48,30 +48,21 @@ struct MockHandler : ConnectionOutputHandler{
void close() {};
};
-struct DeliveryRecorder
+struct DeliveryRecorder : DeliveryAdapter
{
- typedef std::pair<Message::shared_ptr, RequestId> Delivery;
+ DeliveryId id;
+ typedef std::pair<Message::shared_ptr, DeliveryToken::shared_ptr> Delivery;
std::vector<Delivery> delivered;
- struct Adapter : DeliveryAdapter
+ DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
- RequestId id;
- DeliveryRecorder& recorder;
-
- Adapter(DeliveryRecorder& r) : recorder(r) {}
-
- RequestId getNextDeliveryTag() { return id + 1; }
- void deliver(Message::shared_ptr& msg, RequestId tag)
- {
- recorder.delivered.push_back(Delivery(msg, tag));
- id++;
- }
-
- };
+ delivered.push_back(Delivery(msg, token));
+ return ++id;
+ }
- std::auto_ptr<DeliveryAdapter> createAdapter()
+ void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId /*tag*/)
{
- return std::auto_ptr<DeliveryAdapter>(new Adapter(*this));
+ delivered.push_back(Delivery(msg, token));
}
};
@@ -166,6 +157,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
};
+ DeliveryRecorder recorder;
public:
@@ -179,13 +171,13 @@ class BrokerChannelTest : public CppUnit::TestCase
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(connection, 0, 0);
+ Channel channel(connection, recorder, 0, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
ConnectionToken* owner = 0;
string tag("my_consumer");
- std::auto_ptr<DeliveryAdapter> unused;
+ DeliveryToken::shared_ptr unused;
channel.consume(unused, tag, queue, false, false, owner);
string tagA;
string tagB;
@@ -205,24 +197,25 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testDeliveryNoAck(){
- Channel channel(connection, 7);
+ Channel channel(connection, recorder, 7);
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
Queue::shared_ptr queue(new Queue("my_queue"));
- DeliveryRecorder recorder;
string tag("test");
- channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
+ DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ channel.consume(token, tag, queue, false, false, 0);
queue->deliver(msg);
sleep(2);
CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
+ CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
void testStaging(){
MockMessageStore store;
connection.setFrameMax(1000);
connection.setStagingThreshold(10);
- Channel channel(connection, 1, &store);
+ Channel channel(connection, recorder, 1, &store);
const string data[] = {"abcde", "fghij", "klmno"};
Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
@@ -314,7 +307,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testFlow(){
- Channel channel(connection, 7);
+ Channel channel(connection, recorder, 7);
channel.open();
//there will always be a connection-start frame
CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
@@ -327,9 +320,9 @@ class BrokerChannelTest : public CppUnit::TestCase
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
- DeliveryRecorder recorder;
string tag("test");
- channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
+ DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ channel.consume(token, tag, queue, false, false, 0);
channel.flow(false);
queue->deliver(msg);
//ensure no messages have been delivered
@@ -340,6 +333,7 @@ class BrokerChannelTest : public CppUnit::TestCase
//ensure no messages have been delivered
CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
+ CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index 134acb94f9..98f89b59be 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -108,7 +108,7 @@ class FramingTest : public CppUnit::TestCase
{
std::string a = "hostA";
std::string b = "hostB";
- ConnectionRedirectBody in(version, 0, a, b);
+ ConnectionRedirectBody in(version, a, b);
in.encodeContent(buffer);
buffer.flip();
ConnectionRedirectBody out(version);
@@ -146,7 +146,7 @@ class FramingTest : public CppUnit::TestCase
std::string a = "hostA";
std::string b = "hostB";
AMQFrame in(version, 999,
- new ConnectionRedirectBody(version, 0, a, b));
+ new ConnectionRedirectBody(version, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -157,7 +157,7 @@ class FramingTest : public CppUnit::TestCase
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s));
+ AMQFrame in(version, 999, new BasicConsumeOkBody(version, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -400,22 +400,22 @@ class FramingTest : public CppUnit::TestCase
c.declareQueue(queue);
c.bind(exchange, queue, "MyTopic", framing::FieldTable());
broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin();
- ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: ]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=1): ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; request(id=1,mark=0): ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=1): QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=4,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=5,mark=2): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=6,mark=2): ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; request(id=2,mark=0): ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet=0]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionOpenOk: knownHosts=]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ChannelOpen: outOfBand=]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet={}]", *i++);
}
};