From e54ef8dc737196343ad974c91a86681efca5fb14 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 30 Mar 2012 19:36:48 +0000 Subject: QPID-3603: Keep acquired messages on queues for all queue types. Updated priority and lvq queues to keep acquired messages, and supply them to browsers if requested. This is necessary so replicating subscriptions can back-up these queue types without message loss. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1307582 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Fairshare.cpp | 87 +++++++-------- qpid/cpp/src/qpid/broker/Fairshare.h | 19 ++-- qpid/cpp/src/qpid/broker/LegacyLVQ.cpp | 29 +++-- qpid/cpp/src/qpid/broker/LegacyLVQ.h | 1 + qpid/cpp/src/qpid/broker/MessageMap.cpp | 78 ++++++++----- qpid/cpp/src/qpid/broker/MessageMap.h | 2 +- qpid/cpp/src/qpid/broker/PriorityQueue.cpp | 169 +++++++++++++---------------- qpid/cpp/src/qpid/broker/PriorityQueue.h | 26 ++--- qpid/cpp/src/qpid/cluster/Connection.cpp | 4 +- qpid/cpp/src/qpid/cluster/Connection.h | 2 +- qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 6 +- qpid/cpp/src/tests/brokertest.py | 11 +- qpid/cpp/src/tests/ha_tests.py | 52 ++++++++- qpid/cpp/xml/cluster.xml | 3 +- 14 files changed, 277 insertions(+), 212 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Fairshare.cpp b/qpid/cpp/src/qpid/broker/Fairshare.cpp index 7cdad1a44f..c30b64c7ae 100644 --- a/qpid/cpp/src/qpid/broker/Fairshare.cpp +++ b/qpid/cpp/src/qpid/broker/Fairshare.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" +#include #include #include #include @@ -32,7 +33,7 @@ namespace broker { Fairshare::Fairshare(size_t levels, uint limit) : PriorityQueue(levels), - limits(levels, limit), priority(levels-1), count(0) {} + limits(levels, limit), counts(levels, 0) {} void Fairshare::setLimit(size_t level, uint limit) @@ -40,70 +41,63 @@ void Fairshare::setLimit(size_t level, uint limit) limits[level] = limit; } -bool Fairshare::limitReached() -{ - uint l = limits[priority]; - return l && ++count > l; -} - -uint Fairshare::currentLevel() -{ - if (limitReached()) { - return nextLevel(); - } else { - return priority; - } -} - -uint Fairshare::nextLevel() -{ - count = 1; - if (priority) --priority; - else priority = levels-1; - return priority; -} - bool Fairshare::isNull() { for (int i = 0; i < levels; i++) if (limits[i]) return false; return true; } -bool Fairshare::getState(uint& p, uint& c) const +bool Fairshare::getState(qpid::framing::FieldTable& state) const { - p = priority; - c = count; + for (int i = 0; i < levels; i++) { + if (counts[i]) { + std::string key = (boost::format("fairshare-count-%1%") % i).str(); + state.setInt(key, counts[i]); + } + } return true; } -bool Fairshare::setState(uint p, uint c) +bool Fairshare::checkLevel(uint level) { - priority = p; - count = c; - return true; + if (!limits[level] || counts[level] < limits[level]) { + counts[level]++; + return true; + } else { + return false; + } } -bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages) +bool Fairshare::consume(QueuedMessage& message) { - const uint start = p = currentLevel(); - do { - if (!messages[p].empty()) return true; - } while ((p = nextLevel()) != start); - return false; + for (Available::iterator i = available.begin(); i != available.end(); ++i) { + QueuedMessage* next = *i; + if (checkLevel(getPriorityLevel(*next))) { + messages[next->position].status = QueuedMessage::ACQUIRED; + message = *next; + available.erase(i); + return true; + } + } + if (!available.empty()) { + std::fill(counts.begin(), counts.end(), 0);//reset counts + return consume(message); + } else { + return false; + } } - -bool Fairshare::getState(const Messages& m, uint& priority, uint& count) +bool Fairshare::getState(const Messages& m, qpid::framing::FieldTable& counts) { const Fairshare* fairshare = dynamic_cast(&m); - return fairshare && fairshare->getState(priority, count); + return fairshare && fairshare->getState(counts); } -bool Fairshare::setState(Messages& m, uint priority, uint count) +bool Fairshare::setState(Messages& m, const qpid::framing::FieldTable& counts) { Fairshare* fairshare = dynamic_cast(&m); - return fairshare && fairshare->setState(priority, count); + return fairshare && fairshare->setState(counts); } int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector& keys) @@ -136,7 +130,14 @@ int getIntegerSettingForKey(const qpid::framing::FieldTable& settings, const std { return getIntegerSetting(settings, boost::assign::list_of(key)); } - +bool Fairshare::setState(const qpid::framing::FieldTable& state) +{ + for (int i = 0; i < levels; i++) { + std::string key = (boost::format("fairshare-count-%1%") % i).str(); + counts[i] = state.isSet(key) ? getIntegerSettingForKey(state, key) : 0; + } + return true; +} int getSetting(const qpid::framing::FieldTable& settings, const std::vector& keys, int minvalue, int maxvalue) { return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue)); diff --git a/qpid/cpp/src/qpid/broker/Fairshare.h b/qpid/cpp/src/qpid/broker/Fairshare.h index 1b25721e0c..dfcbdf280e 100644 --- a/qpid/cpp/src/qpid/broker/Fairshare.h +++ b/qpid/cpp/src/qpid/broker/Fairshare.h @@ -22,6 +22,7 @@ * */ #include "qpid/broker/PriorityQueue.h" +#include namespace qpid { namespace framing { @@ -38,23 +39,19 @@ class Fairshare : public PriorityQueue { public: Fairshare(size_t levels, uint limit); - bool getState(uint& priority, uint& count) const; - bool setState(uint priority, uint count); + bool getState(qpid::framing::FieldTable& counts) const; + bool setState(const qpid::framing::FieldTable& counts); void setLimit(size_t level, uint limit); bool isNull(); + bool consume(QueuedMessage&); static std::auto_ptr create(const qpid::framing::FieldTable& settings); - static bool getState(const Messages&, uint& priority, uint& count); - static bool setState(Messages&, uint priority, uint count); + static bool getState(const Messages&, qpid::framing::FieldTable& counts); + static bool setState(Messages&, const qpid::framing::FieldTable& counts); private: std::vector limits; + std::vector counts; - uint priority; - uint count; - - uint currentLevel(); - uint nextLevel(); - bool limitReached(); - bool findFrontLevel(uint& p, PriorityLevels&); + bool checkLevel(uint level); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp index 49c0a32c19..f1deddf4c8 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -28,16 +28,26 @@ namespace broker { LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {} void LegacyLVQ::setNoBrowse(bool b) -{ +{ noBrowse = b; } +bool LegacyLVQ::deleted(const QueuedMessage& message) +{ + Ordering::iterator i = messages.find(message.position); + if (i != messages.end() && i->second.payload == message.payload) { + erase(i); + return true; + } else { + return false; + } +} bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end() && i->second.payload == message.payload) { + if (i != messages.end() && i->second.payload == message.payload && i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; message = i->second; - erase(i); return true; } else { return false; @@ -66,12 +76,17 @@ bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed) } const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update) -{ +{ //add the new message into the original position of the replaced message Ordering::iterator i = messages.find(original.position); - i->second = update; - i->second.position = original.position; - return i->second; + if (i != messages.end()) { + i->second = update; + i->second.position = original.position; + return i->second; + } else { + QPID_LOG(error, "Failed to replace message at " << original.position); + return update; + } } void LegacyLVQ::removeIf(Predicate p) diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h index 695e51131d..9355069f37 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h @@ -40,6 +40,7 @@ class LegacyLVQ : public MessageMap { public: LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0); + bool deleted(const QueuedMessage&); bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); bool push(const QueuedMessage& added, QueuedMessage& removed); diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp index 048df45434..9b164d4e5c 100644 --- a/qpid/cpp/src/qpid/broker/MessageMap.cpp +++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/MessageMap.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { @@ -27,7 +28,16 @@ namespace { const std::string EMPTY; } -bool MessageMap::deleted(const QueuedMessage&) { return true; } +bool MessageMap::deleted(const QueuedMessage& message) +{ + Ordering::iterator i = messages.find(message.position); + if (i != messages.end()) { + erase(i); + return true; + } else { + return false; + } +} std::string MessageMap::getKey(const QueuedMessage& message) { @@ -38,30 +48,32 @@ std::string MessageMap::getKey(const QueuedMessage& message) size_t MessageMap::size() { - return messages.size(); + size_t count(0); + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->second.status == QueuedMessage::AVAILABLE) ++count; + } + return count; } bool MessageMap::empty() { - return messages.empty(); + return size() == 0;//TODO: more efficient implementation } void MessageMap::release(const QueuedMessage& message) { - std::string key = getKey(message); - Index::iterator i = index.find(key); - if (i == index.end()) { - index[key] = message; - messages[message.position] = message; - } //else message has already been replaced + Ordering::iterator i = messages.find(message.position); + if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) { + i->second.status = QueuedMessage::AVAILABLE; + } } bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end()) { + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; message = i->second; - erase(i); return true; } else { return false; @@ -71,7 +83,7 @@ bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end()) { + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { message = i->second; return true; } else { @@ -79,10 +91,10 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me } } -bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool) +bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { Ordering::iterator i = messages.lower_bound(position+1); - if (i != messages.end()) { + if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) { message = i->second; return true; } else { @@ -92,14 +104,14 @@ bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& bool MessageMap::consume(QueuedMessage& message) { - Ordering::iterator i = messages.begin(); - if (i != messages.end()) { - message = i->second; - erase(i); - return true; - } else { - return false; + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; + message = i->second; + return true; + } } + return false; } const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update) @@ -115,12 +127,17 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) if (result.second) { //there was no previous message for this key; nothing needs to //be removed, just add the message into its correct position - messages[added.position] = added; + QueuedMessage& a = messages[added.position]; + a = added; + a.status = QueuedMessage::AVAILABLE; + QPID_LOG(debug, "Added message at " << a.position); return false; } else { //there is already a message with that key which needs to be replaced removed = result.first->second; result.first->second = replace(result.first->second, added); + result.first->second.status = QueuedMessage::AVAILABLE; + QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first); return true; } } @@ -128,15 +145,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) void MessageMap::foreach(Functor f) { for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { - f(i->second); + if (i->second.status == QueuedMessage::AVAILABLE) f(i->second); } } void MessageMap::removeIf(Predicate p) { - for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) { - if (p(i->second)) { - erase(i); + for (Ordering::iterator i = messages.begin(); i != messages.end();) { + if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) { + index.erase(getKey(i->second)); + //Note: Removing from messages means that the subsequent + //call to deleted() for the same message will return + //false. At present that is not a problem. If this were + //changed to hold onto the message until dequeued + //(e.g. with REMOVED state), then the erase() below would + //need to take that into account. + messages.erase(i++); + } else { + ++i; } } } diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h index d1b8217f9b..a668450250 100644 --- a/qpid/cpp/src/qpid/broker/MessageMap.h +++ b/qpid/cpp/src/qpid/broker/MessageMap.h @@ -43,7 +43,7 @@ class MessageMap : public Messages size_t size(); bool empty(); - bool deleted(const QueuedMessage&); + virtual bool deleted(const QueuedMessage&); void release(const QueuedMessage&); virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp index d807ef22b1..da52675a29 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp @@ -28,120 +28,125 @@ namespace qpid { namespace broker { PriorityQueue::PriorityQueue(int l) : - levels(l), - messages(levels, Deque()), - frontLevel(0), haveFront(false), cached(false) {} + levels(l) {} -bool PriorityQueue::deleted(const QueuedMessage&) { return true; } - -size_t PriorityQueue::size() +bool PriorityQueue::deleted(const QueuedMessage& message) { - size_t total(0); - for (int i = 0; i < levels; ++i) { - total += messages[i].size(); + Index::iterator i = messages.find(message.position); + if (i != messages.end()) { + //remove from available list if necessary + if (i->second.status == QueuedMessage::AVAILABLE) { + Available::iterator j = std::find(available.begin(), available.end(), &i->second); + if (j != available.end()) available.erase(j); + } + //remove from messages map + messages.erase(i); + return true; + } else { + return false; } - return total; } -void PriorityQueue::release(const QueuedMessage& message) +size_t PriorityQueue::size() { - uint p = getPriorityLevel(message); - messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message); - clearCache(); + return available.size(); } -bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +void PriorityQueue::release(const QueuedMessage& message) { - QueuedMessage comp; - comp.position = position; - for (int i = 0; i < levels; ++i) { - if (!messages[i].empty()) { - unsigned long diff = position.getValue() - messages[i].front().position.getValue(); - long maxEnd = diff < messages[i].size() ? diff : messages[i].size(); - Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp); - if (l != messages[i].end() && l->position == position) { - message = *l; - if (remove) { - messages[i].erase(l); - clearCache(); - } - return true; - } - } + Index::iterator i = messages.find(message.position); + if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) { + i->second.status = QueuedMessage::AVAILABLE; + //insert message back into the correct place in available queue, based on priority: + Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2)); + available.insert(j, &i->second); } - return false; } bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { - return find(position, message, true); + Index::iterator i = messages.find(position); + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; + message = i->second; + //remove it from available list (could make this faster by using ordering): + Available::iterator j = std::find(available.begin(), available.end(), &i->second); + assert(j != available.end()); + available.erase(j); + return true; + } else { + return false; + } } bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message) { - return find(position, message, false); + Index::iterator i = messages.find(position); + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { + message = i->second; + return true; + } else { + return false; + } } -bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool) +bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { - QueuedMessage match; - match.position = position+1; - Deque::iterator lowest; - bool found = false; - for (int i = 0; i < levels; ++i) { - Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match); - if (m != messages[i].end()) { - if (m->position == match.position) { - message = *m; - return true; - } else if (!found || m->position < lowest->position) { - lowest = m; - found = true; - } - } - } - if (found) { - message = *lowest; + Index::iterator i = messages.lower_bound(position+1); + if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) { + message = i->second; + return true; + } else { + return false; } - return found; } bool PriorityQueue::consume(QueuedMessage& message) { - if (checkFront()) { - message = messages[frontLevel].front(); - messages[frontLevel].pop_front(); - clearCache(); + if (!available.empty()) { + QueuedMessage* next = available.front(); + messages[next->position].status = QueuedMessage::ACQUIRED; + message = *next; + available.pop_front(); return true; } else { return false; } } +bool PriorityQueue::compare(const QueuedMessage* a, const QueuedMessage* b) const +{ + int priorityA = getPriorityLevel(*a); + int priorityB = getPriorityLevel(*b); + if (priorityA == priorityB) return a->position < b->position; + else return priorityA > priorityB; +} + bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { - messages[getPriorityLevel(added)].push_back(added); - clearCache(); + Index::iterator i = messages.insert(Index::value_type(added.position, added)).first; + i->second.status = QueuedMessage::AVAILABLE; + //insert message into the correct place in available queue, based on priority: + Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2)); + available.insert(j, &i->second); return false;//adding a message never causes one to be removed for deque } void PriorityQueue::foreach(Functor f) { - for (int i = 0; i < levels; ++i) { - std::for_each(messages[i].begin(), messages[i].end(), f); + for (Available::iterator i = available.begin(); i != available.end(); ++i) { + f(**i); } } void PriorityQueue::removeIf(Predicate p) { - for (int priority = 0; priority < levels; ++priority) { - for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) { - if (p(*i)) { - i = messages[priority].erase(i); - clearCache(); - } else { - ++i; - } + for (Available::iterator i = available.begin(); i != available.end();) { + if (p(**i)) { + messages[(*i)->position].status = QueuedMessage::REMOVED; + i = available.erase(i); + } else { + ++i; } } } @@ -156,30 +161,6 @@ uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const return std::min(priority - firstLevel, (uint)levels-1); } -void PriorityQueue::clearCache() -{ - cached = false; -} - -bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m) -{ - for (int p = levels-1; p >= 0; --p) { - if (!m[p].empty()) { - l = p; - return true; - } - } - return false; -} - -bool PriorityQueue::checkFront() -{ - if (!cached) { - haveFront = findFrontLevel(frontLevel, messages); - cached = true; - } - return haveFront; -} uint PriorityQueue::getPriority(const QueuedMessage& message) { diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h index 67c31468d2..590cf68003 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.h +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h @@ -23,8 +23,8 @@ */ #include "qpid/broker/Messages.h" #include "qpid/sys/IntegerTypes.h" -#include -#include +#include +#include namespace qpid { namespace broker { @@ -46,28 +46,22 @@ class PriorityQueue : public Messages bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); - bool consume(QueuedMessage&); + virtual bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void foreach(Functor); void removeIf(Predicate); static uint getPriority(const QueuedMessage&); protected: - typedef std::deque Deque; - typedef std::vector PriorityLevels; - virtual bool findFrontLevel(uint& p, PriorityLevels&); + typedef std::list Available; + typedef std::map Index; const int levels; - private: - PriorityLevels messages; - uint frontLevel; - bool haveFront; - bool cached; - - bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove); - uint getPriorityLevel(const QueuedMessage&) const; - void clearCache(); - bool checkFront(); + Index messages; + Available available; + + bool compare(const QueuedMessage* a, const QueuedMessage* b) const; + uint getPriorityLevel(const QueuedMessage& m) const; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 3d5a7be1c3..4f72501d52 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -585,9 +585,9 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi findQueue(qname)->setPosition(position); } -void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count) +void Connection::queueFairshareState(const std::string& qname, const framing::FieldTable& counts) { - if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) { + if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), counts)) { QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies."); } } diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 920c4937db..14f7fb2e1a 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -156,7 +156,7 @@ class Connection : uint32_t credit); void queuePosition(const std::string&, const framing::SequenceNumber&); - void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); + void queueFairshareState(const std::string&, const framing::FieldTable& count); void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); void txStart(); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 3a3582d032..8460b7c7bb 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -389,9 +389,9 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1)); ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition()); - uint priority, count; - if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { - ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); + qpid::framing::FieldTable counts; + if (qpid::broker::Fairshare::getState(q->getMessages(), counts)) { + ClusterConnectionProxy(s).queueFairshareState(q->getName(), counts); } ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge()); diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 3207a51b79..ccf25f35b5 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -513,18 +513,21 @@ class BrokerTest(TestCase): finally: r.close() return contents - def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content): + def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" actual_contents = self.browse(session, queue, timeout, transform=transform) - self.assertEqual(expect_contents, actual_contents) + if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) + self.assertEqual(expect_contents, actual_contents, msg) - def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content): + def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg=None): """Wait up to timeout for contents of queue to match expect_contents""" test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents retry(test, timeout, delay) - self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform)) + actual_contents = self.browse(session, queue, 0, transform=transform) + if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) + self.assertEqual(expect_contents, actual_contents, msg) def join(thread, timeout=10): thread.join(timeout) diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 822e07c702..e9d44c21e0 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -100,8 +100,8 @@ class HaCluster(object): def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value -class ShortTests(BrokerTest): - """Short HA functionality tests.""" +class HaTest(BrokerTest): + """Base class for HA test cases, defines convenience functions""" # Wait for an address to become valid. def wait(self, session, address): @@ -135,6 +135,9 @@ class ShortTests(BrokerTest): """Connect to a backup broker as an admin connection""" return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) +class ReplicationTests(HaTest): + """Correctness tests for HA replication.""" + def test_replication(self): """Test basic replication of configuration and messages before and after backup has connected""" @@ -491,6 +494,51 @@ class ShortTests(BrokerTest): # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority) + def test_backup_acquired(self): + """Verify that acquired messages are backed up, for all queue types.""" + class Test: + def __init__(self, queue, arguments, expect): + self.queue = queue + self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%( + self.queue, ",".join(arguments + ["'qpid.replicate':all"])) + self.expect = [str(i) for i in expect] + + def send(self, connection): + """Send messages, then acquire one but don't acknowledge""" + s = connection.session() + for m in range(10): s.sender(self.address).send(str(m)) + s.receiver(self.address).fetch() + + def wait(self, brokertest, backup): + brokertest.wait_backup(backup, self.queue) + + def verify(self, brokertest, backup): + brokertest.assert_browse_backup( + backup, self.queue, self.expect, msg=self.queue) + + tests = [ + Test("plain",[],range(10)), + Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)), + Test("priority",["'qpid.priorities':10"], range(10)), + Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)), + Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9]) + ] + + primary = HaBroker(self, name="primary") + primary.promote() + backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + c = primary.connect() + for t in tests: t.send(c) # Send messages, leave one unacknowledged. + + backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + # Wait for backups to catch up. + for t in tests: + t.wait(self, backup1) + t.wait(self, backup2) + # Verify acquired message was replicated + for t in tests: t.verify(self, backup1) + for t in tests: t.verify(self, backup2) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 7b3f2fe63b..580451c5b5 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -304,8 +304,7 @@ - - + -- cgit v1.2.1