summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/Fairshare.cpp87
-rw-r--r--qpid/cpp/src/qpid/broker/Fairshare.h19
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.cpp29
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.h1
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.cpp78
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.h2
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp169
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.h26
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp6
-rw-r--r--qpid/cpp/src/tests/brokertest.py11
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py52
-rw-r--r--qpid/cpp/xml/cluster.xml3
14 files changed, 277 insertions, 212 deletions
diff --git a/qpid/cpp/src/qpid/broker/Fairshare.cpp b/qpid/cpp/src/qpid/broker/Fairshare.cpp
index 7cdad1a44f..c30b64c7ae 100644
--- a/qpid/cpp/src/qpid/broker/Fairshare.cpp
+++ b/qpid/cpp/src/qpid/broker/Fairshare.cpp
@@ -23,6 +23,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
+#include <algorithm>
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/assign/list_of.hpp>
@@ -32,7 +33,7 @@ namespace broker {
Fairshare::Fairshare(size_t levels, uint limit) :
PriorityQueue(levels),
- limits(levels, limit), priority(levels-1), count(0) {}
+ limits(levels, limit), counts(levels, 0) {}
void Fairshare::setLimit(size_t level, uint limit)
@@ -40,70 +41,63 @@ void Fairshare::setLimit(size_t level, uint limit)
limits[level] = limit;
}
-bool Fairshare::limitReached()
-{
- uint l = limits[priority];
- return l && ++count > l;
-}
-
-uint Fairshare::currentLevel()
-{
- if (limitReached()) {
- return nextLevel();
- } else {
- return priority;
- }
-}
-
-uint Fairshare::nextLevel()
-{
- count = 1;
- if (priority) --priority;
- else priority = levels-1;
- return priority;
-}
-
bool Fairshare::isNull()
{
for (int i = 0; i < levels; i++) if (limits[i]) return false;
return true;
}
-bool Fairshare::getState(uint& p, uint& c) const
+bool Fairshare::getState(qpid::framing::FieldTable& state) const
{
- p = priority;
- c = count;
+ for (int i = 0; i < levels; i++) {
+ if (counts[i]) {
+ std::string key = (boost::format("fairshare-count-%1%") % i).str();
+ state.setInt(key, counts[i]);
+ }
+ }
return true;
}
-bool Fairshare::setState(uint p, uint c)
+bool Fairshare::checkLevel(uint level)
{
- priority = p;
- count = c;
- return true;
+ if (!limits[level] || counts[level] < limits[level]) {
+ counts[level]++;
+ return true;
+ } else {
+ return false;
+ }
}
-bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages)
+bool Fairshare::consume(QueuedMessage& message)
{
- const uint start = p = currentLevel();
- do {
- if (!messages[p].empty()) return true;
- } while ((p = nextLevel()) != start);
- return false;
+ for (Available::iterator i = available.begin(); i != available.end(); ++i) {
+ QueuedMessage* next = *i;
+ if (checkLevel(getPriorityLevel(*next))) {
+ messages[next->position].status = QueuedMessage::ACQUIRED;
+ message = *next;
+ available.erase(i);
+ return true;
+ }
+ }
+ if (!available.empty()) {
+ std::fill(counts.begin(), counts.end(), 0);//reset counts
+ return consume(message);
+ } else {
+ return false;
+ }
}
-
-bool Fairshare::getState(const Messages& m, uint& priority, uint& count)
+bool Fairshare::getState(const Messages& m, qpid::framing::FieldTable& counts)
{
const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m);
- return fairshare && fairshare->getState(priority, count);
+ return fairshare && fairshare->getState(counts);
}
-bool Fairshare::setState(Messages& m, uint priority, uint count)
+bool Fairshare::setState(Messages& m, const qpid::framing::FieldTable& counts)
{
Fairshare* fairshare = dynamic_cast<Fairshare*>(&m);
- return fairshare && fairshare->setState(priority, count);
+ return fairshare && fairshare->setState(counts);
}
int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys)
@@ -136,7 +130,14 @@ int getIntegerSettingForKey(const qpid::framing::FieldTable& settings, const std
{
return getIntegerSetting(settings, boost::assign::list_of<std::string>(key));
}
-
+bool Fairshare::setState(const qpid::framing::FieldTable& state)
+{
+ for (int i = 0; i < levels; i++) {
+ std::string key = (boost::format("fairshare-count-%1%") % i).str();
+ counts[i] = state.isSet(key) ? getIntegerSettingForKey(state, key) : 0;
+ }
+ return true;
+}
int getSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys, int minvalue, int maxvalue)
{
return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue));
diff --git a/qpid/cpp/src/qpid/broker/Fairshare.h b/qpid/cpp/src/qpid/broker/Fairshare.h
index 1b25721e0c..dfcbdf280e 100644
--- a/qpid/cpp/src/qpid/broker/Fairshare.h
+++ b/qpid/cpp/src/qpid/broker/Fairshare.h
@@ -22,6 +22,7 @@
*
*/
#include "qpid/broker/PriorityQueue.h"
+#include <vector>
namespace qpid {
namespace framing {
@@ -38,23 +39,19 @@ class Fairshare : public PriorityQueue
{
public:
Fairshare(size_t levels, uint limit);
- bool getState(uint& priority, uint& count) const;
- bool setState(uint priority, uint count);
+ bool getState(qpid::framing::FieldTable& counts) const;
+ bool setState(const qpid::framing::FieldTable& counts);
void setLimit(size_t level, uint limit);
bool isNull();
+ bool consume(QueuedMessage&);
static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
- static bool getState(const Messages&, uint& priority, uint& count);
- static bool setState(Messages&, uint priority, uint count);
+ static bool getState(const Messages&, qpid::framing::FieldTable& counts);
+ static bool setState(Messages&, const qpid::framing::FieldTable& counts);
private:
std::vector<uint> limits;
+ std::vector<uint> counts;
- uint priority;
- uint count;
-
- uint currentLevel();
- uint nextLevel();
- bool limitReached();
- bool findFrontLevel(uint& p, PriorityLevels&);
+ bool checkLevel(uint level);
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
index 49c0a32c19..f1deddf4c8 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
@@ -28,16 +28,26 @@ namespace broker {
LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {}
void LegacyLVQ::setNoBrowse(bool b)
-{
+{
noBrowse = b;
}
+bool LegacyLVQ::deleted(const QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end() && i->second.payload == message.payload) {
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end() && i->second.payload == message.payload) {
+ if (i != messages.end() && i->second.payload == message.payload && i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
message = i->second;
- erase(i);
return true;
} else {
return false;
@@ -66,12 +76,17 @@ bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed)
}
const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
-{
+{
//add the new message into the original position of the replaced message
Ordering::iterator i = messages.find(original.position);
- i->second = update;
- i->second.position = original.position;
- return i->second;
+ if (i != messages.end()) {
+ i->second = update;
+ i->second.position = original.position;
+ return i->second;
+ } else {
+ QPID_LOG(error, "Failed to replace message at " << original.position);
+ return update;
+ }
}
void LegacyLVQ::removeIf(Predicate p)
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
index 695e51131d..9355069f37 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
@@ -40,6 +40,7 @@ class LegacyLVQ : public MessageMap
{
public:
LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
+ bool deleted(const QueuedMessage&);
bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool push(const QueuedMessage& added, QueuedMessage& removed);
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp
index 048df45434..9b164d4e5c 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/MessageMap.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -27,7 +28,16 @@ namespace {
const std::string EMPTY;
}
-bool MessageMap::deleted(const QueuedMessage&) { return true; }
+bool MessageMap::deleted(const QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end()) {
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
std::string MessageMap::getKey(const QueuedMessage& message)
{
@@ -38,30 +48,32 @@ std::string MessageMap::getKey(const QueuedMessage& message)
size_t MessageMap::size()
{
- return messages.size();
+ size_t count(0);
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->second.status == QueuedMessage::AVAILABLE) ++count;
+ }
+ return count;
}
bool MessageMap::empty()
{
- return messages.empty();
+ return size() == 0;//TODO: more efficient implementation
}
void MessageMap::release(const QueuedMessage& message)
{
- std::string key = getKey(message);
- Index::iterator i = index.find(key);
- if (i == index.end()) {
- index[key] = message;
- messages[message.position] = message;
- } //else message has already been replaced
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
+ i->second.status = QueuedMessage::AVAILABLE;
+ }
}
bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end()) {
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
message = i->second;
- erase(i);
return true;
} else {
return false;
@@ -71,7 +83,7 @@ bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage&
bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end()) {
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
message = i->second;
return true;
} else {
@@ -79,10 +91,10 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me
}
}
-bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
Ordering::iterator i = messages.lower_bound(position+1);
- if (i != messages.end()) {
+ if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
message = i->second;
return true;
} else {
@@ -92,14 +104,14 @@ bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage&
bool MessageMap::consume(QueuedMessage& message)
{
- Ordering::iterator i = messages.begin();
- if (i != messages.end()) {
- message = i->second;
- erase(i);
- return true;
- } else {
- return false;
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
+ message = i->second;
+ return true;
+ }
}
+ return false;
}
const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
@@ -115,12 +127,17 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
if (result.second) {
//there was no previous message for this key; nothing needs to
//be removed, just add the message into its correct position
- messages[added.position] = added;
+ QueuedMessage& a = messages[added.position];
+ a = added;
+ a.status = QueuedMessage::AVAILABLE;
+ QPID_LOG(debug, "Added message at " << a.position);
return false;
} else {
//there is already a message with that key which needs to be replaced
removed = result.first->second;
result.first->second = replace(result.first->second, added);
+ result.first->second.status = QueuedMessage::AVAILABLE;
+ QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first);
return true;
}
}
@@ -128,15 +145,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
void MessageMap::foreach(Functor f)
{
for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
- f(i->second);
+ if (i->second.status == QueuedMessage::AVAILABLE) f(i->second);
}
}
void MessageMap::removeIf(Predicate p)
{
- for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) {
- if (p(i->second)) {
- erase(i);
+ for (Ordering::iterator i = messages.begin(); i != messages.end();) {
+ if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) {
+ index.erase(getKey(i->second));
+ //Note: Removing from messages means that the subsequent
+ //call to deleted() for the same message will return
+ //false. At present that is not a problem. If this were
+ //changed to hold onto the message until dequeued
+ //(e.g. with REMOVED state), then the erase() below would
+ //need to take that into account.
+ messages.erase(i++);
+ } else {
+ ++i;
}
}
}
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h
index d1b8217f9b..a668450250 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.h
+++ b/qpid/cpp/src/qpid/broker/MessageMap.h
@@ -43,7 +43,7 @@ class MessageMap : public Messages
size_t size();
bool empty();
- bool deleted(const QueuedMessage&);
+ virtual bool deleted(const QueuedMessage&);
void release(const QueuedMessage&);
virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
index d807ef22b1..da52675a29 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -28,120 +28,125 @@ namespace qpid {
namespace broker {
PriorityQueue::PriorityQueue(int l) :
- levels(l),
- messages(levels, Deque()),
- frontLevel(0), haveFront(false), cached(false) {}
+ levels(l) {}
-bool PriorityQueue::deleted(const QueuedMessage&) { return true; }
-
-size_t PriorityQueue::size()
+bool PriorityQueue::deleted(const QueuedMessage& message)
{
- size_t total(0);
- for (int i = 0; i < levels; ++i) {
- total += messages[i].size();
+ Index::iterator i = messages.find(message.position);
+ if (i != messages.end()) {
+ //remove from available list if necessary
+ if (i->second.status == QueuedMessage::AVAILABLE) {
+ Available::iterator j = std::find(available.begin(), available.end(), &i->second);
+ if (j != available.end()) available.erase(j);
+ }
+ //remove from messages map
+ messages.erase(i);
+ return true;
+ } else {
+ return false;
}
- return total;
}
-void PriorityQueue::release(const QueuedMessage& message)
+size_t PriorityQueue::size()
{
- uint p = getPriorityLevel(message);
- messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
- clearCache();
+ return available.size();
}
-bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+void PriorityQueue::release(const QueuedMessage& message)
{
- QueuedMessage comp;
- comp.position = position;
- for (int i = 0; i < levels; ++i) {
- if (!messages[i].empty()) {
- unsigned long diff = position.getValue() - messages[i].front().position.getValue();
- long maxEnd = diff < messages[i].size() ? diff : messages[i].size();
- Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp);
- if (l != messages[i].end() && l->position == position) {
- message = *l;
- if (remove) {
- messages[i].erase(l);
- clearCache();
- }
- return true;
- }
- }
+ Index::iterator i = messages.find(message.position);
+ if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
+ i->second.status = QueuedMessage::AVAILABLE;
+ //insert message back into the correct place in available queue, based on priority:
+ Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
+ available.insert(j, &i->second);
}
- return false;
}
bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
- return find(position, message, true);
+ Index::iterator i = messages.find(position);
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
+ message = i->second;
+ //remove it from available list (could make this faster by using ordering):
+ Available::iterator j = std::find(available.begin(), available.end(), &i->second);
+ assert(j != available.end());
+ available.erase(j);
+ return true;
+ } else {
+ return false;
+ }
}
bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
- return find(position, message, false);
+ Index::iterator i = messages.find(position);
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+ message = i->second;
+ return true;
+ } else {
+ return false;
+ }
}
-bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
- QueuedMessage match;
- match.position = position+1;
- Deque::iterator lowest;
- bool found = false;
- for (int i = 0; i < levels; ++i) {
- Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match);
- if (m != messages[i].end()) {
- if (m->position == match.position) {
- message = *m;
- return true;
- } else if (!found || m->position < lowest->position) {
- lowest = m;
- found = true;
- }
- }
- }
- if (found) {
- message = *lowest;
+ Index::iterator i = messages.lower_bound(position+1);
+ if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
+ message = i->second;
+ return true;
+ } else {
+ return false;
}
- return found;
}
bool PriorityQueue::consume(QueuedMessage& message)
{
- if (checkFront()) {
- message = messages[frontLevel].front();
- messages[frontLevel].pop_front();
- clearCache();
+ if (!available.empty()) {
+ QueuedMessage* next = available.front();
+ messages[next->position].status = QueuedMessage::ACQUIRED;
+ message = *next;
+ available.pop_front();
return true;
} else {
return false;
}
}
+bool PriorityQueue::compare(const QueuedMessage* a, const QueuedMessage* b) const
+{
+ int priorityA = getPriorityLevel(*a);
+ int priorityB = getPriorityLevel(*b);
+ if (priorityA == priorityB) return a->position < b->position;
+ else return priorityA > priorityB;
+}
+
bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
- messages[getPriorityLevel(added)].push_back(added);
- clearCache();
+ Index::iterator i = messages.insert(Index::value_type(added.position, added)).first;
+ i->second.status = QueuedMessage::AVAILABLE;
+ //insert message into the correct place in available queue, based on priority:
+ Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
+ available.insert(j, &i->second);
return false;//adding a message never causes one to be removed for deque
}
void PriorityQueue::foreach(Functor f)
{
- for (int i = 0; i < levels; ++i) {
- std::for_each(messages[i].begin(), messages[i].end(), f);
+ for (Available::iterator i = available.begin(); i != available.end(); ++i) {
+ f(**i);
}
}
void PriorityQueue::removeIf(Predicate p)
{
- for (int priority = 0; priority < levels; ++priority) {
- for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
- if (p(*i)) {
- i = messages[priority].erase(i);
- clearCache();
- } else {
- ++i;
- }
+ for (Available::iterator i = available.begin(); i != available.end();) {
+ if (p(**i)) {
+ messages[(*i)->position].status = QueuedMessage::REMOVED;
+ i = available.erase(i);
+ } else {
+ ++i;
}
}
}
@@ -156,30 +161,6 @@ uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
return std::min(priority - firstLevel, (uint)levels-1);
}
-void PriorityQueue::clearCache()
-{
- cached = false;
-}
-
-bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
-{
- for (int p = levels-1; p >= 0; --p) {
- if (!m[p].empty()) {
- l = p;
- return true;
- }
- }
- return false;
-}
-
-bool PriorityQueue::checkFront()
-{
- if (!cached) {
- haveFront = findFrontLevel(frontLevel, messages);
- cached = true;
- }
- return haveFront;
-}
uint PriorityQueue::getPriority(const QueuedMessage& message)
{
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h
index 67c31468d2..590cf68003 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.h
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h
@@ -23,8 +23,8 @@
*/
#include "qpid/broker/Messages.h"
#include "qpid/sys/IntegerTypes.h"
-#include <deque>
-#include <vector>
+#include <list>
+#include <map>
namespace qpid {
namespace broker {
@@ -46,28 +46,22 @@ class PriorityQueue : public Messages
bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
- bool consume(QueuedMessage&);
+ virtual bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void foreach(Functor);
void removeIf(Predicate);
static uint getPriority(const QueuedMessage&);
protected:
- typedef std::deque<QueuedMessage> Deque;
- typedef std::vector<Deque> PriorityLevels;
- virtual bool findFrontLevel(uint& p, PriorityLevels&);
+ typedef std::list<QueuedMessage*> Available;
+ typedef std::map<framing::SequenceNumber, QueuedMessage> Index;
const int levels;
- private:
- PriorityLevels messages;
- uint frontLevel;
- bool haveFront;
- bool cached;
-
- bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
- uint getPriorityLevel(const QueuedMessage&) const;
- void clearCache();
- bool checkFront();
+ Index messages;
+ Available available;
+
+ bool compare(const QueuedMessage* a, const QueuedMessage* b) const;
+ uint getPriorityLevel(const QueuedMessage& m) const;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 3d5a7be1c3..4f72501d52 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -585,9 +585,9 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi
findQueue(qname)->setPosition(position);
}
-void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count)
+void Connection::queueFairshareState(const std::string& qname, const framing::FieldTable& counts)
{
- if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) {
+ if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), counts)) {
QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies.");
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 920c4937db..14f7fb2e1a 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -156,7 +156,7 @@ class Connection :
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
- void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
+ void queueFairshareState(const std::string&, const framing::FieldTable& count);
void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
void txStart();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 3a3582d032..8460b7c7bb 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -389,9 +389,9 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
- uint priority, count;
- if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
- ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
+ qpid::framing::FieldTable counts;
+ if (qpid::broker::Fairshare::getState(q->getMessages(), counts)) {
+ ClusterConnectionProxy(s).queueFairshareState(q->getName(), counts);
}
ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 3207a51b79..ccf25f35b5 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -513,18 +513,21 @@ class BrokerTest(TestCase):
finally: r.close()
return contents
- def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content):
+ def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
actual_contents = self.browse(session, queue, timeout, transform=transform)
- self.assertEqual(expect_contents, actual_contents)
+ if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+ self.assertEqual(expect_contents, actual_contents, msg)
- def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content):
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg=None):
"""Wait up to timeout for contents of queue to match expect_contents"""
test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
retry(test, timeout, delay)
- self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
+ actual_contents = self.browse(session, queue, 0, transform=transform)
+ if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+ self.assertEqual(expect_contents, actual_contents, msg)
def join(thread, timeout=10):
thread.join(timeout)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 822e07c702..e9d44c21e0 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -100,8 +100,8 @@ class HaCluster(object):
def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-class ShortTests(BrokerTest):
- """Short HA functionality tests."""
+class HaTest(BrokerTest):
+ """Base class for HA test cases, defines convenience functions"""
# Wait for an address to become valid.
def wait(self, session, address):
@@ -135,6 +135,9 @@ class ShortTests(BrokerTest):
"""Connect to a backup broker as an admin connection"""
return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
+class ReplicationTests(HaTest):
+ """Correctness tests for HA replication."""
+
def test_replication(self):
"""Test basic replication of configuration and messages before and
after backup has connected"""
@@ -491,6 +494,51 @@ class ShortTests(BrokerTest):
# self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
+ def test_backup_acquired(self):
+ """Verify that acquired messages are backed up, for all queue types."""
+ class Test:
+ def __init__(self, queue, arguments, expect):
+ self.queue = queue
+ self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%(
+ self.queue, ",".join(arguments + ["'qpid.replicate':all"]))
+ self.expect = [str(i) for i in expect]
+
+ def send(self, connection):
+ """Send messages, then acquire one but don't acknowledge"""
+ s = connection.session()
+ for m in range(10): s.sender(self.address).send(str(m))
+ s.receiver(self.address).fetch()
+
+ def wait(self, brokertest, backup):
+ brokertest.wait_backup(backup, self.queue)
+
+ def verify(self, brokertest, backup):
+ brokertest.assert_browse_backup(
+ backup, self.queue, self.expect, msg=self.queue)
+
+ tests = [
+ Test("plain",[],range(10)),
+ Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)),
+ Test("priority",["'qpid.priorities':10"], range(10)),
+ Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)),
+ Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9])
+ ]
+
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ c = primary.connect()
+ for t in tests: t.send(c) # Send messages, leave one unacknowledged.
+
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ # Wait for backups to catch up.
+ for t in tests:
+ t.wait(self, backup1)
+ t.wait(self, backup2)
+ # Verify acquired message was replicated
+ for t in tests: t.verify(self, backup1)
+ for t in tests: t.verify(self, backup2)
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 7b3f2fe63b..580451c5b5 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -304,8 +304,7 @@
<!-- Set the fairshare delivery related state of a replicated queue. -->
<control name="queue-fairshare-state" code="0x38">
<field name="queue" type="str8"/>
- <field name="position" type="uint8"/>
- <field name="count" type="uint8"/>
+ <field name="counts" type="map"/>
</control>
<!-- Replicate a QueueObserver for a given queue. -->