summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-04-23 15:51:46 +0000
committerAlan Conway <aconway@apache.org>2012-04-23 15:51:46 +0000
commitfa2d517b97721670a0c5bd55276b346e242288ba (patch)
treeb80174ae0abc7edf88c1f764d11918ae1a5e2107
parenta7c7e18f5d000b1e6e211a97e725b8bdca2900cc (diff)
downloadqpid-python-fa2d517b97721670a0c5bd55276b346e242288ba.tar.gz
QPID-3960: Fix performance regression in priority queue implementation.
Revision r1307582 created a serious degredation in priority queue performance. It replaced a muti-deque implementation with o(1) complexity for consuming with a map implementation with o(log(n)) performance. This revision returns to a mutli-deque algorithm but with the addition of a FIFO index for fast browsing of acquired and unacquired messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1329301 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Fairshare.cpp87
-rw-r--r--qpid/cpp/src/qpid/broker/Fairshare.h19
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.cpp20
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.h7
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp177
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.h45
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp6
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py1
-rw-r--r--qpid/cpp/xml/cluster.xml3
11 files changed, 211 insertions, 160 deletions
diff --git a/qpid/cpp/src/qpid/broker/Fairshare.cpp b/qpid/cpp/src/qpid/broker/Fairshare.cpp
index c30b64c7ae..7cdad1a44f 100644
--- a/qpid/cpp/src/qpid/broker/Fairshare.cpp
+++ b/qpid/cpp/src/qpid/broker/Fairshare.cpp
@@ -23,7 +23,6 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
-#include <algorithm>
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/assign/list_of.hpp>
@@ -33,7 +32,7 @@ namespace broker {
Fairshare::Fairshare(size_t levels, uint limit) :
PriorityQueue(levels),
- limits(levels, limit), counts(levels, 0) {}
+ limits(levels, limit), priority(levels-1), count(0) {}
void Fairshare::setLimit(size_t level, uint limit)
@@ -41,63 +40,70 @@ void Fairshare::setLimit(size_t level, uint limit)
limits[level] = limit;
}
+bool Fairshare::limitReached()
+{
+ uint l = limits[priority];
+ return l && ++count > l;
+}
+
+uint Fairshare::currentLevel()
+{
+ if (limitReached()) {
+ return nextLevel();
+ } else {
+ return priority;
+ }
+}
+
+uint Fairshare::nextLevel()
+{
+ count = 1;
+ if (priority) --priority;
+ else priority = levels-1;
+ return priority;
+}
+
bool Fairshare::isNull()
{
for (int i = 0; i < levels; i++) if (limits[i]) return false;
return true;
}
-bool Fairshare::getState(qpid::framing::FieldTable& state) const
+bool Fairshare::getState(uint& p, uint& c) const
{
- for (int i = 0; i < levels; i++) {
- if (counts[i]) {
- std::string key = (boost::format("fairshare-count-%1%") % i).str();
- state.setInt(key, counts[i]);
- }
- }
+ p = priority;
+ c = count;
return true;
}
-bool Fairshare::checkLevel(uint level)
+bool Fairshare::setState(uint p, uint c)
{
- if (!limits[level] || counts[level] < limits[level]) {
- counts[level]++;
- return true;
- } else {
- return false;
- }
+ priority = p;
+ count = c;
+ return true;
}
-bool Fairshare::consume(QueuedMessage& message)
+bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages)
{
- for (Available::iterator i = available.begin(); i != available.end(); ++i) {
- QueuedMessage* next = *i;
- if (checkLevel(getPriorityLevel(*next))) {
- messages[next->position].status = QueuedMessage::ACQUIRED;
- message = *next;
- available.erase(i);
- return true;
- }
- }
- if (!available.empty()) {
- std::fill(counts.begin(), counts.end(), 0);//reset counts
- return consume(message);
- } else {
- return false;
- }
+ const uint start = p = currentLevel();
+ do {
+ if (!messages[p].empty()) return true;
+ } while ((p = nextLevel()) != start);
+ return false;
}
-bool Fairshare::getState(const Messages& m, qpid::framing::FieldTable& counts)
+
+bool Fairshare::getState(const Messages& m, uint& priority, uint& count)
{
const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m);
- return fairshare && fairshare->getState(counts);
+ return fairshare && fairshare->getState(priority, count);
}
-bool Fairshare::setState(Messages& m, const qpid::framing::FieldTable& counts)
+bool Fairshare::setState(Messages& m, uint priority, uint count)
{
Fairshare* fairshare = dynamic_cast<Fairshare*>(&m);
- return fairshare && fairshare->setState(counts);
+ return fairshare && fairshare->setState(priority, count);
}
int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys)
@@ -130,14 +136,7 @@ int getIntegerSettingForKey(const qpid::framing::FieldTable& settings, const std
{
return getIntegerSetting(settings, boost::assign::list_of<std::string>(key));
}
-bool Fairshare::setState(const qpid::framing::FieldTable& state)
-{
- for (int i = 0; i < levels; i++) {
- std::string key = (boost::format("fairshare-count-%1%") % i).str();
- counts[i] = state.isSet(key) ? getIntegerSettingForKey(state, key) : 0;
- }
- return true;
-}
+
int getSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys, int minvalue, int maxvalue)
{
return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue));
diff --git a/qpid/cpp/src/qpid/broker/Fairshare.h b/qpid/cpp/src/qpid/broker/Fairshare.h
index dfcbdf280e..1b25721e0c 100644
--- a/qpid/cpp/src/qpid/broker/Fairshare.h
+++ b/qpid/cpp/src/qpid/broker/Fairshare.h
@@ -22,7 +22,6 @@
*
*/
#include "qpid/broker/PriorityQueue.h"
-#include <vector>
namespace qpid {
namespace framing {
@@ -39,19 +38,23 @@ class Fairshare : public PriorityQueue
{
public:
Fairshare(size_t levels, uint limit);
- bool getState(qpid::framing::FieldTable& counts) const;
- bool setState(const qpid::framing::FieldTable& counts);
+ bool getState(uint& priority, uint& count) const;
+ bool setState(uint priority, uint count);
void setLimit(size_t level, uint limit);
bool isNull();
- bool consume(QueuedMessage&);
static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
- static bool getState(const Messages&, qpid::framing::FieldTable& counts);
- static bool setState(Messages&, const qpid::framing::FieldTable& counts);
+ static bool getState(const Messages&, uint& priority, uint& count);
+ static bool setState(Messages&, uint priority, uint count);
private:
std::vector<uint> limits;
- std::vector<uint> counts;
- bool checkLevel(uint level);
+ uint priority;
+ uint count;
+
+ uint currentLevel();
+ uint nextLevel();
+ bool limitReached();
+ bool findFrontLevel(uint& p, PriorityLevels&);
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
index 709d99876b..f70c996975 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/log/Statement.h"
+#include "assert.h"
namespace qpid {
namespace broker {
@@ -39,7 +40,7 @@ size_t MessageDeque::index(const framing::SequenceNumber& position)
bool MessageDeque::deleted(const QueuedMessage& m)
{
size_t i = index(m.position);
- if (i < messages.size()) {
+ if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) {
messages[i].status = QueuedMessage::DELETED;
clean();
return true;
@@ -53,7 +54,7 @@ size_t MessageDeque::size()
return available;
}
-void MessageDeque::release(const QueuedMessage& message)
+QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message)
{
size_t i = index(message.position);
if (i < messages.size()) {
@@ -62,12 +63,17 @@ void MessageDeque::release(const QueuedMessage& message)
if (head > i) head = i;
m.status = QueuedMessage::AVAILABLE;
++available;
+ return &messages[i];
}
} else {
+ assert(0);
QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
}
+ return 0;
}
+void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); }
+
bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
if (position < messages.front().position) return false;
@@ -129,8 +135,7 @@ QueuedMessage padding(uint32_t pos) {
}
} // namespace
-bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
-{
+QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) {
//add padding to prevent gaps in sequence, which break the index
//calculation (needed for queue replication)
while (messages.size() && (added.position - messages.back().position) > 1)
@@ -139,7 +144,12 @@ bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*
messages.back().status = QueuedMessage::AVAILABLE;
if (head >= messages.size()) head = messages.size() - 1;
++available;
- return false;//adding a message never causes one to be removed for deque
+ return &messages.back();
+}
+
+bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) {
+ pushPtr(added);
+ return false; // adding a message never causes one to be removed for deque
}
void MessageDeque::updateAcquired(const QueuedMessage& acquired)
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h
index bb5943b09b..9b53716d4e 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.h
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.h
@@ -48,6 +48,12 @@ class MessageDeque : public Messages
void foreach(Functor);
void removeIf(Predicate);
+ // For use by other Messages implementations that use MessageDeque as a FIFO index
+ // and keep pointers to its elements in their own indexing strctures.
+ void clean();
+ QueuedMessage* releasePtr(const QueuedMessage&);
+ QueuedMessage* pushPtr(const QueuedMessage& added);
+
private:
typedef std::deque<QueuedMessage> Deque;
Deque messages;
@@ -55,7 +61,6 @@ class MessageDeque : public Messages
size_t head;
size_t index(const framing::SequenceNumber&);
- void clean();
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
index da52675a29..ab5ec7235a 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -3,13 +3,13 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownersip. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,133 +22,124 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
#include <cmath>
namespace qpid {
namespace broker {
-PriorityQueue::PriorityQueue(int l) :
- levels(l) {}
+PriorityQueue::PriorityQueue(int l) :
+ levels(l),
+ messages(levels, Deque()),
+ frontLevel(0), haveFront(false), cached(false) {}
-bool PriorityQueue::deleted(const QueuedMessage& message)
-{
- Index::iterator i = messages.find(message.position);
- if (i != messages.end()) {
- //remove from available list if necessary
- if (i->second.status == QueuedMessage::AVAILABLE) {
- Available::iterator j = std::find(available.begin(), available.end(), &i->second);
- if (j != available.end()) available.erase(j);
- }
- //remove from messages map
- messages.erase(i);
- return true;
- } else {
- return false;
- }
+bool PriorityQueue::deleted(const QueuedMessage& qm) {
+ bool deleted = fifo.deleted(qm);
+ if (deleted) erase(qm);
+ return deleted;
}
size_t PriorityQueue::size()
{
- return available.size();
+ return fifo.size();
+}
+
+namespace {
+bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; }
}
void PriorityQueue::release(const QueuedMessage& message)
{
- Index::iterator i = messages.find(message.position);
- if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
- i->second.status = QueuedMessage::AVAILABLE;
- //insert message back into the correct place in available queue, based on priority:
- Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
- available.insert(j, &i->second);
+ QueuedMessage* qm = fifo.releasePtr(message);
+ if (qm) {
+ uint p = getPriorityLevel(message);
+ messages[p].insert(
+ lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm);
+ clearCache();
+ }
+}
+
+
+void PriorityQueue::erase(const QueuedMessage& qm) {
+ size_t i = getPriorityLevel(qm);
+ if (!messages[i].empty()) {
+ long diff = qm.position.getValue() - messages[i].front()->position.getValue();
+ if (diff < 0) return;
+ long maxEnd = std::min(size_t(diff), messages[i].size());
+ QueuedMessage mutableQm = qm; // need non-const qm for lower_bound
+ Deque::iterator l =
+ lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before);
+ if (l != messages[i].end() && (*l)->position == qm.position) {
+ messages[i].erase(l);
+ clearCache();
+ return;
+ }
}
}
bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
- Index::iterator i = messages.find(position);
- if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
- i->second.status = QueuedMessage::ACQUIRED;
- message = i->second;
- //remove it from available list (could make this faster by using ordering):
- Available::iterator j = std::find(available.begin(), available.end(), &i->second);
- assert(j != available.end());
- available.erase(j);
- return true;
- } else {
- return false;
- }
+ bool acquired = fifo.acquire(position, message);
+ if (acquired) erase(message); // No longer available
+ return acquired;
}
bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
- Index::iterator i = messages.find(position);
- if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
- message = i->second;
- return true;
- } else {
- return false;
- }
+ return fifo.find(position, message);
}
-bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+bool PriorityQueue::browse(
+ const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
- Index::iterator i = messages.lower_bound(position+1);
- if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
- message = i->second;
- return true;
- } else {
- return false;
- }
+ return fifo.browse(position, message, unacquired);
}
bool PriorityQueue::consume(QueuedMessage& message)
{
- if (!available.empty()) {
- QueuedMessage* next = available.front();
- messages[next->position].status = QueuedMessage::ACQUIRED;
- message = *next;
- available.pop_front();
+ if (checkFront()) {
+ QueuedMessage* pm = messages[frontLevel].front();
+ messages[frontLevel].pop_front();
+ clearCache();
+ pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index
+ message = *pm;
return true;
} else {
return false;
}
}
-bool PriorityQueue::compare(const QueuedMessage* a, const QueuedMessage* b) const
+bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
- int priorityA = getPriorityLevel(*a);
- int priorityB = getPriorityLevel(*b);
- if (priorityA == priorityB) return a->position < b->position;
- else return priorityA > priorityB;
+ QueuedMessage* qmp = fifo.pushPtr(added);
+ messages[getPriorityLevel(added)].push_back(qmp);
+ clearCache();
+ return false; // Adding a message never causes one to be removed for deque
}
-bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
-{
- Index::iterator i = messages.insert(Index::value_type(added.position, added)).first;
- i->second.status = QueuedMessage::AVAILABLE;
- //insert message into the correct place in available queue, based on priority:
- Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
- available.insert(j, &i->second);
- return false;//adding a message never causes one to be removed for deque
+void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
+ fifo.updateAcquired(acquired);
}
void PriorityQueue::foreach(Functor f)
{
- for (Available::iterator i = available.begin(); i != available.end(); ++i) {
- f(**i);
- }
+ fifo.foreach(f);
}
void PriorityQueue::removeIf(Predicate p)
{
- for (Available::iterator i = available.begin(); i != available.end();) {
- if (p(**i)) {
- messages[(*i)->position].status = QueuedMessage::REMOVED;
- i = available.erase(i);
- } else {
- ++i;
+ for (int priority = 0; priority < levels; ++priority) {
+ for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
+ if (p(**i)) {
+ (*i)->status = QueuedMessage::DELETED; // Updates fifo index
+ i = messages[priority].erase(i);
+ clearCache();
+ } else {
+ ++i;
+ }
}
}
+ fifo.clean();
}
uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
@@ -161,6 +152,30 @@ uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
return std::min(priority - firstLevel, (uint)levels-1);
}
+void PriorityQueue::clearCache()
+{
+ cached = false;
+}
+
+bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
+{
+ for (int p = levels-1; p >= 0; --p) {
+ if (!m[p].empty()) {
+ l = p;
+ return true;
+ }
+ }
+ return false;
+}
+
+bool PriorityQueue::checkFront()
+{
+ if (!cached) {
+ haveFront = findFrontLevel(frontLevel, messages);
+ cached = true;
+ }
+ return haveFront;
+}
uint PriorityQueue::getPriority(const QueuedMessage& message)
{
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h
index 590cf68003..8628745db1 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.h
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,10 +21,10 @@
* under the License.
*
*/
-#include "qpid/broker/Messages.h"
+#include "qpid/broker/MessageDeque.h"
#include "qpid/sys/IntegerTypes.h"
-#include <list>
-#include <map>
+#include <deque>
+#include <vector>
namespace qpid {
namespace broker {
@@ -32,7 +32,10 @@ namespace broker {
/**
* Basic priority queue with a configurable number of recognised
* priority levels. This is implemented as a separate deque per
- * priority level. Browsing is FIFO not priority order.
+ * priority level.
+ *
+ * Browsing is FIFO not priority order. There is a MessageDeque
+ * for fast browsing.
*/
class PriorityQueue : public Messages
{
@@ -46,22 +49,36 @@ class PriorityQueue : public Messages
bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
- virtual bool consume(QueuedMessage&);
+ bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
-
+ void updateAcquired(const QueuedMessage& acquired);
void foreach(Functor);
void removeIf(Predicate);
+
static uint getPriority(const QueuedMessage&);
+
protected:
- typedef std::list<QueuedMessage*> Available;
- typedef std::map<framing::SequenceNumber, QueuedMessage> Index;
+ typedef std::deque<QueuedMessage*> Deque;
+ typedef std::vector<Deque> PriorityLevels;
+ virtual bool findFrontLevel(uint& p, PriorityLevels&);
const int levels;
- Index messages;
- Available available;
- bool compare(const QueuedMessage* a, const QueuedMessage* b) const;
- uint getPriorityLevel(const QueuedMessage& m) const;
+ private:
+ /** Available messages separated by priority and sorted in priority order.
+ * Holds pointers to the QueuedMessages in fifo
+ */
+ PriorityLevels messages;
+ /** FIFO index of all messsagse (including acquired messages) for fast browsing and indexing */
+ MessageDeque fifo;
+ uint frontLevel;
+ bool haveFront;
+ bool cached;
+
+ void erase(const QueuedMessage&);
+ uint getPriorityLevel(const QueuedMessage&) const;
+ void clearCache();
+ bool checkFront();
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 4f72501d52..3d5a7be1c3 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -585,9 +585,9 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi
findQueue(qname)->setPosition(position);
}
-void Connection::queueFairshareState(const std::string& qname, const framing::FieldTable& counts)
+void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count)
{
- if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), counts)) {
+ if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) {
QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies.");
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 14f7fb2e1a..920c4937db 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -156,7 +156,7 @@ class Connection :
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
- void queueFairshareState(const std::string&, const framing::FieldTable& count);
+ void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
void txStart();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 8460b7c7bb..3a3582d032 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -389,9 +389,9 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
- qpid::framing::FieldTable counts;
- if (qpid::broker::Fairshare::getState(q->getMessages(), counts)) {
- ClusterConnectionProxy(s).queueFairshareState(q->getName(), counts);
+ uint priority, count;
+ if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
+ ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
}
ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 61b42ccc07..827cb7dca9 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -468,6 +468,7 @@ class ReplicationTests(BrokerTest):
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
+
# FIXME aconway 2012-02-22: there is a bug in priority ring
# queues that allows a low priority message to displace a high
# one. The following commented-out assert_browse is for the
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 580451c5b5..7b3f2fe63b 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -304,7 +304,8 @@
<!-- Set the fairshare delivery related state of a replicated queue. -->
<control name="queue-fairshare-state" code="0x38">
<field name="queue" type="str8"/>
- <field name="counts" type="map"/>
+ <field name="position" type="uint8"/>
+ <field name="count" type="uint8"/>
</control>
<!-- Replicate a QueueObserver for a given queue. -->