diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/Fairshare.cpp | 87 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Fairshare.h | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LegacyLVQ.cpp | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LegacyLVQ.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageMap.cpp | 78 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageMap.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PriorityQueue.cpp | 169 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PriorityQueue.h | 26 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 11 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 52 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 3 |
14 files changed, 277 insertions, 212 deletions
diff --git a/qpid/cpp/src/qpid/broker/Fairshare.cpp b/qpid/cpp/src/qpid/broker/Fairshare.cpp index 7cdad1a44f..c30b64c7ae 100644 --- a/qpid/cpp/src/qpid/broker/Fairshare.cpp +++ b/qpid/cpp/src/qpid/broker/Fairshare.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" +#include <algorithm> #include <boost/format.hpp> #include <boost/lexical_cast.hpp> #include <boost/assign/list_of.hpp> @@ -32,7 +33,7 @@ namespace broker { Fairshare::Fairshare(size_t levels, uint limit) : PriorityQueue(levels), - limits(levels, limit), priority(levels-1), count(0) {} + limits(levels, limit), counts(levels, 0) {} void Fairshare::setLimit(size_t level, uint limit) @@ -40,70 +41,63 @@ void Fairshare::setLimit(size_t level, uint limit) limits[level] = limit; } -bool Fairshare::limitReached() -{ - uint l = limits[priority]; - return l && ++count > l; -} - -uint Fairshare::currentLevel() -{ - if (limitReached()) { - return nextLevel(); - } else { - return priority; - } -} - -uint Fairshare::nextLevel() -{ - count = 1; - if (priority) --priority; - else priority = levels-1; - return priority; -} - bool Fairshare::isNull() { for (int i = 0; i < levels; i++) if (limits[i]) return false; return true; } -bool Fairshare::getState(uint& p, uint& c) const +bool Fairshare::getState(qpid::framing::FieldTable& state) const { - p = priority; - c = count; + for (int i = 0; i < levels; i++) { + if (counts[i]) { + std::string key = (boost::format("fairshare-count-%1%") % i).str(); + state.setInt(key, counts[i]); + } + } return true; } -bool Fairshare::setState(uint p, uint c) +bool Fairshare::checkLevel(uint level) { - priority = p; - count = c; - return true; + if (!limits[level] || counts[level] < limits[level]) { + counts[level]++; + return true; + } else { + return false; + } } -bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages) +bool Fairshare::consume(QueuedMessage& message) { - const uint start = p = currentLevel(); - do { - if (!messages[p].empty()) return true; - } while ((p = nextLevel()) != start); - return false; + for (Available::iterator i = available.begin(); i != available.end(); ++i) { + QueuedMessage* next = *i; + if (checkLevel(getPriorityLevel(*next))) { + messages[next->position].status = QueuedMessage::ACQUIRED; + message = *next; + available.erase(i); + return true; + } + } + if (!available.empty()) { + std::fill(counts.begin(), counts.end(), 0);//reset counts + return consume(message); + } else { + return false; + } } - -bool Fairshare::getState(const Messages& m, uint& priority, uint& count) +bool Fairshare::getState(const Messages& m, qpid::framing::FieldTable& counts) { const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m); - return fairshare && fairshare->getState(priority, count); + return fairshare && fairshare->getState(counts); } -bool Fairshare::setState(Messages& m, uint priority, uint count) +bool Fairshare::setState(Messages& m, const qpid::framing::FieldTable& counts) { Fairshare* fairshare = dynamic_cast<Fairshare*>(&m); - return fairshare && fairshare->setState(priority, count); + return fairshare && fairshare->setState(counts); } int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys) @@ -136,7 +130,14 @@ int getIntegerSettingForKey(const qpid::framing::FieldTable& settings, const std { return getIntegerSetting(settings, boost::assign::list_of<std::string>(key)); } - +bool Fairshare::setState(const qpid::framing::FieldTable& state) +{ + for (int i = 0; i < levels; i++) { + std::string key = (boost::format("fairshare-count-%1%") % i).str(); + counts[i] = state.isSet(key) ? getIntegerSettingForKey(state, key) : 0; + } + return true; +} int getSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys, int minvalue, int maxvalue) { return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue)); diff --git a/qpid/cpp/src/qpid/broker/Fairshare.h b/qpid/cpp/src/qpid/broker/Fairshare.h index 1b25721e0c..dfcbdf280e 100644 --- a/qpid/cpp/src/qpid/broker/Fairshare.h +++ b/qpid/cpp/src/qpid/broker/Fairshare.h @@ -22,6 +22,7 @@ * */ #include "qpid/broker/PriorityQueue.h" +#include <vector> namespace qpid { namespace framing { @@ -38,23 +39,19 @@ class Fairshare : public PriorityQueue { public: Fairshare(size_t levels, uint limit); - bool getState(uint& priority, uint& count) const; - bool setState(uint priority, uint count); + bool getState(qpid::framing::FieldTable& counts) const; + bool setState(const qpid::framing::FieldTable& counts); void setLimit(size_t level, uint limit); bool isNull(); + bool consume(QueuedMessage&); static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings); - static bool getState(const Messages&, uint& priority, uint& count); - static bool setState(Messages&, uint priority, uint count); + static bool getState(const Messages&, qpid::framing::FieldTable& counts); + static bool setState(Messages&, const qpid::framing::FieldTable& counts); private: std::vector<uint> limits; + std::vector<uint> counts; - uint priority; - uint count; - - uint currentLevel(); - uint nextLevel(); - bool limitReached(); - bool findFrontLevel(uint& p, PriorityLevels&); + bool checkLevel(uint level); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp index 49c0a32c19..f1deddf4c8 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -28,16 +28,26 @@ namespace broker { LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {} void LegacyLVQ::setNoBrowse(bool b) -{ +{ noBrowse = b; } +bool LegacyLVQ::deleted(const QueuedMessage& message) +{ + Ordering::iterator i = messages.find(message.position); + if (i != messages.end() && i->second.payload == message.payload) { + erase(i); + return true; + } else { + return false; + } +} bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end() && i->second.payload == message.payload) { + if (i != messages.end() && i->second.payload == message.payload && i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; message = i->second; - erase(i); return true; } else { return false; @@ -66,12 +76,17 @@ bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed) } const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update) -{ +{ //add the new message into the original position of the replaced message Ordering::iterator i = messages.find(original.position); - i->second = update; - i->second.position = original.position; - return i->second; + if (i != messages.end()) { + i->second = update; + i->second.position = original.position; + return i->second; + } else { + QPID_LOG(error, "Failed to replace message at " << original.position); + return update; + } } void LegacyLVQ::removeIf(Predicate p) diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h index 695e51131d..9355069f37 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h @@ -40,6 +40,7 @@ class LegacyLVQ : public MessageMap { public: LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0); + bool deleted(const QueuedMessage&); bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); bool push(const QueuedMessage& added, QueuedMessage& removed); diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp index 048df45434..9b164d4e5c 100644 --- a/qpid/cpp/src/qpid/broker/MessageMap.cpp +++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/MessageMap.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { @@ -27,7 +28,16 @@ namespace { const std::string EMPTY; } -bool MessageMap::deleted(const QueuedMessage&) { return true; } +bool MessageMap::deleted(const QueuedMessage& message) +{ + Ordering::iterator i = messages.find(message.position); + if (i != messages.end()) { + erase(i); + return true; + } else { + return false; + } +} std::string MessageMap::getKey(const QueuedMessage& message) { @@ -38,30 +48,32 @@ std::string MessageMap::getKey(const QueuedMessage& message) size_t MessageMap::size() { - return messages.size(); + size_t count(0); + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->second.status == QueuedMessage::AVAILABLE) ++count; + } + return count; } bool MessageMap::empty() { - return messages.empty(); + return size() == 0;//TODO: more efficient implementation } void MessageMap::release(const QueuedMessage& message) { - std::string key = getKey(message); - Index::iterator i = index.find(key); - if (i == index.end()) { - index[key] = message; - messages[message.position] = message; - } //else message has already been replaced + Ordering::iterator i = messages.find(message.position); + if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) { + i->second.status = QueuedMessage::AVAILABLE; + } } bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end()) { + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; message = i->second; - erase(i); return true; } else { return false; @@ -71,7 +83,7 @@ bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end()) { + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { message = i->second; return true; } else { @@ -79,10 +91,10 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me } } -bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool) +bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { Ordering::iterator i = messages.lower_bound(position+1); - if (i != messages.end()) { + if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) { message = i->second; return true; } else { @@ -92,14 +104,14 @@ bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& bool MessageMap::consume(QueuedMessage& message) { - Ordering::iterator i = messages.begin(); - if (i != messages.end()) { - message = i->second; - erase(i); - return true; - } else { - return false; + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; + message = i->second; + return true; + } } + return false; } const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update) @@ -115,12 +127,17 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) if (result.second) { //there was no previous message for this key; nothing needs to //be removed, just add the message into its correct position - messages[added.position] = added; + QueuedMessage& a = messages[added.position]; + a = added; + a.status = QueuedMessage::AVAILABLE; + QPID_LOG(debug, "Added message at " << a.position); return false; } else { //there is already a message with that key which needs to be replaced removed = result.first->second; result.first->second = replace(result.first->second, added); + result.first->second.status = QueuedMessage::AVAILABLE; + QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first); return true; } } @@ -128,15 +145,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) void MessageMap::foreach(Functor f) { for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { - f(i->second); + if (i->second.status == QueuedMessage::AVAILABLE) f(i->second); } } void MessageMap::removeIf(Predicate p) { - for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) { - if (p(i->second)) { - erase(i); + for (Ordering::iterator i = messages.begin(); i != messages.end();) { + if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) { + index.erase(getKey(i->second)); + //Note: Removing from messages means that the subsequent + //call to deleted() for the same message will return + //false. At present that is not a problem. If this were + //changed to hold onto the message until dequeued + //(e.g. with REMOVED state), then the erase() below would + //need to take that into account. + messages.erase(i++); + } else { + ++i; } } } diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h index d1b8217f9b..a668450250 100644 --- a/qpid/cpp/src/qpid/broker/MessageMap.h +++ b/qpid/cpp/src/qpid/broker/MessageMap.h @@ -43,7 +43,7 @@ class MessageMap : public Messages size_t size(); bool empty(); - bool deleted(const QueuedMessage&); + virtual bool deleted(const QueuedMessage&); void release(const QueuedMessage&); virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp index d807ef22b1..da52675a29 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp @@ -28,120 +28,125 @@ namespace qpid { namespace broker { PriorityQueue::PriorityQueue(int l) : - levels(l), - messages(levels, Deque()), - frontLevel(0), haveFront(false), cached(false) {} + levels(l) {} -bool PriorityQueue::deleted(const QueuedMessage&) { return true; } - -size_t PriorityQueue::size() +bool PriorityQueue::deleted(const QueuedMessage& message) { - size_t total(0); - for (int i = 0; i < levels; ++i) { - total += messages[i].size(); + Index::iterator i = messages.find(message.position); + if (i != messages.end()) { + //remove from available list if necessary + if (i->second.status == QueuedMessage::AVAILABLE) { + Available::iterator j = std::find(available.begin(), available.end(), &i->second); + if (j != available.end()) available.erase(j); + } + //remove from messages map + messages.erase(i); + return true; + } else { + return false; } - return total; } -void PriorityQueue::release(const QueuedMessage& message) +size_t PriorityQueue::size() { - uint p = getPriorityLevel(message); - messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message); - clearCache(); + return available.size(); } -bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +void PriorityQueue::release(const QueuedMessage& message) { - QueuedMessage comp; - comp.position = position; - for (int i = 0; i < levels; ++i) { - if (!messages[i].empty()) { - unsigned long diff = position.getValue() - messages[i].front().position.getValue(); - long maxEnd = diff < messages[i].size() ? diff : messages[i].size(); - Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp); - if (l != messages[i].end() && l->position == position) { - message = *l; - if (remove) { - messages[i].erase(l); - clearCache(); - } - return true; - } - } + Index::iterator i = messages.find(message.position); + if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) { + i->second.status = QueuedMessage::AVAILABLE; + //insert message back into the correct place in available queue, based on priority: + Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2)); + available.insert(j, &i->second); } - return false; } bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { - return find(position, message, true); + Index::iterator i = messages.find(position); + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; + message = i->second; + //remove it from available list (could make this faster by using ordering): + Available::iterator j = std::find(available.begin(), available.end(), &i->second); + assert(j != available.end()); + available.erase(j); + return true; + } else { + return false; + } } bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message) { - return find(position, message, false); + Index::iterator i = messages.find(position); + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { + message = i->second; + return true; + } else { + return false; + } } -bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool) +bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { - QueuedMessage match; - match.position = position+1; - Deque::iterator lowest; - bool found = false; - for (int i = 0; i < levels; ++i) { - Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match); - if (m != messages[i].end()) { - if (m->position == match.position) { - message = *m; - return true; - } else if (!found || m->position < lowest->position) { - lowest = m; - found = true; - } - } - } - if (found) { - message = *lowest; + Index::iterator i = messages.lower_bound(position+1); + if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) { + message = i->second; + return true; + } else { + return false; } - return found; } bool PriorityQueue::consume(QueuedMessage& message) { - if (checkFront()) { - message = messages[frontLevel].front(); - messages[frontLevel].pop_front(); - clearCache(); + if (!available.empty()) { + QueuedMessage* next = available.front(); + messages[next->position].status = QueuedMessage::ACQUIRED; + message = *next; + available.pop_front(); return true; } else { return false; } } +bool PriorityQueue::compare(const QueuedMessage* a, const QueuedMessage* b) const +{ + int priorityA = getPriorityLevel(*a); + int priorityB = getPriorityLevel(*b); + if (priorityA == priorityB) return a->position < b->position; + else return priorityA > priorityB; +} + bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { - messages[getPriorityLevel(added)].push_back(added); - clearCache(); + Index::iterator i = messages.insert(Index::value_type(added.position, added)).first; + i->second.status = QueuedMessage::AVAILABLE; + //insert message into the correct place in available queue, based on priority: + Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2)); + available.insert(j, &i->second); return false;//adding a message never causes one to be removed for deque } void PriorityQueue::foreach(Functor f) { - for (int i = 0; i < levels; ++i) { - std::for_each(messages[i].begin(), messages[i].end(), f); + for (Available::iterator i = available.begin(); i != available.end(); ++i) { + f(**i); } } void PriorityQueue::removeIf(Predicate p) { - for (int priority = 0; priority < levels; ++priority) { - for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) { - if (p(*i)) { - i = messages[priority].erase(i); - clearCache(); - } else { - ++i; - } + for (Available::iterator i = available.begin(); i != available.end();) { + if (p(**i)) { + messages[(*i)->position].status = QueuedMessage::REMOVED; + i = available.erase(i); + } else { + ++i; } } } @@ -156,30 +161,6 @@ uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const return std::min(priority - firstLevel, (uint)levels-1); } -void PriorityQueue::clearCache() -{ - cached = false; -} - -bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m) -{ - for (int p = levels-1; p >= 0; --p) { - if (!m[p].empty()) { - l = p; - return true; - } - } - return false; -} - -bool PriorityQueue::checkFront() -{ - if (!cached) { - haveFront = findFrontLevel(frontLevel, messages); - cached = true; - } - return haveFront; -} uint PriorityQueue::getPriority(const QueuedMessage& message) { diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h index 67c31468d2..590cf68003 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.h +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h @@ -23,8 +23,8 @@ */ #include "qpid/broker/Messages.h" #include "qpid/sys/IntegerTypes.h" -#include <deque> -#include <vector> +#include <list> +#include <map> namespace qpid { namespace broker { @@ -46,28 +46,22 @@ class PriorityQueue : public Messages bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); - bool consume(QueuedMessage&); + virtual bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void foreach(Functor); void removeIf(Predicate); static uint getPriority(const QueuedMessage&); protected: - typedef std::deque<QueuedMessage> Deque; - typedef std::vector<Deque> PriorityLevels; - virtual bool findFrontLevel(uint& p, PriorityLevels&); + typedef std::list<QueuedMessage*> Available; + typedef std::map<framing::SequenceNumber, QueuedMessage> Index; const int levels; - private: - PriorityLevels messages; - uint frontLevel; - bool haveFront; - bool cached; - - bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove); - uint getPriorityLevel(const QueuedMessage&) const; - void clearCache(); - bool checkFront(); + Index messages; + Available available; + + bool compare(const QueuedMessage* a, const QueuedMessage* b) const; + uint getPriorityLevel(const QueuedMessage& m) const; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 3d5a7be1c3..4f72501d52 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -585,9 +585,9 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi findQueue(qname)->setPosition(position); } -void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count) +void Connection::queueFairshareState(const std::string& qname, const framing::FieldTable& counts) { - if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) { + if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), counts)) { QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies."); } } diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 920c4937db..14f7fb2e1a 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -156,7 +156,7 @@ class Connection : uint32_t credit); void queuePosition(const std::string&, const framing::SequenceNumber&); - void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); + void queueFairshareState(const std::string&, const framing::FieldTable& count); void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); void txStart(); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 3a3582d032..8460b7c7bb 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -389,9 +389,9 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1)); ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition()); - uint priority, count; - if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { - ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); + qpid::framing::FieldTable counts; + if (qpid::broker::Fairshare::getState(q->getMessages(), counts)) { + ClusterConnectionProxy(s).queueFairshareState(q->getName(), counts); } ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge()); diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 3207a51b79..ccf25f35b5 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -513,18 +513,21 @@ class BrokerTest(TestCase): finally: r.close() return contents - def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content): + def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" actual_contents = self.browse(session, queue, timeout, transform=transform) - self.assertEqual(expect_contents, actual_contents) + if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) + self.assertEqual(expect_contents, actual_contents, msg) - def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content): + def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg=None): """Wait up to timeout for contents of queue to match expect_contents""" test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents retry(test, timeout, delay) - self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform)) + actual_contents = self.browse(session, queue, 0, transform=transform) + if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) + self.assertEqual(expect_contents, actual_contents, msg) def join(thread, timeout=10): thread.join(timeout) diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 822e07c702..e9d44c21e0 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -100,8 +100,8 @@ class HaCluster(object): def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value -class ShortTests(BrokerTest): - """Short HA functionality tests.""" +class HaTest(BrokerTest): + """Base class for HA test cases, defines convenience functions""" # Wait for an address to become valid. def wait(self, session, address): @@ -135,6 +135,9 @@ class ShortTests(BrokerTest): """Connect to a backup broker as an admin connection""" return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) +class ReplicationTests(HaTest): + """Correctness tests for HA replication.""" + def test_replication(self): """Test basic replication of configuration and messages before and after backup has connected""" @@ -491,6 +494,51 @@ class ShortTests(BrokerTest): # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority) + def test_backup_acquired(self): + """Verify that acquired messages are backed up, for all queue types.""" + class Test: + def __init__(self, queue, arguments, expect): + self.queue = queue + self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%( + self.queue, ",".join(arguments + ["'qpid.replicate':all"])) + self.expect = [str(i) for i in expect] + + def send(self, connection): + """Send messages, then acquire one but don't acknowledge""" + s = connection.session() + for m in range(10): s.sender(self.address).send(str(m)) + s.receiver(self.address).fetch() + + def wait(self, brokertest, backup): + brokertest.wait_backup(backup, self.queue) + + def verify(self, brokertest, backup): + brokertest.assert_browse_backup( + backup, self.queue, self.expect, msg=self.queue) + + tests = [ + Test("plain",[],range(10)), + Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)), + Test("priority",["'qpid.priorities':10"], range(10)), + Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)), + Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9]) + ] + + primary = HaBroker(self, name="primary") + primary.promote() + backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + c = primary.connect() + for t in tests: t.send(c) # Send messages, leave one unacknowledged. + + backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + # Wait for backups to catch up. + for t in tests: + t.wait(self, backup1) + t.wait(self, backup2) + # Verify acquired message was replicated + for t in tests: t.verify(self, backup1) + for t in tests: t.verify(self, backup2) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 7b3f2fe63b..580451c5b5 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -304,8 +304,7 @@ <!-- Set the fairshare delivery related state of a replicated queue. --> <control name="queue-fairshare-state" code="0x38"> <field name="queue" type="str8"/> - <field name="position" type="uint8"/> - <field name="count" type="uint8"/> + <field name="counts" type="map"/> </control> <!-- Replicate a QueueObserver for a given queue. --> |