summaryrefslogtreecommitdiff
path: root/cpp/src/tests/QueueTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/QueueTest.cpp')
-rw-r--r--cpp/src/tests/QueueTest.cpp444
1 files changed, 76 insertions, 368 deletions
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index aaa2721021..80c69ac386 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -36,9 +36,6 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/QueuePolicy.h"
-#include "qpid/broker/QueueFlowLimit.h"
-
#include <iostream>
#include "boost/format.hpp"
@@ -56,12 +53,12 @@ class TestConsumer : public virtual Consumer{
public:
typedef boost::shared_ptr<TestConsumer> shared_ptr;
- QueuedMessage last;
+ intrusive_ptr<Message> last;
bool received;
- TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
+ TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
virtual bool deliver(QueuedMessage& msg){
- last = msg;
+ last = msg.payload;
received = true;
return true;
};
@@ -81,14 +78,13 @@ public:
Message& getMessage() { return *(msg.get()); }
};
-intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
+intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) {
intrusive_ptr<Message> msg(new Message());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
msg->getFrames().append(method);
msg->getFrames().append(header);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
- if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
return msg;
}
@@ -149,16 +145,16 @@ QPID_AUTO_TEST_CASE(testConsumers){
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
+ BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
queue->deliver(msg2);
BOOST_CHECK(queue->dispatch(c2));
- BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
+ BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
c1->received = false;
queue->deliver(msg3);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
+ BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
//Test cancellation:
queue->cancel(c1);
@@ -214,7 +210,7 @@ QPID_AUTO_TEST_CASE(testDequeue){
if (!consumer->received)
sleep(2);
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
received = queue->get().payload;
@@ -248,7 +244,7 @@ QPID_AUTO_TEST_CASE(testBound){
exchange2.reset();
//unbind the queue from all exchanges it knows it has been bound to:
- queue->unbind(exchanges);
+ queue->unbind(exchanges, queue);
//ensure the remaining exchanges don't still have the queue bound to them:
FailOnDeliver deliverable;
@@ -258,26 +254,26 @@ QPID_AUTO_TEST_CASE(testBound){
QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
client::QueueOptions args;
- args.setPersistLastNode();
+ args.setPersistLastNode();
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue", true));
queue->configure(args);
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
- //enqueue 2 messages
+ //enqueue 2 messages
queue->deliver(msg1);
queue->deliver(msg2);
- //change mode
- queue->setLastNodeFailure();
+ //change mode
+ queue->setLastNodeFailure();
- //enqueue 1 message
+ //enqueue 1 message
queue->deliver(msg3);
- //check all have persistent ids.
+ //check all have persistent ids.
BOOST_CHECK(msg1->isPersistent());
BOOST_CHECK(msg2->isPersistent());
BOOST_CHECK(msg3->isPersistent());
@@ -287,58 +283,54 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
QPID_AUTO_TEST_CASE(testSeek){
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue", true));
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
- //enqueue 2 messages
+ //enqueue 2 messages
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
- TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
+ TestConsumer::shared_ptr consumer(new TestConsumer(false));
SequenceNumber seq(2);
consumer->position = seq;
QueuedMessage qm;
queue->dispatch(consumer);
-
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
+
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
queue->dispatch(consumer);
queue->dispatch(consumer); // make sure over-run is safe
-
+
}
QPID_AUTO_TEST_CASE(testSearch){
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue", true));
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
- //enqueue 2 messages
+ //enqueue 2 messages
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
SequenceNumber seq(2);
- QueuedMessage qm;
- TestConsumer::shared_ptr c1(new TestConsumer());
-
- BOOST_CHECK(queue->find(seq, qm));
-
+ QueuedMessage qm = queue->find(seq);
+
BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-
- queue->acquire(qm, c1->getName());
+
+ queue->acquire(qm);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
SequenceNumber seq1(3);
- QueuedMessage qm1;
- BOOST_CHECK(queue->find(seq1, qm1));
+ QueuedMessage qm1 = queue->find(seq1);
BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-
+
}
const std::string nullxid = "";
@@ -424,10 +416,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
client::QueueOptions args;
// set queue mode
- args.setOrdering(client::LVQ);
+ args.setOrdering(client::LVQ);
- Queue::shared_ptr queue(new Queue("my-queue", true ));
- queue->configure(args);
+ Queue::shared_ptr queue(new Queue("my-queue", true ));
+ queue->configure(args);
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
@@ -438,16 +430,16 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
//set deliever match for LVQ a,b,c,a
string key;
- args.getLVQKey(key);
+ args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"b");
- msg3->insertCustomProperty(key,"c");
- msg4->insertCustomProperty(key,"a");
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+ msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- //enqueue 4 message
+ //enqueue 4 message
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
@@ -467,9 +459,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
intrusive_ptr<Message> msg5 = create_message("e", "A");
intrusive_ptr<Message> msg6 = create_message("e", "B");
intrusive_ptr<Message> msg7 = create_message("e", "C");
- msg5->insertCustomProperty(key,"a");
- msg6->insertCustomProperty(key,"b");
- msg7->insertCustomProperty(key,"c");
+ msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+ msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
queue->deliver(msg5);
queue->deliver(msg6);
queue->deliver(msg7);
@@ -504,7 +496,7 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->insertCustomProperty(key,"a");
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
queue->deliver(msg1);
queue->deliver(msg2);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -516,8 +508,6 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
client::QueueOptions args;
// set queue mode
args.setOrdering(client::LVQ);
- // disable flow control, as this test violates the enqueue/dequeue sequence.
- args.setInt(QueueFlowLimit::flowStopCountKey, 0);
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
@@ -536,12 +526,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"b");
- msg3->insertCustomProperty(key,"c");
- msg4->insertCustomProperty(key,"a");
- msg5->insertCustomProperty(key,"b");
- msg6->insertCustomProperty(key,"c");
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+ msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+ msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
//enqueue 4 message
queue->deliver(msg1);
@@ -556,13 +546,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
framing::SequenceNumber sequence1(10);
QueuedMessage qmsg3(queue.get(), 0, sequence1);
- TestConsumer::shared_ptr dummy(new TestConsumer());
- BOOST_CHECK(!queue->acquire(qmsg, dummy->getName()));
- BOOST_CHECK(queue->acquire(qmsg2, dummy->getName()));
+ BOOST_CHECK(!queue->acquire(qmsg));
+ BOOST_CHECK(queue->acquire(qmsg2));
// Acquire the massage again to test failure case.
- BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName()));
- BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName()));
+ BOOST_CHECK(!queue->acquire(qmsg2));
+ BOOST_CHECK(!queue->acquire(qmsg3));
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -572,7 +561,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
// set mode to no browse and check
args.setOrdering(client::LVQ_NO_BROWSE);
queue->configure(args);
- TestConsumer::shared_ptr c1(new TestConsumer("test", false));
+ TestConsumer::shared_ptr c1(new TestConsumer(false));
queue->dispatch(c1);
queue->dispatch(c1);
@@ -606,8 +595,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"a");
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
queue1->deliver(msg1);
queue2->deliver(msg1);
@@ -641,7 +630,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
intrusive_ptr<Message> received;
- queue1->create(args);
+ queue1->configure(args);
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "A");
@@ -650,9 +639,9 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"a");
- // 3
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ // 3
queue1->deliver(msg1);
// 4
queue1->setLastNodeFailure();
@@ -671,8 +660,13 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
- intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
- m->computeExpiration(new broker::ExpiryPolicy);
+ intrusive_ptr<Message> m = create_message("exchange", "key");
+ if (i % 2) {
+ if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
+ } else {
+ if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
+ }
+ m->setTimestamp(new broker::ExpiryPolicy);
queue.deliver(m);
}
}
@@ -682,7 +676,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
addMessagesToQueue(10, queue);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
::usleep(300*1000);
- queue.purgeExpired(0);
+ queue.purgeExpired();
BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
}
@@ -693,7 +687,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
- QueueCleaner cleaner(queues, &timer);
+ QueueCleaner cleaner(queues, timer);
cleaner.start(100 * qpid::sys::TIME_MSEC);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
@@ -701,280 +695,6 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
}
-
-namespace {
- // helper for group tests
- void verifyAcquire( Queue::shared_ptr queue,
- TestConsumer::shared_ptr c,
- std::deque<QueuedMessage>& results,
- const std::string& expectedGroup,
- const int expectedId )
- {
- queue->dispatch(c);
- results.push_back(c->last);
- std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
- int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
- BOOST_CHECK_EQUAL( group, expectedGroup );
- BOOST_CHECK_EQUAL( id, expectedId );
- }
-}
-
-QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
- //
- // Verify that consumers of grouped messages own the groups once a message is acquired,
- // and release the groups once all acquired messages have been dequeued or requeued
- //
- FieldTable args;
- Queue::shared_ptr queue(new Queue("my_queue", true));
- args.setString("qpid.group_header_key", "GROUP-ID");
- args.setInt("qpid.shared_msg_group", 1);
- queue->configure(args);
-
- std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
- std::string("b"), std::string("b"), std::string("b"),
- std::string("c"), std::string("c"), std::string("c") };
- for (int i = 0; i < 9; ++i) {
- intrusive_ptr<Message> msg = create_message("e", "A");
- msg->insertCustomProperty("GROUP-ID", groups[i]);
- msg->insertCustomProperty("MY-ID", i);
- queue->deliver(msg);
- }
-
- // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
-
- BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());
-
- TestConsumer::shared_ptr c1(new TestConsumer("C1"));
- TestConsumer::shared_ptr c2(new TestConsumer("C2"));
-
- queue->consume(c1);
- queue->consume(c2);
-
- std::deque<QueuedMessage> dequeMeC1;
- std::deque<QueuedMessage> dequeMeC2;
-
-
- verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0)
- verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3)
-
- // now let c1 complete the 'a-0' message - this should free the 'a' group
- queue->dequeue( 0, dequeMeC1.front() );
- dequeMeC1.pop_front();
-
- // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---
-
- // now c2 should pick up the next 'a-1', since it is oldest free
- verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"
-
- // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---
-
- // c1 should only be able to snarf up the first "c" message now...
- verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c"
-
- // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1
-
- // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired)
- queue->dequeue( 0, dequeMeC2.front() );
- dequeMeC2.pop_front();
-
- // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
- // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1
-
- // b group is free, c is owned by c1 - c1's next get should grab 'b-4'
- verifyAcquire(queue, c1, dequeMeC1, "b", 4 );
-
- // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
- // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1
-
- // c2 can now only grab a-2, and that's all
- verifyAcquire(queue, c2, dequeMeC2, "a", 2 );
-
- // now C2 can't get any more, since C1 owns "b" and "c" group...
- bool gotOne = queue->dispatch(c2);
- BOOST_CHECK( !gotOne );
-
- // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4)
- queue->dequeue( 0, dequeMeC1.front() );
- dequeMeC1.pop_front();
-
- // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
- // Owners= ^C2, ^C2, ^C1, ^C1, ---, ---
-
- // c2 can now grab c-7
- verifyAcquire(queue, c2, dequeMeC2, "c", 7 );
-
- // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
- // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
-
- // what happens if C-2 "requeues" a-1 and a-2?
- queue->requeue( dequeMeC2.front() );
- dequeMeC2.pop_front();
- queue->requeue( dequeMeC2.front() );
- dequeMeC2.pop_front(); // now just has c-7 acquired
-
- // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
- // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2
-
- // now c1 will grab a-1 and a-2...
- verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
- verifyAcquire(queue, c1, dequeMeC1, "a", 2 );
-
- // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
- // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2
-
- // c2 can now acquire c-8 only
- verifyAcquire(queue, c2, dequeMeC2, "c", 8 );
-
- // and c1 can get b-5
- verifyAcquire(queue, c1, dequeMeC1, "b", 5 );
-
- // should be no more acquire-able for anyone now:
- gotOne = queue->dispatch(c1);
- BOOST_CHECK( !gotOne );
- gotOne = queue->dispatch(c2);
- BOOST_CHECK( !gotOne );
-
- // requeue all of C1's acquired messages, then cancel C1
- while (!dequeMeC1.empty()) {
- queue->requeue(dequeMeC1.front());
- dequeMeC1.pop_front();
- }
- queue->cancel(c1);
-
- // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
- // Owners= ---, ---, ---, ---, ^C2, ^C2
-
- // b-4, a-1, a-2, b-5 all should be available, right?
- verifyAcquire(queue, c2, dequeMeC2, "a", 1 );
-
- while (!dequeMeC2.empty()) {
- queue->dequeue(0, dequeMeC2.front());
- dequeMeC2.pop_front();
- }
-
- // Queue = a-2, b-4, b-5
- // Owners= ---, ---, ---
-
- TestConsumer::shared_ptr c3(new TestConsumer("C3"));
- std::deque<QueuedMessage> dequeMeC3;
-
- verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
- verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
-
- // Queue = a-2, b-4, b-5
- // Owners= ^C3, ^C2, ^C2
-
- gotOne = queue->dispatch(c3);
- BOOST_CHECK( !gotOne );
-
- verifyAcquire(queue, c2, dequeMeC2, "b", 5 );
-
- while (!dequeMeC2.empty()) {
- queue->dequeue(0, dequeMeC2.front());
- dequeMeC2.pop_front();
- }
-
- // Queue = a-2,
- // Owners= ^C3,
-
- intrusive_ptr<Message> msg = create_message("e", "A");
- msg->insertCustomProperty("GROUP-ID", "a");
- msg->insertCustomProperty("MY-ID", 9);
- queue->deliver(msg);
-
- // Queue = a-2, a-9
- // Owners= ^C3, ^C3
-
- gotOne = queue->dispatch(c2);
- BOOST_CHECK( !gotOne );
-
- msg = create_message("e", "A");
- msg->insertCustomProperty("GROUP-ID", "b");
- msg->insertCustomProperty("MY-ID", 10);
- queue->deliver(msg);
-
- // Queue = a-2, a-9, b-10
- // Owners= ^C3, ^C3, ----
-
- verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
- verifyAcquire(queue, c3, dequeMeC3, "a", 9 );
-
- gotOne = queue->dispatch(c3);
- BOOST_CHECK( !gotOne );
-
- queue->cancel(c2);
- queue->cancel(c3);
-}
-
-
-QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
- //
- // Verify that the same default group name is automatically applied to messages that
- // do not specify a group name.
- //
- FieldTable args;
- Queue::shared_ptr queue(new Queue("my_queue", true));
- args.setString("qpid.group_header_key", "GROUP-ID");
- args.setInt("qpid.shared_msg_group", 1);
- queue->configure(args);
-
- for (int i = 0; i < 3; ++i) {
- intrusive_ptr<Message> msg = create_message("e", "A");
- // no "GROUP-ID" header
- msg->insertCustomProperty("MY-ID", i);
- queue->deliver(msg);
- }
-
- // Queue = 0, 1, 2
-
- BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
-
- TestConsumer::shared_ptr c1(new TestConsumer("C1"));
- TestConsumer::shared_ptr c2(new TestConsumer("C2"));
-
- queue->consume(c1);
- queue->consume(c2);
-
- std::deque<QueuedMessage> dequeMeC1;
- std::deque<QueuedMessage> dequeMeC2;
-
- queue->dispatch(c1); // c1 now owns default group (acquired 0)
- dequeMeC1.push_back(c1->last);
- int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
- BOOST_CHECK_EQUAL( id, 0 );
-
- bool gotOne = queue->dispatch(c2); // c2 should get nothing
- BOOST_CHECK( !gotOne );
-
- queue->dispatch(c1); // c1 now acquires 1
- dequeMeC1.push_back(c1->last);
- id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
- BOOST_CHECK_EQUAL( id, 1 );
-
- gotOne = queue->dispatch(c2); // c2 should still get nothing
- BOOST_CHECK( !gotOne );
-
- while (!dequeMeC1.empty()) {
- queue->dequeue(0, dequeMeC1.front());
- dequeMeC1.pop_front();
- }
-
- // now default group should be available...
- queue->dispatch(c2); // c2 now owns default group (acquired 2)
- id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
- BOOST_CHECK_EQUAL( id, 2 );
-
- gotOne = queue->dispatch(c1); // c1 should get nothing
- BOOST_CHECK( !gotOne );
-
- queue->cancel(c1);
- queue->cancel(c2);
-}
-
QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
TestMessageStoreOC testStore;
@@ -982,9 +702,9 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
args.setPersistLastNode();
Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
- queue1->create(args);
+ queue1->configure(args);
Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
- queue2->create(args);
+ queue2->configure(args);
intrusive_ptr<Message> msg1 = create_message("e", "A");
@@ -1070,7 +790,7 @@ not requeued to the store.
Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
intrusive_ptr<Message> received;
- queue1->create(args);
+ queue1->configure(args);
// check requeue 1
intrusive_ptr<Message> msg1 = create_message("e", "C");
@@ -1150,40 +870,28 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg02(msg02);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
- }
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
msg02->tryReleaseContent();
BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg03(msg03);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
- }
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
msg03->tryReleaseContent();
BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg04(msg04);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
- }
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
msg04->tryReleaseContent();
BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg05(msg05);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
- }
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
msg05->tryReleaseContent();
BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());