summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-03-30 19:36:48 +0000
committerAlan Conway <aconway@apache.org>2012-03-30 19:36:48 +0000
commite54ef8dc737196343ad974c91a86681efca5fb14 (patch)
treefc1cb9b1d5035dc06795ae877e02b895e86b2a9f
parent38d1f36fe4238a887f867350adaa56489e53e0e6 (diff)
downloadqpid-python-e54ef8dc737196343ad974c91a86681efca5fb14.tar.gz
QPID-3603: Keep acquired messages on queues for all queue types.
Updated priority and lvq queues to keep acquired messages, and supply them to browsers if requested. This is necessary so replicating subscriptions can back-up these queue types without message loss. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1307582 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Fairshare.cpp87
-rw-r--r--qpid/cpp/src/qpid/broker/Fairshare.h19
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.cpp29
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.h1
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.cpp78
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.h2
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp169
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.h26
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp6
-rw-r--r--qpid/cpp/src/tests/brokertest.py11
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py52
-rw-r--r--qpid/cpp/xml/cluster.xml3
14 files changed, 277 insertions, 212 deletions
diff --git a/qpid/cpp/src/qpid/broker/Fairshare.cpp b/qpid/cpp/src/qpid/broker/Fairshare.cpp
index 7cdad1a44f..c30b64c7ae 100644
--- a/qpid/cpp/src/qpid/broker/Fairshare.cpp
+++ b/qpid/cpp/src/qpid/broker/Fairshare.cpp
@@ -23,6 +23,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
+#include <algorithm>
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/assign/list_of.hpp>
@@ -32,7 +33,7 @@ namespace broker {
Fairshare::Fairshare(size_t levels, uint limit) :
PriorityQueue(levels),
- limits(levels, limit), priority(levels-1), count(0) {}
+ limits(levels, limit), counts(levels, 0) {}
void Fairshare::setLimit(size_t level, uint limit)
@@ -40,70 +41,63 @@ void Fairshare::setLimit(size_t level, uint limit)
limits[level] = limit;
}
-bool Fairshare::limitReached()
-{
- uint l = limits[priority];
- return l && ++count > l;
-}
-
-uint Fairshare::currentLevel()
-{
- if (limitReached()) {
- return nextLevel();
- } else {
- return priority;
- }
-}
-
-uint Fairshare::nextLevel()
-{
- count = 1;
- if (priority) --priority;
- else priority = levels-1;
- return priority;
-}
-
bool Fairshare::isNull()
{
for (int i = 0; i < levels; i++) if (limits[i]) return false;
return true;
}
-bool Fairshare::getState(uint& p, uint& c) const
+bool Fairshare::getState(qpid::framing::FieldTable& state) const
{
- p = priority;
- c = count;
+ for (int i = 0; i < levels; i++) {
+ if (counts[i]) {
+ std::string key = (boost::format("fairshare-count-%1%") % i).str();
+ state.setInt(key, counts[i]);
+ }
+ }
return true;
}
-bool Fairshare::setState(uint p, uint c)
+bool Fairshare::checkLevel(uint level)
{
- priority = p;
- count = c;
- return true;
+ if (!limits[level] || counts[level] < limits[level]) {
+ counts[level]++;
+ return true;
+ } else {
+ return false;
+ }
}
-bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages)
+bool Fairshare::consume(QueuedMessage& message)
{
- const uint start = p = currentLevel();
- do {
- if (!messages[p].empty()) return true;
- } while ((p = nextLevel()) != start);
- return false;
+ for (Available::iterator i = available.begin(); i != available.end(); ++i) {
+ QueuedMessage* next = *i;
+ if (checkLevel(getPriorityLevel(*next))) {
+ messages[next->position].status = QueuedMessage::ACQUIRED;
+ message = *next;
+ available.erase(i);
+ return true;
+ }
+ }
+ if (!available.empty()) {
+ std::fill(counts.begin(), counts.end(), 0);//reset counts
+ return consume(message);
+ } else {
+ return false;
+ }
}
-
-bool Fairshare::getState(const Messages& m, uint& priority, uint& count)
+bool Fairshare::getState(const Messages& m, qpid::framing::FieldTable& counts)
{
const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m);
- return fairshare && fairshare->getState(priority, count);
+ return fairshare && fairshare->getState(counts);
}
-bool Fairshare::setState(Messages& m, uint priority, uint count)
+bool Fairshare::setState(Messages& m, const qpid::framing::FieldTable& counts)
{
Fairshare* fairshare = dynamic_cast<Fairshare*>(&m);
- return fairshare && fairshare->setState(priority, count);
+ return fairshare && fairshare->setState(counts);
}
int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys)
@@ -136,7 +130,14 @@ int getIntegerSettingForKey(const qpid::framing::FieldTable& settings, const std
{
return getIntegerSetting(settings, boost::assign::list_of<std::string>(key));
}
-
+bool Fairshare::setState(const qpid::framing::FieldTable& state)
+{
+ for (int i = 0; i < levels; i++) {
+ std::string key = (boost::format("fairshare-count-%1%") % i).str();
+ counts[i] = state.isSet(key) ? getIntegerSettingForKey(state, key) : 0;
+ }
+ return true;
+}
int getSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys, int minvalue, int maxvalue)
{
return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue));
diff --git a/qpid/cpp/src/qpid/broker/Fairshare.h b/qpid/cpp/src/qpid/broker/Fairshare.h
index 1b25721e0c..dfcbdf280e 100644
--- a/qpid/cpp/src/qpid/broker/Fairshare.h
+++ b/qpid/cpp/src/qpid/broker/Fairshare.h
@@ -22,6 +22,7 @@
*
*/
#include "qpid/broker/PriorityQueue.h"
+#include <vector>
namespace qpid {
namespace framing {
@@ -38,23 +39,19 @@ class Fairshare : public PriorityQueue
{
public:
Fairshare(size_t levels, uint limit);
- bool getState(uint& priority, uint& count) const;
- bool setState(uint priority, uint count);
+ bool getState(qpid::framing::FieldTable& counts) const;
+ bool setState(const qpid::framing::FieldTable& counts);
void setLimit(size_t level, uint limit);
bool isNull();
+ bool consume(QueuedMessage&);
static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
- static bool getState(const Messages&, uint& priority, uint& count);
- static bool setState(Messages&, uint priority, uint count);
+ static bool getState(const Messages&, qpid::framing::FieldTable& counts);
+ static bool setState(Messages&, const qpid::framing::FieldTable& counts);
private:
std::vector<uint> limits;
+ std::vector<uint> counts;
- uint priority;
- uint count;
-
- uint currentLevel();
- uint nextLevel();
- bool limitReached();
- bool findFrontLevel(uint& p, PriorityLevels&);
+ bool checkLevel(uint level);
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
index 49c0a32c19..f1deddf4c8 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
@@ -28,16 +28,26 @@ namespace broker {
LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {}
void LegacyLVQ::setNoBrowse(bool b)
-{
+{
noBrowse = b;
}
+bool LegacyLVQ::deleted(const QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end() && i->second.payload == message.payload) {
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end() && i->second.payload == message.payload) {
+ if (i != messages.end() && i->second.payload == message.payload && i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
message = i->second;
- erase(i);
return true;
} else {
return false;
@@ -66,12 +76,17 @@ bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed)
}
const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
-{
+{
//add the new message into the original position of the replaced message
Ordering::iterator i = messages.find(original.position);
- i->second = update;
- i->second.position = original.position;
- return i->second;
+ if (i != messages.end()) {
+ i->second = update;
+ i->second.position = original.position;
+ return i->second;
+ } else {
+ QPID_LOG(error, "Failed to replace message at " << original.position);
+ return update;
+ }
}
void LegacyLVQ::removeIf(Predicate p)
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
index 695e51131d..9355069f37 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
@@ -40,6 +40,7 @@ class LegacyLVQ : public MessageMap
{
public:
LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
+ bool deleted(const QueuedMessage&);
bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool push(const QueuedMessage& added, QueuedMessage& removed);
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp
index 048df45434..9b164d4e5c 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/MessageMap.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -27,7 +28,16 @@ namespace {
const std::string EMPTY;
}
-bool MessageMap::deleted(const QueuedMessage&) { return true; }
+bool MessageMap::deleted(const QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end()) {
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
std::string MessageMap::getKey(const QueuedMessage& message)
{
@@ -38,30 +48,32 @@ std::string MessageMap::getKey(const QueuedMessage& message)
size_t MessageMap::size()
{
- return messages.size();
+ size_t count(0);
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->second.status == QueuedMessage::AVAILABLE) ++count;
+ }
+ return count;
}
bool MessageMap::empty()
{
- return messages.empty();
+ return size() == 0;//TODO: more efficient implementation
}
void MessageMap::release(const QueuedMessage& message)
{
- std::string key = getKey(message);
- Index::iterator i = index.find(key);
- if (i == index.end()) {
- index[key] = message;
- messages[message.position] = message;
- } //else message has already been replaced
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
+ i->second.status = QueuedMessage::AVAILABLE;
+ }
}
bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end()) {
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
message = i->second;
- erase(i);
return true;
} else {
return false;
@@ -71,7 +83,7 @@ bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage&
bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end()) {
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
message = i->second;
return true;
} else {
@@ -79,10 +91,10 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me
}
}
-bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
Ordering::iterator i = messages.lower_bound(position+1);
- if (i != messages.end()) {
+ if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
message = i->second;
return true;
} else {
@@ -92,14 +104,14 @@ bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage&
bool MessageMap::consume(QueuedMessage& message)
{
- Ordering::iterator i = messages.begin();
- if (i != messages.end()) {
- message = i->second;
- erase(i);
- return true;
- } else {
- return false;
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
+ message = i->second;
+ return true;
+ }
}
+ return false;
}
const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
@@ -115,12 +127,17 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
if (result.second) {
//there was no previous message for this key; nothing needs to
//be removed, just add the message into its correct position
- messages[added.position] = added;
+ QueuedMessage& a = messages[added.position];
+ a = added;
+ a.status = QueuedMessage::AVAILABLE;
+ QPID_LOG(debug, "Added message at " << a.position);
return false;
} else {
//there is already a message with that key which needs to be replaced
removed = result.first->second;
result.first->second = replace(result.first->second, added);
+ result.first->second.status = QueuedMessage::AVAILABLE;
+ QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first);
return true;
}
}
@@ -128,15 +145,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
void MessageMap::foreach(Functor f)
{
for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
- f(i->second);
+ if (i->second.status == QueuedMessage::AVAILABLE) f(i->second);
}
}
void MessageMap::removeIf(Predicate p)
{
- for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) {
- if (p(i->second)) {
- erase(i);
+ for (Ordering::iterator i = messages.begin(); i != messages.end();) {
+ if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) {
+ index.erase(getKey(i->second));
+ //Note: Removing from messages means that the subsequent
+ //call to deleted() for the same message will return
+ //false. At present that is not a problem. If this were
+ //changed to hold onto the message until dequeued
+ //(e.g. with REMOVED state), then the erase() below would
+ //need to take that into account.
+ messages.erase(i++);
+ } else {
+ ++i;
}
}
}
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h
index d1b8217f9b..a668450250 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.h
+++ b/qpid/cpp/src/qpid/broker/MessageMap.h
@@ -43,7 +43,7 @@ class MessageMap : public Messages
size_t size();
bool empty();
- bool deleted(const QueuedMessage&);
+ virtual bool deleted(const QueuedMessage&);
void release(const QueuedMessage&);
virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
index d807ef22b1..da52675a29 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -28,120 +28,125 @@ namespace qpid {
namespace broker {
PriorityQueue::PriorityQueue(int l) :
- levels(l),
- messages(levels, Deque()),
- frontLevel(0), haveFront(false), cached(false) {}
+ levels(l) {}
-bool PriorityQueue::deleted(const QueuedMessage&) { return true; }
-
-size_t PriorityQueue::size()
+bool PriorityQueue::deleted(const QueuedMessage& message)
{
- size_t total(0);
- for (int i = 0; i < levels; ++i) {
- total += messages[i].size();
+ Index::iterator i = messages.find(message.position);
+ if (i != messages.end()) {
+ //remove from available list if necessary
+ if (i->second.status == QueuedMessage::AVAILABLE) {
+ Available::iterator j = std::find(available.begin(), available.end(), &i->second);
+ if (j != available.end()) available.erase(j);
+ }
+ //remove from messages map
+ messages.erase(i);
+ return true;
+ } else {
+ return false;
}
- return total;
}
-void PriorityQueue::release(const QueuedMessage& message)
+size_t PriorityQueue::size()
{
- uint p = getPriorityLevel(message);
- messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
- clearCache();
+ return available.size();
}
-bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+void PriorityQueue::release(const QueuedMessage& message)
{
- QueuedMessage comp;
- comp.position = position;
- for (int i = 0; i < levels; ++i) {
- if (!messages[i].empty()) {
- unsigned long diff = position.getValue() - messages[i].front().position.getValue();
- long maxEnd = diff < messages[i].size() ? diff : messages[i].size();
- Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp);
- if (l != messages[i].end() && l->position == position) {
- message = *l;
- if (remove) {
- messages[i].erase(l);
- clearCache();
- }
- return true;
- }
- }
+ Index::iterator i = messages.find(message.position);
+ if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
+ i->second.status = QueuedMessage::AVAILABLE;
+ //insert message back into the correct place in available queue, based on priority:
+ Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
+ available.insert(j, &i->second);
}
- return false;
}
bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
- return find(position, message, true);
+ Index::iterator i = messages.find(position);
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
+ message = i->second;
+ //remove it from available list (could make this faster by using ordering):
+ Available::iterator j = std::find(available.begin(), available.end(), &i->second);
+ assert(j != available.end());
+ available.erase(j);
+ return true;
+ } else {
+ return false;
+ }
}
bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
- return find(position, message, false);
+ Index::iterator i = messages.find(position);
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+ message = i->second;
+ return true;
+ } else {
+ return false;
+ }
}
-bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
- QueuedMessage match;
- match.position = position+1;
- Deque::iterator lowest;
- bool found = false;
- for (int i = 0; i < levels; ++i) {
- Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match);
- if (m != messages[i].end()) {
- if (m->position == match.position) {
- message = *m;
- return true;
- } else if (!found || m->position < lowest->position) {
- lowest = m;
- found = true;
- }
- }
- }
- if (found) {
- message = *lowest;
+ Index::iterator i = messages.lower_bound(position+1);
+ if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
+ message = i->second;
+ return true;
+ } else {
+ return false;
}
- return found;
}
bool PriorityQueue::consume(QueuedMessage& message)
{
- if (checkFront()) {
- message = messages[frontLevel].front();
- messages[frontLevel].pop_front();
- clearCache();
+ if (!available.empty()) {
+ QueuedMessage* next = available.front();
+ messages[next->position].status = QueuedMessage::ACQUIRED;
+ message = *next;
+ available.pop_front();
return true;
} else {
return false;
}
}
+bool PriorityQueue::compare(const QueuedMessage* a, const QueuedMessage* b) const
+{
+ int priorityA = getPriorityLevel(*a);
+ int priorityB = getPriorityLevel(*b);
+ if (priorityA == priorityB) return a->position < b->position;
+ else return priorityA > priorityB;
+}
+
bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
- messages[getPriorityLevel(added)].push_back(added);
- clearCache();
+ Index::iterator i = messages.insert(Index::value_type(added.position, added)).first;
+ i->second.status = QueuedMessage::AVAILABLE;
+ //insert message into the correct place in available queue, based on priority:
+ Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
+ available.insert(j, &i->second);
return false;//adding a message never causes one to be removed for deque
}
void PriorityQueue::foreach(Functor f)
{
- for (int i = 0; i < levels; ++i) {
- std::for_each(messages[i].begin(), messages[i].end(), f);
+ for (Available::iterator i = available.begin(); i != available.end(); ++i) {
+ f(**i);
}
}
void PriorityQueue::removeIf(Predicate p)
{
- for (int priority = 0; priority < levels; ++priority) {
- for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
- if (p(*i)) {
- i = messages[priority].erase(i);
- clearCache();
- } else {
- ++i;
- }
+ for (Available::iterator i = available.begin(); i != available.end();) {
+ if (p(**i)) {
+ messages[(*i)->position].status = QueuedMessage::REMOVED;
+ i = available.erase(i);
+ } else {
+ ++i;
}
}
}
@@ -156,30 +161,6 @@ uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
return std::min(priority - firstLevel, (uint)levels-1);
}
-void PriorityQueue::clearCache()
-{
- cached = false;
-}
-
-bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
-{
- for (int p = levels-1; p >= 0; --p) {
- if (!m[p].empty()) {
- l = p;
- return true;
- }
- }
- return false;
-}
-
-bool PriorityQueue::checkFront()
-{
- if (!cached) {
- haveFront = findFrontLevel(frontLevel, messages);
- cached = true;
- }
- return haveFront;
-}
uint PriorityQueue::getPriority(const QueuedMessage& message)
{
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h
index 67c31468d2..590cf68003 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.h
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h
@@ -23,8 +23,8 @@
*/
#include "qpid/broker/Messages.h"
#include "qpid/sys/IntegerTypes.h"
-#include <deque>
-#include <vector>
+#include <list>
+#include <map>
namespace qpid {
namespace broker {
@@ -46,28 +46,22 @@ class PriorityQueue : public Messages
bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
- bool consume(QueuedMessage&);
+ virtual bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void foreach(Functor);
void removeIf(Predicate);
static uint getPriority(const QueuedMessage&);
protected:
- typedef std::deque<QueuedMessage> Deque;
- typedef std::vector<Deque> PriorityLevels;
- virtual bool findFrontLevel(uint& p, PriorityLevels&);
+ typedef std::list<QueuedMessage*> Available;
+ typedef std::map<framing::SequenceNumber, QueuedMessage> Index;
const int levels;
- private:
- PriorityLevels messages;
- uint frontLevel;
- bool haveFront;
- bool cached;
-
- bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
- uint getPriorityLevel(const QueuedMessage&) const;
- void clearCache();
- bool checkFront();
+ Index messages;
+ Available available;
+
+ bool compare(const QueuedMessage* a, const QueuedMessage* b) const;
+ uint getPriorityLevel(const QueuedMessage& m) const;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 3d5a7be1c3..4f72501d52 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -585,9 +585,9 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi
findQueue(qname)->setPosition(position);
}
-void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count)
+void Connection::queueFairshareState(const std::string& qname, const framing::FieldTable& counts)
{
- if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) {
+ if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), counts)) {
QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies.");
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 920c4937db..14f7fb2e1a 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -156,7 +156,7 @@ class Connection :
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
- void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
+ void queueFairshareState(const std::string&, const framing::FieldTable& count);
void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
void txStart();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 3a3582d032..8460b7c7bb 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -389,9 +389,9 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
- uint priority, count;
- if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
- ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
+ qpid::framing::FieldTable counts;
+ if (qpid::broker::Fairshare::getState(q->getMessages(), counts)) {
+ ClusterConnectionProxy(s).queueFairshareState(q->getName(), counts);
}
ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 3207a51b79..ccf25f35b5 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -513,18 +513,21 @@ class BrokerTest(TestCase):
finally: r.close()
return contents
- def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content):
+ def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
actual_contents = self.browse(session, queue, timeout, transform=transform)
- self.assertEqual(expect_contents, actual_contents)
+ if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+ self.assertEqual(expect_contents, actual_contents, msg)
- def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content):
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg=None):
"""Wait up to timeout for contents of queue to match expect_contents"""
test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
retry(test, timeout, delay)
- self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
+ actual_contents = self.browse(session, queue, 0, transform=transform)
+ if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+ self.assertEqual(expect_contents, actual_contents, msg)
def join(thread, timeout=10):
thread.join(timeout)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 822e07c702..e9d44c21e0 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -100,8 +100,8 @@ class HaCluster(object):
def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-class ShortTests(BrokerTest):
- """Short HA functionality tests."""
+class HaTest(BrokerTest):
+ """Base class for HA test cases, defines convenience functions"""
# Wait for an address to become valid.
def wait(self, session, address):
@@ -135,6 +135,9 @@ class ShortTests(BrokerTest):
"""Connect to a backup broker as an admin connection"""
return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
+class ReplicationTests(HaTest):
+ """Correctness tests for HA replication."""
+
def test_replication(self):
"""Test basic replication of configuration and messages before and
after backup has connected"""
@@ -491,6 +494,51 @@ class ShortTests(BrokerTest):
# self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
+ def test_backup_acquired(self):
+ """Verify that acquired messages are backed up, for all queue types."""
+ class Test:
+ def __init__(self, queue, arguments, expect):
+ self.queue = queue
+ self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%(
+ self.queue, ",".join(arguments + ["'qpid.replicate':all"]))
+ self.expect = [str(i) for i in expect]
+
+ def send(self, connection):
+ """Send messages, then acquire one but don't acknowledge"""
+ s = connection.session()
+ for m in range(10): s.sender(self.address).send(str(m))
+ s.receiver(self.address).fetch()
+
+ def wait(self, brokertest, backup):
+ brokertest.wait_backup(backup, self.queue)
+
+ def verify(self, brokertest, backup):
+ brokertest.assert_browse_backup(
+ backup, self.queue, self.expect, msg=self.queue)
+
+ tests = [
+ Test("plain",[],range(10)),
+ Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)),
+ Test("priority",["'qpid.priorities':10"], range(10)),
+ Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)),
+ Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9])
+ ]
+
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ c = primary.connect()
+ for t in tests: t.send(c) # Send messages, leave one unacknowledged.
+
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ # Wait for backups to catch up.
+ for t in tests:
+ t.wait(self, backup1)
+ t.wait(self, backup2)
+ # Verify acquired message was replicated
+ for t in tests: t.verify(self, backup1)
+ for t in tests: t.verify(self, backup2)
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 7b3f2fe63b..580451c5b5 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -304,8 +304,7 @@
<!-- Set the fairshare delivery related state of a replicated queue. -->
<control name="queue-fairshare-state" code="0x38">
<field name="queue" type="str8"/>
- <field name="position" type="uint8"/>
- <field name="count" type="uint8"/>
+ <field name="counts" type="map"/>
</control>
<!-- Replicate a QueueObserver for a given queue. -->