diff options
author | Alan Conway <aconway@apache.org> | 2012-05-28 18:24:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-05-28 18:24:06 +0000 |
commit | a78cef0941374e4aa27c9025fbb3b5a43686b8fd (patch) | |
tree | f82688402704057000bfa13ce775fb8ad2f036ff /cpp/src | |
parent | 2fcdb5bc17a6ae502a3af7df4ba66dd7adb79dfa (diff) | |
download | qpid-python-a78cef0941374e4aa27c9025fbb3b5a43686b8fd.tar.gz |
QPID-3603: Allow Queue::setPosition() to truncate the queue.
In the new HA code a backup may sometimes be ahead of the new primary after a
fail-over. In that case the backup truncates it's queues to the same position
as the primary so it can continue replicating.
(Note the assertions added to verify setPosition showed up a minor bug in the
old cluster code, which was leaving messages on the cluster update queue after
an update. This patch fixes the issue.)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1343347 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/MessageDeque.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageDeque.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageMap.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageMap.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Messages.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PriorityQueue.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PriorityQueue.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 85 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 246 |
12 files changed, 310 insertions, 92 deletions
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp index f70c996975..474e4139bd 100644 --- a/cpp/src/qpid/broker/MessageDeque.cpp +++ b/cpp/src/qpid/broker/MessageDeque.cpp @@ -173,6 +173,26 @@ void MessageDeque::updateAcquired(const QueuedMessage& acquired) } } +namespace { +bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; } +} // namespace + +void MessageDeque::setPosition(const framing::SequenceNumber& n) { + size_t i = index(n+1); + if (i >= messages.size()) return; // Nothing to do. + + // Assertion to verify the precondition: no messaages after n. + assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) == + messages.end()); + messages.erase(messages.begin()+i, messages.end()); + if (head >= messages.size()) head = messages.size() - 1; + // Re-count the available messages + available = 0; + for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->status == QueuedMessage::AVAILABLE) ++available; + } +} + void MessageDeque::clean() { while (messages.size() && messages.front().status == QueuedMessage::DELETED) { diff --git a/cpp/src/qpid/broker/MessageDeque.h b/cpp/src/qpid/broker/MessageDeque.h index 9b53716d4e..c5670b2a72 100644 --- a/cpp/src/qpid/broker/MessageDeque.h +++ b/cpp/src/qpid/broker/MessageDeque.h @@ -44,7 +44,7 @@ class MessageDeque : public Messages bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void updateAcquired(const QueuedMessage& acquired); - + void setPosition(const framing::SequenceNumber&); void foreach(Functor); void removeIf(Predicate); diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp index 9b164d4e5c..d6702a9336 100644 --- a/cpp/src/qpid/broker/MessageMap.cpp +++ b/cpp/src/qpid/broker/MessageMap.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/MessageMap.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/log/Statement.h" +#include <algorithm> namespace qpid { namespace broker { @@ -130,18 +131,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) QueuedMessage& a = messages[added.position]; a = added; a.status = QueuedMessage::AVAILABLE; - QPID_LOG(debug, "Added message at " << a.position); + QPID_LOG(debug, "Added message " << a); return false; } else { //there is already a message with that key which needs to be replaced removed = result.first->second; result.first->second = replace(result.first->second, added); result.first->second.status = QueuedMessage::AVAILABLE; - QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first); + QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first); return true; } } +void MessageMap::setPosition(const framing::SequenceNumber& seq) { + // Nothing to do, just assert that the precondition is respected and there + // are no undeleted messages after seq. + assert(messages.empty() || (--messages.end())->first <= seq); +} + void MessageMap::foreach(Functor f) { for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { diff --git a/cpp/src/qpid/broker/MessageMap.h b/cpp/src/qpid/broker/MessageMap.h index a668450250..1f0481cb6b 100644 --- a/cpp/src/qpid/broker/MessageMap.h +++ b/cpp/src/qpid/broker/MessageMap.h @@ -6,7 +6,7 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file +o * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at @@ -50,6 +50,7 @@ class MessageMap : public Messages virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); bool consume(QueuedMessage&); virtual bool push(const QueuedMessage& added, QueuedMessage& removed); + void setPosition(const framing::SequenceNumber&); void foreach(Functor); virtual void removeIf(Predicate); diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h index 61e9fa110a..45f5e6cd81 100644 --- a/cpp/src/qpid/broker/Messages.h +++ b/cpp/src/qpid/broker/Messages.h @@ -21,6 +21,7 @@ * under the License. * */ +#include "qpid/framing/SequenceNumber.h" #include <boost/function.hpp> namespace qpid { @@ -101,14 +102,22 @@ class Messages virtual void updateAcquired(const QueuedMessage&) { } /** + * Set the position of the back of the queue. Next message enqueued will be n+1. + *@pre Any messages with seq > n must already be dequeued. + */ + virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0; + + /** * Apply, the functor to each message held */ + virtual void foreach(Functor) = 0; /** * Remove every message held that for which the specified * predicate returns true */ virtual void removeIf(Predicate) = 0; + private: }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp index ab5ec7235a..9a0fead744 100644 --- a/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/cpp/src/qpid/broker/PriorityQueue.cpp @@ -121,6 +121,10 @@ void PriorityQueue::updateAcquired(const QueuedMessage& acquired) { fifo.updateAcquired(acquired); } +void PriorityQueue::setPosition(const framing::SequenceNumber& n) { + fifo.setPosition(n); +} + void PriorityQueue::foreach(Functor f) { fifo.foreach(f); diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h index 8628745db1..301367358b 100644 --- a/cpp/src/qpid/broker/PriorityQueue.h +++ b/cpp/src/qpid/broker/PriorityQueue.h @@ -52,6 +52,7 @@ class PriorityQueue : public Messages bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void updateAcquired(const QueuedMessage& acquired); + void setPosition(const framing::SequenceNumber&); void foreach(Functor); void removeIf(Predicate); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 3d90490186..9df0ebf313 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -588,21 +588,51 @@ QueuedMessage Queue::get(){ return msg; } -bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message) +namespace { +bool collectIf(QueuedMessage& qm, Messages::Predicate predicate, + std::deque<QueuedMessage>& collection) { - if (message.payload->hasExpired()) { - expired.push_back(message); + if (predicate(qm)) { + collection.push_back(qm); return true; } else { return false; } } +bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); } +} // namespace + +void Queue::dequeueIf(Messages::Predicate predicate, + std::deque<QueuedMessage>& dequeued) +{ + { + Mutex::ScopedLock locker(messageLock); + messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued))); + } + if (!dequeued.empty()) { + if (mgmtObject) { + mgmtObject->inc_acquires(dequeued.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(dequeued.size()); + } + for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin(); + i != dequeued.end(); ++i) { + { + // KAG: should be safe to retake lock after the removeIf, since + // no other thread can touch these messages after the removeIf() call + Mutex::ScopedLock locker(messageLock); + observeAcquire(*i, locker); + } + dequeue( 0, *i ); + } + } +} + /** *@param lapse: time since the last purgeExpired */ -void Queue::purgeExpired(qpid::sys::Duration lapse) -{ +void Queue::purgeExpired(sys::Duration lapse) { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last //attempt is less than one per second. @@ -610,37 +640,18 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) dequeueSincePurge -= count; int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { - std::deque<QueuedMessage> expired; - { - Mutex::ScopedLock locker(messageLock); - messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); - } - - if (!expired.empty()) { + std::deque<QueuedMessage> dequeued; + dequeueIf(boost::bind(&isExpired, _1), dequeued); + if (dequeued.size()) { if (mgmtObject) { - mgmtObject->inc_acquires(expired.size()); - mgmtObject->inc_discardsTtl(expired.size()); - if (brokerMgmtObject) { - brokerMgmtObject->inc_acquires(expired.size()); - brokerMgmtObject->inc_discardsTtl(expired.size()); - } - } - - for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); - i != expired.end(); ++i) { - { - // KAG: should be safe to retake lock after the removeIf, since - // no other thread can touch these messages after the removeIf() call - Mutex::ScopedLock locker(messageLock); - observeAcquire(*i, locker); - } - dequeue( 0, *i ); + mgmtObject->inc_discardsTtl(dequeued.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(dequeued.size()); } } } } - namespace { // for use with purge/move below - collect messages that match a given filter // @@ -1661,8 +1672,22 @@ void Queue::query(qpid::types::Variant::Map& results) const if (allocator) allocator->query(results); } +namespace { +struct After { + framing::SequenceNumber seq; + After(framing::SequenceNumber s) : seq(s) {} + bool operator()(const QueuedMessage& qm) { return qm.position > seq; } +}; +} // namespace + + void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); + if (n < sequence) { + std::deque<QueuedMessage> dequeued; + dequeueIf(After(n), dequeued); + messages->setPosition(n); + } sequence = n; QPID_LOG(trace, "Set position to " << sequence << " on " << getName()); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 9869a698c1..ed1f63504b 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -175,6 +175,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, void configureImpl(const qpid::framing::FieldTable& settings); void checkNotDeleted(const Consumer::shared_ptr& c); void notifyDeleted(); + void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued); public: @@ -375,12 +376,21 @@ class Queue : public boost::enable_shared_from_this<Queue>, std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f); } - /** Set the position sequence number for the next message on the queue. - * Must be >= the current sequence number. - * Used by cluster to replicate queues. + /** + * Set the sequence number for the back of the queue, the + * next message enqueued will be pos+1. + * If pos > getPosition() this creates a gap in the sequence numbers. + * if pos < getPosition() the back of the queue is reset to pos, + * + * The _caller_ must ensure that any messages after pos have been dequeued. + * + * Used by HA/cluster code for queue replication. */ QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos); - /** return current position sequence number for the next message on the queue. + + /** + *@return sequence number for the back of the queue. The next message pushed + * will be at getPosition+1 */ QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 3754e791f9..081d54ab49 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -524,6 +524,7 @@ broker::QueuedMessage Connection::getUpdateMessage() { boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); assert(!updateq->isDurable()); broker::QueuedMessage m = updateq->get(); + updateq->dequeue(0, m); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 3e705b074e..cdc7429f3b 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -67,7 +67,7 @@ tmodule_LTLIBRARIES= TESTS+=unit_test check_PROGRAMS+=unit_test -unit_test_LDADD=-lboost_unit_test_framework \ +unit_test_LDADD=-lboost_unit_test_framework -lpthread \ $(lib_messaging) $(lib_broker) $(lib_console) $(lib_qmf2) unit_test_SOURCES= unit_test.cpp unit_test.h \ diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index db84614a9b..a481f703ce 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -31,6 +31,8 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" #include "qpid/framing/AMQFrame.h" @@ -40,10 +42,11 @@ #include "qpid/broker/QueueFlowLimit.h" #include <iostream> -#include "boost/format.hpp" - -using std::string; +#include <vector> +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> +using namespace std; using boost::intrusive_ptr; using namespace qpid; using namespace qpid::broker; @@ -85,7 +88,7 @@ public: Message& getMessage() { return *(msg.get()); } }; -intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) { +intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) { intrusive_ptr<Message> msg(new Message()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); @@ -96,6 +99,16 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK return msg; } +intrusive_ptr<Message> contentMessage(string content) { + intrusive_ptr<Message> m(MessageUtils::createMessage()); + MessageUtils::addContent(m, content); + return m; +} + +string getContent(intrusive_ptr<Message> m) { + return m->getFrames().getContent(); +} + QPID_AUTO_TEST_SUITE(QueueTestSuite) QPID_AUTO_TEST_CASE(testAsyncMessage) { @@ -107,7 +120,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { //Test basic delivery: - intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); @@ -122,7 +135,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { QPID_AUTO_TEST_CASE(testAsyncMessageCount){ Queue::shared_ptr queue(new Queue("my_test_queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process queue->process(msg1); @@ -147,9 +160,9 @@ QPID_AUTO_TEST_CASE(testConsumers){ BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount()); //Test basic delivery: - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); @@ -193,9 +206,9 @@ QPID_AUTO_TEST_CASE(testRegistry){ QPID_AUTO_TEST_CASE(testDequeue){ Queue::shared_ptr queue(new Queue("my_queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); intrusive_ptr<Message> received; queue->deliver(msg1); @@ -267,9 +280,9 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ Queue::shared_ptr queue(new Queue("my-queue", true)); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); //enqueue 2 messages queue->deliver(msg1); @@ -293,9 +306,9 @@ QPID_AUTO_TEST_CASE(testSeek){ Queue::shared_ptr queue(new Queue("my-queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); //enqueue 2 messages queue->deliver(msg1); @@ -319,9 +332,9 @@ QPID_AUTO_TEST_CASE(testSearch){ Queue::shared_ptr queue(new Queue("my-queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); //enqueue 2 messages queue->deliver(msg1); @@ -433,10 +446,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); - intrusive_ptr<Message> msg4 = create_message("e", "D"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); + intrusive_ptr<Message> msg4 = createMessage("e", "D"); intrusive_ptr<Message> received; //set deliever match for LVQ a,b,c,a @@ -468,9 +481,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ received = queue->get().payload; BOOST_CHECK_EQUAL(msg3.get(), received.get()); - intrusive_ptr<Message> msg5 = create_message("e", "A"); - intrusive_ptr<Message> msg6 = create_message("e", "B"); - intrusive_ptr<Message> msg7 = create_message("e", "C"); + intrusive_ptr<Message> msg5 = createMessage("e", "A"); + intrusive_ptr<Message> msg6 = createMessage("e", "B"); + intrusive_ptr<Message> msg7 = createMessage("e", "C"); msg5->insertCustomProperty(key,"a"); msg6->insertCustomProperty(key,"b"); msg7->insertCustomProperty(key,"c"); @@ -500,8 +513,8 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); string key; args.getLVQKey(key); @@ -526,12 +539,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); - intrusive_ptr<Message> msg4 = create_message("e", "D"); - intrusive_ptr<Message> msg5 = create_message("e", "F"); - intrusive_ptr<Message> msg6 = create_message("e", "G"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); + intrusive_ptr<Message> msg4 = createMessage("e", "D"); + intrusive_ptr<Message> msg5 = createMessage("e", "F"); + intrusive_ptr<Message> msg6 = createMessage("e", "G"); //set deliever match for LVQ a,b,c,a @@ -603,8 +616,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){ queue1->configure(args); queue2->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "A"); string key; args.getLVQKey(key); @@ -647,8 +660,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ intrusive_ptr<Message> received; queue1->create(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "A"); // 2 string key; args.getLVQKey(key); @@ -675,7 +688,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { - intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl); + intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl); m->computeExpiration(new broker::ExpiryPolicy); queue.deliver(m); } @@ -738,7 +751,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { std::string("b"), std::string("b"), std::string("b"), std::string("c"), std::string("c"), std::string("c") }; for (int i = 0; i < 9; ++i) { - intrusive_ptr<Message> msg = create_message("e", "A"); + intrusive_ptr<Message> msg = createMessage("e", "A"); msg->insertCustomProperty("GROUP-ID", groups[i]); msg->insertCustomProperty("MY-ID", i); queue->deliver(msg); @@ -885,7 +898,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // Queue = a-2, // Owners= ^C3, - intrusive_ptr<Message> msg = create_message("e", "A"); + intrusive_ptr<Message> msg = createMessage("e", "A"); msg->insertCustomProperty("GROUP-ID", "a"); msg->insertCustomProperty("MY-ID", 9); queue->deliver(msg); @@ -896,7 +909,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { gotOne = queue->dispatch(c2); BOOST_CHECK( !gotOne ); - msg = create_message("e", "A"); + msg = createMessage("e", "A"); msg->insertCustomProperty("GROUP-ID", "b"); msg->insertCustomProperty("MY-ID", 10); queue->deliver(msg); @@ -927,7 +940,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { queue->configure(args); for (int i = 0; i < 3; ++i) { - intrusive_ptr<Message> msg = create_message("e", "A"); + intrusive_ptr<Message> msg = createMessage("e", "A"); // no "GROUP-ID" header msg->insertCustomProperty("MY-ID", i); queue->deliver(msg); @@ -990,7 +1003,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ Queue::shared_ptr queue2(new Queue("queue2", true, &testStore )); queue2->create(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); queue1->deliver(msg1); queue2->deliver(msg1); @@ -1006,7 +1019,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); - intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); queue1->deliver(msg2); queue2->deliver(msg2); @@ -1021,7 +1034,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue1->clearLastNodeFailure(); queue2->clearLastNodeFailure(); - intrusive_ptr<Message> msg3 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "B"); queue1->deliver(msg3); queue2->deliver(msg3); BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); @@ -1035,8 +1048,8 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ * internal details not part of the queue abstraction. // check requeue 1 - intrusive_ptr<Message> msg4 = create_message("e", "C"); - intrusive_ptr<Message> msg5 = create_message("e", "D"); + intrusive_ptr<Message> msg4 = createMessage("e", "C"); + intrusive_ptr<Message> msg5 = createMessage("e", "D"); framing::SequenceNumber sequence(1); QueuedMessage qmsg1(queue1.get(), msg4, sequence); @@ -1083,8 +1096,8 @@ not requeued to the store. queue1->create(args); // check requeue 1 - intrusive_ptr<Message> msg1 = create_message("e", "C"); - intrusive_ptr<Message> msg2 = create_message("e", "D"); + intrusive_ptr<Message> msg1 = createMessage("e", "C"); + intrusive_ptr<Message> msg2 = createMessage("e", "D"); queue1->recover(msg1); @@ -1116,7 +1129,7 @@ simulate store exception going into last node standing queue1->configure(args); // check requeue 1 - intrusive_ptr<Message> msg1 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "C"); queue1->deliver(msg1); testStore.createError(); @@ -1403,6 +1416,133 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ BOOST_CHECK_EQUAL(5u, tq9->getMessageCount()); } +QPID_AUTO_TEST_CASE(testSetPositionFifo) { + Queue::shared_ptr q(new Queue("my-queue", true)); + BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0)); + for (int i = 0; i < 10; ++i) + q->deliver(contentMessage(boost::lexical_cast<string>(i+1))); + + // Verify the front of the queue + TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(1, c->last.position); // Numbered from 1 + BOOST_CHECK_EQUAL("1", getContent(c->last.payload)); + // Verify the back of the queue + QueuedMessage qm; + BOOST_CHECK_EQUAL(10, q->getPosition()); + BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue + BOOST_CHECK_EQUAL("10", getContent(qm.payload)); + BOOST_CHECK_EQUAL(10, q->getMessageCount()); + + // Using setPosition to introduce a gap in sequence numbers. + q->setPosition(15); + BOOST_CHECK_EQUAL(10, q->getMessageCount()); + BOOST_CHECK_EQUAL(15, q->getPosition()); + BOOST_CHECK(q->find(10, qm)); // Back of the queue + BOOST_CHECK_EQUAL("10", getContent(qm.payload)); + q->deliver(contentMessage("16")); + c->setPosition(9); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(10, c->last.position); + BOOST_CHECK_EQUAL("10", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(16, c->last.position); + BOOST_CHECK_EQUAL("16", getContent(c->last.payload)); + + // Using setPosition to trunkcate the queue + q->setPosition(5); + BOOST_CHECK_EQUAL(5, q->getMessageCount()); + q->deliver(contentMessage("6a")); + c->setPosition(4); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(5, c->last.position); + BOOST_CHECK_EQUAL("5", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(6, c->last.position); + BOOST_CHECK_EQUAL("6a", getContent(c->last.payload)); + BOOST_CHECK(!q->dispatch(c)); // No more messages. +} + +QPID_AUTO_TEST_CASE(testSetPositionLvq) { + Queue::shared_ptr q(new Queue("my-queue", true)); + string key="key"; + framing::FieldTable args; + args.setString("qpid.last_value_queue_key", "key"); + q->configure(args); + + const char* values[] = { "a", "b", "c", "a", "b", "c" }; + for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) { + intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1)); + m->insertCustomProperty(key, values[i]); + q->deliver(m); + } + BOOST_CHECK_EQUAL(3, q->getMessageCount()); + // Verify the front of the queue + TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(4, c->last.position); // Numbered from 1 + BOOST_CHECK_EQUAL("4", getContent(c->last.payload)); + // Verify the back of the queue + QueuedMessage qm; + BOOST_CHECK_EQUAL(6, q->getPosition()); + BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue + BOOST_CHECK_EQUAL("6", getContent(qm.payload)); + + q->setPosition(5); + c->setPosition(4); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(5, c->last.position); // Numbered from 1 + BOOST_CHECK(!q->dispatch(c)); +} + +QPID_AUTO_TEST_CASE(testSetPositionPriority) { + Queue::shared_ptr q(new Queue("my-queue", true)); + framing::FieldTable args; + args.setInt("qpid.priorities", 10); + q->configure(args); + + const int priorities[] = { 1, 2, 3, 2, 1, 3 }; + for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) { + intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1)); + m->getFrames().getHeaders()->get<DeliveryProperties>(true) + ->setPriority(priorities[i]); + q->deliver(m); + } + + // Truncation removes messages in fifo order, not priority order. + q->setPosition(3); + TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(1, c->last.position); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(2, c->last.position); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(3, c->last.position); + BOOST_CHECK(!q->dispatch(c)); + + intrusive_ptr<Message> m = contentMessage("4a"); + m->getFrames().getHeaders()->get<DeliveryProperties>(true) + ->setPriority(4); + q->deliver(m); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(4, c->last.position); + BOOST_CHECK_EQUAL("4a", getContent(c->last.payload)); + + // But consumers see priority order + c.reset(new TestConsumer("test", true)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(4, c->last.position); + BOOST_CHECK_EQUAL("4a", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(3, c->last.position); + BOOST_CHECK_EQUAL("3", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(2, c->last.position); + BOOST_CHECK_EQUAL("2", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(1, c->last.position); + BOOST_CHECK_EQUAL("1", getContent(c->last.payload)); +} QPID_AUTO_TEST_SUITE_END() |