summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/QueueTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/QueueTest.cpp')
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp165
1 files changed, 90 insertions, 75 deletions
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index b70afa52a7..841a19f7c1 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,9 +39,12 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
+namespace qpid {
+namespace tests {
+
class TestConsumer : public virtual Consumer{
public:
- typedef boost::shared_ptr<TestConsumer> shared_ptr;
+ typedef boost::shared_ptr<TestConsumer> shared_ptr;
intrusive_ptr<Message> last;
bool received;
@@ -82,68 +85,68 @@ QPID_AUTO_TEST_SUITE(QueueTestSuite)
QPID_AUTO_TEST_CASE(testAsyncMessage) {
Queue::shared_ptr queue(new Queue("my_test_queue", true));
intrusive_ptr<Message> received;
-
+
TestConsumer::shared_ptr c1(new TestConsumer());
queue->consume(c1);
-
-
+
+
//Test basic delivery:
intrusive_ptr<Message> msg1 = create_message("e", "A");
msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);
-
+
BOOST_CHECK(!c1->received);
msg1->enqueueComplete();
-
+
received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg1.get(), received.get());
+ BOOST_CHECK_EQUAL(msg1.get(), received.get());
}
-
-
+
+
QPID_AUTO_TEST_CASE(testAsyncMessageCount){
Queue::shared_ptr queue(new Queue("my_test_queue", true));
intrusive_ptr<Message> msg1 = create_message("e", "A");
msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
-
+
queue->process(msg1);
sleep(2);
uint32_t compval=0;
BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
msg1->enqueueComplete();
compval=1;
- BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
+ BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
}
QPID_AUTO_TEST_CASE(testConsumers){
Queue::shared_ptr queue(new Queue("my_queue", true));
-
+
//Test adding consumers:
TestConsumer::shared_ptr c1(new TestConsumer());
TestConsumer::shared_ptr c2(new TestConsumer());
queue->consume(c1);
queue->consume(c2);
-
+
BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
-
+
//Test basic delivery:
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
-
+
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
-
+
queue->deliver(msg2);
BOOST_CHECK(queue->dispatch(c2));
BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
-
+
c1->received = false;
queue->deliver(msg3);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
-
+ BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
+
//Test cancellation:
queue->cancel(c1);
BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount());
@@ -157,15 +160,15 @@ QPID_AUTO_TEST_CASE(testRegistry){
registry.declare("queue1", true, true);
registry.declare("queue2", true, true);
registry.declare("queue3", true, true);
-
+
BOOST_CHECK(registry.find("queue1"));
BOOST_CHECK(registry.find("queue2"));
BOOST_CHECK(registry.find("queue3"));
-
+
registry.destroy("queue1");
registry.destroy("queue2");
registry.destroy("queue3");
-
+
BOOST_CHECK(!registry.find("queue1"));
BOOST_CHECK(!registry.find("queue2"));
BOOST_CHECK(!registry.find("queue3"));
@@ -177,13 +180,13 @@ QPID_AUTO_TEST_CASE(testDequeue){
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
intrusive_ptr<Message> received;
-
+
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
-
+
BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
-
+
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg1.get(), received.get());
BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount());
@@ -204,7 +207,7 @@ QPID_AUTO_TEST_CASE(testDequeue){
received = queue->get().payload;
BOOST_CHECK(!received);
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
-
+
}
QPID_AUTO_TEST_CASE(testBound)
@@ -236,7 +239,7 @@ QPID_AUTO_TEST_CASE(testBound)
queue->unbind(exchanges, queue);
//ensure the remaining exchanges don't still have the queue bound to them:
- FailOnDeliver deliverable;
+ FailOnDeliver deliverable;
exchange1->route(deliverable, key, &args);
exchange3->route(deliverable, key, &args);
}
@@ -245,10 +248,10 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
client::QueueOptions args;
args.setPersistLastNode();
-
+
Queue::shared_ptr queue(new Queue("my-queue", true));
queue->configure(args);
-
+
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
@@ -256,13 +259,13 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
//enqueue 2 messages
queue->deliver(msg1);
queue->deliver(msg2);
-
+
//change mode
queue->setLastNodeFailure();
-
+
//enqueue 1 message
queue->deliver(msg3);
-
+
//check all have persistent ids.
BOOST_CHECK(msg1->isPersistent());
BOOST_CHECK(msg2->isPersistent());
@@ -277,7 +280,7 @@ class TestMessageStoreOC : public NullMessageStore
uint enqCnt;
uint deqCnt;
bool error;
-
+
virtual void dequeue(TransactionContext*,
const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
const PersistableQueue& /*queue*/)
@@ -298,7 +301,7 @@ class TestMessageStoreOC : public NullMessageStore
{
error=true;
}
-
+
TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {}
~TestMessageStoreOC(){}
};
@@ -312,7 +315,7 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
-
+
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
@@ -324,27 +327,27 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
string key;
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
+
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-
+
//enqueue 4 message
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
queue->deliver(msg4);
-
+
BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
+
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg4.get(), received.get());
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg2.get(), received.get());
-
+
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg3.get(), received.get());
@@ -357,18 +360,18 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
queue->deliver(msg5);
queue->deliver(msg6);
queue->deliver(msg7);
-
+
BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
+
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg5.get(), received.get());
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg6.get(), received.get());
-
+
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg7.get(), received.get());
-
+
}
QPID_AUTO_TEST_CASE(testLVQEmptyKey){
@@ -379,20 +382,20 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
-
+
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
string key;
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
+
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
queue->deliver(msg1);
queue->deliver(msg2);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-
+
}
QPID_AUTO_TEST_CASE(testLVQAcquire){
@@ -403,7 +406,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
-
+
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
@@ -416,7 +419,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
string key;
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
+
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
@@ -424,13 +427,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
-
+
//enqueue 4 message
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
queue->deliver(msg4);
-
+
BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
framing::SequenceNumber sequence(1);
@@ -439,9 +442,9 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
BOOST_CHECK(!queue->acquire(qmsg));
BOOST_CHECK(queue->acquire(qmsg2));
-
+
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-
+
queue->deliver(msg5);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
@@ -449,11 +452,11 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
args.setOrdering(client::LVQ_NO_BROWSE);
queue->configure(args);
TestConsumer::shared_ptr c1(new TestConsumer(false));
-
+
queue->dispatch(c1);
queue->dispatch(c1);
queue->dispatch(c1);
-
+
queue->deliver(msg6);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
@@ -474,7 +477,7 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
intrusive_ptr<Message> received;
queue1->configure(args);
queue2->configure(args);
-
+
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "A");
@@ -484,17 +487,17 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-
+
queue1->deliver(msg1);
queue2->deliver(msg1);
queue1->deliver(msg2);
-
+
received = queue1->get().payload;
BOOST_CHECK_EQUAL(msg2.get(), received.get());
received = queue2->get().payload;
BOOST_CHECK_EQUAL(msg1.get(), received.get());
-
+
}
QPID_AUTO_TEST_CASE(testLVQRecover){
@@ -518,7 +521,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
intrusive_ptr<Message> received;
queue1->configure(args);
-
+
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "A");
// 2
@@ -544,7 +547,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
BOOST_CHECK_EQUAL(testStore.deqCnt, 1u);
}
-void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
+void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
intrusive_ptr<Message> m = create_message("exchange", "key");
@@ -592,7 +595,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
queue1->configure(args);
Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
queue2->configure(args);
-
+
intrusive_ptr<Message> msg1 = create_message("e", "A");
queue1->deliver(msg1);
@@ -623,7 +626,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
// check no failure messages are stored
queue1->clearLastNodeFailure();
queue2->clearLastNodeFailure();
-
+
intrusive_ptr<Message> msg3 = create_message("e", "B");
queue1->deliver(msg3);
queue2->deliver(msg3);
@@ -631,7 +634,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
queue1->setLastNodeFailure();
queue2->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 6u);
-
+
// check requeue 1
intrusive_ptr<Message> msg4 = create_message("e", "C");
intrusive_ptr<Message> msg5 = create_message("e", "D");
@@ -639,17 +642,17 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
framing::SequenceNumber sequence(1);
QueuedMessage qmsg1(queue1.get(), msg4, sequence);
QueuedMessage qmsg2(queue2.get(), msg5, ++sequence);
-
+
queue1->requeue(qmsg1);
BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
-
+
// check requeue 2
queue2->clearLastNodeFailure();
queue2->requeue(qmsg2);
BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
queue2->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
-
+
queue2->clearLastNodeFailure();
queue2->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
@@ -664,8 +667,8 @@ simulate this:
4. stop and recover remaining node
5. add another node
6. kill that new node again
-make sure that an attempt to re-enqueue a message does not happen which will
-result in the last man standing exiting with an error.
+make sure that an attempt to re-enqueue a message does not happen which will
+result in the last man standing exiting with an error.
we need to make sure that recover is safe, i.e. messages are
not requeued to the store.
@@ -678,7 +681,7 @@ not requeued to the store.
Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
intrusive_ptr<Message> received;
queue1->configure(args);
-
+
// check requeue 1
intrusive_ptr<Message> msg1 = create_message("e", "C");
intrusive_ptr<Message> msg2 = create_message("e", "D");
@@ -711,17 +714,29 @@ simulate store excption going into last node standing
Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
intrusive_ptr<Message> received;
queue1->configure(args);
-
+
// check requeue 1
intrusive_ptr<Message> msg1 = create_message("e", "C");
queue1->deliver(msg1);
testStore.createError();
-
+
ScopedSuppressLogging sl; // Suppress messages for expected errors.
queue1->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-}QPID_AUTO_TEST_SUITE_END()
+}
+
+intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) {
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
+}
+QPID_AUTO_TEST_SUITE_END()
+}} // namespace qpid::tests