summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/CMakeLists.txt5
-rw-r--r--cpp/src/Makefile.am11
-rw-r--r--cpp/src/qpid/broker/Fairshare.cpp156
-rw-r--r--cpp/src/qpid/broker/Fairshare.h61
-rw-r--r--cpp/src/qpid/broker/LegacyLVQ.cpp116
-rw-r--r--cpp/src/qpid/broker/LegacyLVQ.h59
-rw-r--r--cpp/src/qpid/broker/Message.cpp16
-rw-r--r--cpp/src/qpid/broker/Message.h5
-rw-r--r--cpp/src/qpid/broker/MessageDeque.cpp140
-rw-r--r--cpp/src/qpid/broker/MessageDeque.h61
-rw-r--r--cpp/src/qpid/broker/MessageMap.cpp166
-rw-r--r--cpp/src/qpid/broker/MessageMap.h72
-rw-r--r--cpp/src/qpid/broker/Messages.h117
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.cpp211
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.h77
-rw-r--r--cpp/src/qpid/broker/Queue.cpp318
-rw-r--r--cpp/src/qpid/broker/Queue.h27
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp6
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp8
-rw-r--r--cpp/src/qpid/cluster/Connection.h1
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp5
-rw-r--r--cpp/src/tests/cluster_test.cpp36
-rw-r--r--cpp/src/tests/qpid-receive.cpp1
-rw-r--r--cpp/src/tests/qpid-send.cpp6
24 files changed, 1422 insertions, 259 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 4ecbc53c6b..a709056899 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -956,6 +956,11 @@ set (qpidbroker_SOURCES
qpid/broker/Broker.cpp
qpid/broker/Exchange.cpp
qpid/broker/ExpiryPolicy.cpp
+ qpid/broker/Fairshare.cpp
+ qpid/broker/LegacyLVQ.cpp
+ qpid/broker/MessageDeque.cpp
+ qpid/broker/MessageMap.cpp
+ qpid/broker/PriorityQueue.cpp
qpid/broker/Queue.cpp
qpid/broker/QueueCleaner.cpp
qpid/broker/QueueListeners.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index cd30526675..c50fa5a6da 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -548,6 +548,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/ExchangeRegistry.h \
qpid/broker/ExpiryPolicy.cpp \
qpid/broker/ExpiryPolicy.h \
+ qpid/broker/Fairshare.h \
+ qpid/broker/Fairshare.cpp \
qpid/broker/FanOutExchange.cpp \
qpid/broker/FanOutExchange.h \
qpid/broker/FedOps.h \
@@ -556,6 +558,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/HeadersExchange.h \
qpid/broker/IncompleteMessageList.cpp \
qpid/broker/IncompleteMessageList.h \
+ qpid/broker/LegacyLVQ.h \
+ qpid/broker/LegacyLVQ.cpp \
qpid/broker/Link.cpp \
qpid/broker/Link.h \
qpid/broker/LinkRegistry.cpp \
@@ -566,9 +570,16 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.cpp \
qpid/broker/MessageBuilder.h \
+ qpid/broker/MessageDeque.h \
+ qpid/broker/MessageDeque.cpp \
+ qpid/broker/MessageMap.h \
+ qpid/broker/MessageMap.cpp \
+ qpid/broker/Messages.h \
qpid/broker/MessageStore.h \
qpid/broker/MessageStoreModule.cpp \
qpid/broker/MessageStoreModule.h \
+ qpid/broker/PriorityQueue.h \
+ qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.cpp \
diff --git a/cpp/src/qpid/broker/Fairshare.cpp b/cpp/src/qpid/broker/Fairshare.cpp
new file mode 100644
index 0000000000..e6bbf86691
--- /dev/null
+++ b/cpp/src/qpid/broker/Fairshare.cpp
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Fairshare.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace broker {
+
+Fairshare::Fairshare(size_t levels, uint limit) :
+ PriorityQueue(levels),
+ limits(levels, limit), priority(levels-1), count(0) {}
+
+
+void Fairshare::setLimit(size_t level, uint limit)
+{
+ limits[level] = limit;
+}
+
+bool Fairshare::limitReached()
+{
+ uint l = limits[priority];
+ return l && ++count > l;
+}
+
+uint Fairshare::currentLevel()
+{
+ if (limitReached()) {
+ return nextLevel();
+ } else {
+ return priority;
+ }
+}
+
+uint Fairshare::nextLevel()
+{
+ count = 1;
+ if (priority) --priority;
+ else priority = levels-1;
+ return priority;
+}
+
+bool Fairshare::isNull()
+{
+ for (int i = 0; i < levels; i++) if (limits[i]) return false;
+ return true;
+}
+
+bool Fairshare::getState(uint& p, uint& c) const
+{
+ p = priority;
+ c = count;
+ return true;
+}
+
+bool Fairshare::setState(uint p, uint c)
+{
+ priority = p;
+ count = c;
+ return true;
+}
+
+bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages)
+{
+ const uint start = p = currentLevel();
+ do {
+ if (!messages[p].empty()) return true;
+ } while ((p = nextLevel()) != start);
+ return false;
+}
+
+
+
+bool Fairshare::getState(const Messages& m, uint& priority, uint& count)
+{
+ const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m);
+ return fairshare && fairshare->getState(priority, count);
+}
+
+bool Fairshare::setState(Messages& m, uint priority, uint count)
+{
+ Fairshare* fairshare = dynamic_cast<Fairshare*>(&m);
+ return fairshare && fairshare->setState(priority, count);
+}
+
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+ qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+ if (!v) {
+ return 0;
+ } else if (v->convertsTo<int>()) {
+ return v->get<int>();
+ } else if (v->convertsTo<std::string>()){
+ std::string s = v->get<std::string>();
+ try {
+ return boost::lexical_cast<int>(s);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+ return 0;
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+ return 0;
+ }
+}
+
+int getSetting(const qpid::framing::FieldTable& settings, const std::string& key, int minvalue, int maxvalue)
+{
+ return std::max(minvalue,std::min(getIntegerSetting(settings, key), maxvalue));
+}
+
+std::auto_ptr<Messages> Fairshare::create(const qpid::framing::FieldTable& settings)
+{
+ std::auto_ptr<Messages> result;
+ size_t levels = getSetting(settings, "x-qpid-priorities", 1, 100);
+ if (levels) {
+ uint defaultLimit = getIntegerSetting(settings, "x-qpid-fairshare");
+ std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit));
+ for (uint i = 0; i < levels; i++) {
+ std::string key = (boost::format("x-qpid-fairshare-%1%") % i).str();
+ if(settings.isSet(key)) {
+ fairshare->setLimit(i, getIntegerSetting(settings, key));
+ }
+ }
+
+ if (fairshare->isNull()) {
+ result = std::auto_ptr<Messages>(new PriorityQueue(levels));
+ } else {
+ result = fairshare;
+ }
+ }
+ return result;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Fairshare.h b/cpp/src/qpid/broker/Fairshare.h
new file mode 100644
index 0000000000..6c4b87f857
--- /dev/null
+++ b/cpp/src/qpid/broker/Fairshare.h
@@ -0,0 +1,61 @@
+#ifndef QPID_BROKER_FAIRSHARE_H
+#define QPID_BROKER_FAIRSHARE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/PriorityQueue.h"
+
+namespace qpid {
+namespace framing {
+class FieldTable;
+}
+namespace broker {
+
+/**
+ * Modifies a basic prioirty queue by limiting the number of messages
+ * from each priority level that are dispatched before allowing
+ * dispatch from the next level.
+ */
+class Fairshare : public PriorityQueue
+{
+ public:
+ Fairshare(size_t levels, uint limit);
+ bool getState(uint& priority, uint& count) const;
+ bool setState(uint priority, uint count);
+ void setLimit(size_t level, uint limit);
+ static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
+ static bool getState(const Messages&, uint& priority, uint& count);
+ static bool setState(Messages&, uint priority, uint count);
+ private:
+ std::vector<uint> limits;
+
+ uint priority;
+ uint count;
+
+ uint currentLevel();
+ uint nextLevel();
+ bool isNull();
+ bool limitReached();
+ bool findFrontLevel(uint& p, PriorityLevels&);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_FAIRSHARE_H*/
diff --git a/cpp/src/qpid/broker/LegacyLVQ.cpp b/cpp/src/qpid/broker/LegacyLVQ.cpp
new file mode 100644
index 0000000000..a811a86492
--- /dev/null
+++ b/cpp/src/qpid/broker/LegacyLVQ.cpp
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/LegacyLVQ.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+
+LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {}
+
+void LegacyLVQ::setNoBrowse(bool b)
+{
+ noBrowse = b;
+}
+
+bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(position);
+ if (i != messages.end() && i->second.payload == message.payload) {
+ message = i->second;
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ if (MessageMap::next(position, message)) {
+ if (!noBrowse) index.erase(getKey(message));
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed)
+{
+ //Hack to disable LVQ behaviour on cluster update:
+ if (broker && broker->isClusterUpdatee()) {
+ messages[added.position] = added;
+ return false;
+ } else {
+ return MessageMap::push(added, removed);
+ }
+}
+
+const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
+{
+ //add the new message into the original position of the replaced message
+ Ordering::iterator i = messages.find(original.position);
+ i->second = update;
+ i->second.position = original.position;
+ return i->second;
+}
+
+void LegacyLVQ::removeIf(Predicate p)
+{
+ //Note: This method is currently called periodically on the timer
+ //thread to expire messages. In a clustered broker this means that
+ //the purging does not occur on the cluster event dispatch thread
+ //and consequently that is not totally ordered w.r.t other events
+ //(including publication of messages). The cluster does ensure
+ //that the actual expiration of messages (as distinct from the
+ //removing of those expired messages from the queue) *is*
+ //consistently ordered w.r.t. cluster events. This means that
+ //delivery of messages is in general consistent across the cluster
+ //inspite of any non-determinism in the triggering of a
+ //purge. However at present purging a last value queue (of the
+ //legacy sort) could potentially cause inconsistencies in the
+ //cluster (as the order w.r.t publications can affect the order in
+ //which messages appear in the queue). Consequently periodic
+ //purging of an LVQ is not enabled if the broker is clustered
+ //(expired messages will be removed on delivery and consolidated
+ //by key as part of normal LVQ operation).
+
+ //TODO: Is there a neater way to check whether broker is
+ //clustered? Here we assume that if the clustered timer is the
+ //same as the regular timer, we are not clustered:
+ if (!broker || &(broker->getClusterTimer()) == &(broker->getTimer()))
+ MessageMap::removeIf(p);
+}
+
+std::auto_ptr<Messages> LegacyLVQ::updateOrReplace(std::auto_ptr<Messages> current,
+ const std::string& key, bool noBrowse, Broker* broker)
+{
+ LegacyLVQ* lvq = dynamic_cast<LegacyLVQ*>(current.get());
+ if (lvq) {
+ lvq->setNoBrowse(noBrowse);
+ return current;
+ } else {
+ return std::auto_ptr<Messages>(new LegacyLVQ(key, noBrowse, broker));
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/LegacyLVQ.h b/cpp/src/qpid/broker/LegacyLVQ.h
new file mode 100644
index 0000000000..dd0fd7aaec
--- /dev/null
+++ b/cpp/src/qpid/broker/LegacyLVQ.h
@@ -0,0 +1,59 @@
+#ifndef QPID_BROKER_LEGACYLVQ_H
+#define QPID_BROKER_LEGACYLVQ_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/MessageMap.h"
+#include <memory>
+
+namespace qpid {
+namespace broker {
+class Broker;
+
+/**
+ * This class encapsulates the behaviour of the old style LVQ where a
+ * message replacing another messages for the given key will use the
+ * position in the queue of the previous message. This however causes
+ * problems for browsing. Either browsers stop the coalescing of
+ * messages by key (default) or they may mis updates (if the no-browse
+ * option is specified).
+ */
+class LegacyLVQ : public MessageMap
+{
+ public:
+ LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
+ bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool next(const framing::SequenceNumber&, QueuedMessage&);
+ bool push(const QueuedMessage& added, QueuedMessage& removed);
+ void removeIf(Predicate);
+ void setNoBrowse(bool);
+ static std::auto_ptr<Messages> updateOrReplace(std::auto_ptr<Messages> current,
+ const std::string& key, bool noBrowse,
+ Broker* broker);
+ protected:
+ bool noBrowse;
+ Broker* broker;
+
+ const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_LEGACYLVQ_H*/
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 147b9e7a6a..c589669e5a 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -399,22 +399,6 @@ bool Message::hasExpired()
return expiryPolicy && expiryPolicy->hasExpired(*this);
}
-boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
-{
- sys::Mutex::ScopedLock l(lock);
- Replacement::iterator i = replacement.find(qfor);
- if (i != replacement.end()){
- return i->second;
- }
- return empty;
-}
-
-void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor)
-{
- sys::Mutex::ScopedLock l(lock);
- replacement[qfor] = msg;
-}
-
namespace {
struct ScopedSet {
sys::Monitor& lock;
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index ee80657f39..f7dd2734b6 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -153,8 +153,6 @@ public:
void forcePersistent();
bool isForcedPersistent();
- boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
- void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
/** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */
void setEnqueueCompleteCallback(MessageCallback& cb);
@@ -167,8 +165,6 @@ public:
uint8_t getPriority() const;
private:
- typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
-
MessageAdapter& getAdapter() const;
void allEnqueuesComplete();
void allDequeuesComplete();
@@ -188,7 +184,6 @@ public:
static TransferAdapter TRANSFER;
- mutable Replacement replacement;
mutable boost::intrusive_ptr<Message> empty;
sys::Monitor callbackLock;
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp
new file mode 100644
index 0000000000..24b8f6f895
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageDeque.cpp
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/QueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+
+size_t MessageDeque::size()
+{
+ return messages.size();
+}
+
+bool MessageDeque::empty()
+{
+ return messages.empty();
+}
+
+void MessageDeque::reinsert(const QueuedMessage& message)
+{
+ messages.insert(lower_bound(messages.begin(), messages.end(), message), message);
+}
+
+MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position)
+{
+ if (!messages.empty()) {
+ QueuedMessage comp;
+ comp.position = position;
+ unsigned long diff = position.getValue() - messages.front().position.getValue();
+ long maxEnd = diff < messages.size()? diff : messages.size();
+ return lower_bound(messages.begin(),messages.begin()+maxEnd,comp);
+ } else {
+ return messages.end();
+ }
+}
+
+bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+{
+ Deque::iterator i = seek(position);
+ if (i != messages.end() && i->position == position) {
+ message = *i;
+ if (remove) messages.erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ return find(position, message, true);
+}
+
+bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ return find(position, message, false);
+}
+
+bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ if (messages.empty()) {
+ return false;
+ } else if (position < front().position) {
+ message = front();
+ return true;
+ } else {
+ Deque::iterator i = seek(position+1);
+ if (i != messages.end()) {
+ message = *i;
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
+
+QueuedMessage& MessageDeque::front()
+{
+ return messages.front();
+}
+
+void MessageDeque::pop()
+{
+ if (!messages.empty()) {
+ messages.pop_front();
+ }
+}
+
+bool MessageDeque::pop(QueuedMessage& out)
+{
+ if (messages.empty()) {
+ return false;
+ } else {
+ out = front();
+ messages.pop_front();
+ return true;
+ }
+}
+
+bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
+{
+ messages.push_back(added);
+ return false;//adding a message never causes one to be removed for deque
+}
+
+void MessageDeque::foreach(Functor f)
+{
+ std::for_each(messages.begin(), messages.end(), f);
+}
+
+void MessageDeque::removeIf(Predicate p)
+{
+ for (Deque::iterator i = messages.begin(); i != messages.end();) {
+ if (p(*i)) {
+ i = messages.erase(i);
+ } else {
+ ++i;
+ }
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageDeque.h b/cpp/src/qpid/broker/MessageDeque.h
new file mode 100644
index 0000000000..eb074fc8dc
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageDeque.h
@@ -0,0 +1,61 @@
+#ifndef QPID_BROKER_MESSAGEDEQUE_H
+#define QPID_BROKER_MESSAGEDEQUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Messages.h"
+#include <deque>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Provides the standard FIFO queue behaviour.
+ */
+class MessageDeque : public Messages
+{
+ public:
+ size_t size();
+ bool empty();
+
+ void reinsert(const QueuedMessage&);
+ bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool find(const framing::SequenceNumber&, QueuedMessage&);
+ bool next(const framing::SequenceNumber&, QueuedMessage&);
+
+ QueuedMessage& front();
+ void pop();
+ bool pop(QueuedMessage&);
+ bool push(const QueuedMessage& added, QueuedMessage& removed);
+
+ void foreach(Functor);
+ void removeIf(Predicate);
+
+ private:
+ typedef std::deque<QueuedMessage> Deque;
+ Deque messages;
+
+ Deque::iterator seek(const framing::SequenceNumber&);
+ bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_MESSAGEDEQUE_H*/
diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp
new file mode 100644
index 0000000000..39e23df533
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageMap.cpp
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/MessageMap.h"
+#include "qpid/broker/QueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+namespace {
+const std::string EMPTY;
+}
+
+std::string MessageMap::getKey(const QueuedMessage& message)
+{
+ const framing::FieldTable* ft = message.payload->getApplicationHeaders();
+ if (ft) return ft->getAsString(key);
+ else return EMPTY;
+}
+
+size_t MessageMap::size()
+{
+ return messages.size();
+}
+
+bool MessageMap::empty()
+{
+ return messages.empty();
+}
+
+void MessageMap::reinsert(const QueuedMessage& message)
+{
+ std::string key = getKey(message);
+ Index::iterator i = index.find(key);
+ if (i == index.end()) {
+ index[key] = message;
+ messages[message.position] = message;
+ } //else message has already been replaced
+}
+
+bool MessageMap::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(position);
+ if (i != messages.end()) {
+ message = i->second;
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(position);
+ if (i != messages.end()) {
+ message = i->second;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ if (!messages.empty() && position < front().position) {
+ message = front();
+ return true;
+ } else {
+ Ordering::iterator i = messages.lower_bound(position+1);
+ if (i != messages.end()) {
+ message = i->second;
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
+
+QueuedMessage& MessageMap::front()
+{
+ return messages.begin()->second;
+}
+
+void MessageMap::pop()
+{
+ QueuedMessage dummy;
+ pop(dummy);
+}
+
+bool MessageMap::pop(QueuedMessage& out)
+{
+ Ordering::iterator i = messages.begin();
+ if (i != messages.end()) {
+ out = i->second;
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
+{
+ messages.erase(original.position);
+ messages[update.position] = update;
+ return update;
+}
+
+bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
+{
+ std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added));
+ if (result.second) {
+ //there was no previous message for this key; nothing needs to
+ //be removed, just add the message into its correct position
+ messages[added.position] = added;
+ return false;
+ } else {
+ //there is already a message with that key which needs to be replaced
+ removed = result.first->second;
+ result.first->second = replace(result.first->second, added);
+ return true;
+ }
+}
+
+void MessageMap::foreach(Functor f)
+{
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ f(i->second);
+ }
+}
+
+void MessageMap::removeIf(Predicate p)
+{
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) {
+ if (p(i->second)) {
+ erase(i);
+ }
+ }
+}
+
+void MessageMap::erase(Ordering::iterator i)
+{
+ index.erase(getKey(i->second));
+ messages.erase(i);
+}
+
+MessageMap::MessageMap(const std::string& k) : key(k) {}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageMap.h b/cpp/src/qpid/broker/MessageMap.h
new file mode 100644
index 0000000000..1128a1d54a
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageMap.h
@@ -0,0 +1,72 @@
+#ifndef QPID_BROKER_MESSAGEMAP_H
+#define QPID_BROKER_MESSAGEMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Messages.h"
+#include "qpid/framing/SequenceNumber.h"
+#include <map>
+#include <string>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Provides a last value queue behaviour, whereby a messages replace
+ * any previous message with the same value for a defined property
+ * (i.e. the key).
+ */
+class MessageMap : public Messages
+{
+ public:
+ MessageMap(const std::string& key);
+ virtual ~MessageMap() {}
+
+ size_t size();
+ bool empty();
+
+ void reinsert(const QueuedMessage&);
+ virtual bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool find(const framing::SequenceNumber&, QueuedMessage&);
+ virtual bool next(const framing::SequenceNumber&, QueuedMessage&);
+
+ QueuedMessage& front();
+ void pop();
+ bool pop(QueuedMessage&);
+ virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
+
+ void foreach(Functor);
+ virtual void removeIf(Predicate);
+
+ protected:
+ typedef std::map<std::string, QueuedMessage> Index;
+ typedef std::map<framing::SequenceNumber, QueuedMessage> Ordering;
+ const std::string key;
+ Index index;
+ Ordering messages;
+
+ std::string getKey(const QueuedMessage&);
+ virtual const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
+ void erase(Ordering::iterator);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_MESSAGEMAP_H*/
diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h
new file mode 100644
index 0000000000..0d75417640
--- /dev/null
+++ b/cpp/src/qpid/broker/Messages.h
@@ -0,0 +1,117 @@
+#ifndef QPID_BROKER_MESSAGES_H
+#define QPID_BROKER_MESSAGES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace framing {
+class SequenceNumber;
+}
+namespace broker {
+struct QueuedMessage;
+
+/**
+ * This interface abstracts out the access to the messages held for
+ * delivery by a Queue instance.
+ */
+class Messages
+{
+ public:
+ typedef boost::function1<void, QueuedMessage&> Functor;
+ typedef boost::function1<bool, QueuedMessage&> Predicate;
+
+ virtual ~Messages() {}
+ /**
+ * @return the number of messages available for delivery.
+ */
+ virtual size_t size() = 0;
+ /**
+ * @return true if there are no messages for delivery, false otherwise
+ */
+ virtual bool empty() = 0;
+
+ /**
+ * Re-inserts a message back into its original position - used
+ * when requeing released messages.
+ */
+ virtual void reinsert(const QueuedMessage&) = 0;
+ /**
+ * Remove the message at the specified position, returning true if
+ * found, false otherwise. The removed message is passed back via
+ * the second parameter.
+ */
+ virtual bool remove(const framing::SequenceNumber&, QueuedMessage&) = 0;
+ /**
+ * Find the message at the specified position, returning true if
+ * found, false otherwise. The matched message is passed back via
+ * the second parameter.
+ */
+ virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0;
+ /**
+ * Return the next message to be given to a browsing subscrption
+ * that has reached the specified poisition. The next messages is
+ * passed back via the second parameter.
+ *
+ * @return true if there is another message, false otherwise.
+ */
+ virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
+
+ /**
+ * Note: Caller is responsible for ensuring that there is a front
+ * (e.g. empty() returns false)
+ *
+ * @return the next message to be delivered
+ */
+ virtual QueuedMessage& front() = 0;
+ /**
+ * Removes the front message
+ */
+ virtual void pop() = 0;
+ /**
+ * @return true if there is a mesage to be delivered - in which
+ * case that message will be returned via the parameter and
+ * removed - otherwise false.
+ */
+ virtual bool pop(QueuedMessage&) = 0;
+ /**
+ * Pushes a message to the back of the 'queue'. For some types of
+ * queue this may cause another message to be removed; if that is
+ * the case the method will return true and the removed message
+ * will be passed out via the second parameter.
+ */
+ virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
+
+ /**
+ * Apply the functor to each message held
+ */
+ virtual void foreach(Functor) = 0;
+ /**
+ * Remove every message held that for which the specified
+ * predicate returns true
+ */
+ virtual void removeIf(Predicate) = 0;
+ private:
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_MESSAGES_H*/
diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp
new file mode 100644
index 0000000000..0272e968ec
--- /dev/null
+++ b/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -0,0 +1,211 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/PriorityQueue.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace broker {
+
+PriorityQueue::PriorityQueue(int l) :
+ levels(l),
+ messages(levels, Deque()),
+ frontLevel(0), haveFront(false), cached(false) {}
+
+size_t PriorityQueue::size()
+{
+ size_t total(0);
+ for (int i = 0; i < levels; ++i) {
+ total += messages[i].size();
+ }
+ return total;
+}
+
+bool PriorityQueue::empty()
+{
+ for (int i = 0; i < levels; ++i) {
+ if (!messages[i].empty()) return false;
+ }
+ return true;
+}
+
+void PriorityQueue::reinsert(const QueuedMessage& message)
+{
+ uint p = getPriorityLevel(message);
+ messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
+ clearCache();
+}
+
+bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+{
+ QueuedMessage comp;
+ comp.position = position;
+ for (int i = 0; i < levels; ++i) {
+ if (!messages[i].empty()) {
+ unsigned long diff = position.getValue() - messages[i].front().position.getValue();
+ long maxEnd = diff < messages[i].size() ? diff : messages[i].size();
+ Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp);
+ if (l != messages[i].end() && l->position == position) {
+ message = *l;
+ if (remove) {
+ messages[i].erase(l);
+ clearCache();
+ }
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+bool PriorityQueue::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ return find(position, message, true);
+}
+
+bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ return find(position, message, false);
+}
+
+bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ QueuedMessage match;
+ match.position = position+1;
+ Deque::iterator lowest;
+ bool found = false;
+ for (int i = 0; i < levels; ++i) {
+ Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match);
+ if (m != messages[i].end()) {
+ if (m->position == match.position) {
+ message = *m;
+ return true;
+ } else if (!found || m->position < lowest->position) {
+ lowest = m;
+ found = true;
+ }
+ }
+ }
+ if (found) {
+ message = *lowest;
+ }
+ return found;
+}
+
+QueuedMessage& PriorityQueue::front()
+{
+ if (checkFront()) {
+ return messages[frontLevel].front();
+ } else {
+ throw qpid::framing::InternalErrorException(QPID_MSG("No message available"));
+ }
+}
+
+bool PriorityQueue::pop(QueuedMessage& message)
+{
+ if (checkFront()) {
+ message = messages[frontLevel].front();
+ messages[frontLevel].pop_front();
+ clearCache();
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void PriorityQueue::pop()
+{
+ QueuedMessage dummy;
+ pop(dummy);
+}
+
+bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
+{
+ messages[getPriorityLevel(added)].push_back(added);
+ clearCache();
+ return false;//adding a message never causes one to be removed for deque
+}
+
+void PriorityQueue::foreach(Functor f)
+{
+ for (int i = 0; i < levels; ++i) {
+ std::for_each(messages[i].begin(), messages[i].end(), f);
+ }
+}
+
+void PriorityQueue::removeIf(Predicate p)
+{
+ for (int priority = 0; priority < levels; ++priority) {
+ for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
+ if (p(*i)) {
+ i = messages[priority].erase(i);
+ clearCache();
+ } else {
+ ++i;
+ }
+ }
+ }
+}
+
+uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
+{
+ uint priority = m.payload->getPriority();
+ //Use AMQP 0-10 approach to mapping priorities to a fixed level
+ //(see rule priority-level-implementation)
+ const uint firstLevel = 5 - std::min(5.0, ceil((double) levels/2.0));
+ if (priority <= firstLevel) return 0;
+ return std::min(priority - firstLevel, (uint)levels-1);
+}
+
+void PriorityQueue::clearCache()
+{
+ cached = false;
+}
+
+bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
+{
+ for (int p = levels-1; p >= 0; --p) {
+ if (!m[p].empty()) {
+ l = p;
+ return true;
+ }
+ }
+ return false;
+}
+
+bool PriorityQueue::checkFront()
+{
+ if (!cached) {
+ haveFront = findFrontLevel(frontLevel, messages);
+ cached = true;
+ }
+ return haveFront;
+}
+
+uint PriorityQueue::getPriority(const QueuedMessage& message)
+{
+ const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages()));
+ if (queue) return queue->getPriorityLevel(message);
+ else return 0;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h
new file mode 100644
index 0000000000..7e97d929fb
--- /dev/null
+++ b/cpp/src/qpid/broker/PriorityQueue.h
@@ -0,0 +1,77 @@
+#ifndef QPID_BROKER_PRIORITYQUEUE_H
+#define QPID_BROKER_PRIORITYQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Messages.h"
+#include <deque>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Basic priority queue with a configurable number of recognised
+ * priority levels. This is implemented as a separate deque per
+ * priority level. Browsing is FIFO not priority order.
+ */
+class PriorityQueue : public Messages
+{
+ public:
+ PriorityQueue(int levels);
+ virtual ~PriorityQueue() {}
+ size_t size();
+ bool empty();
+
+ void reinsert(const QueuedMessage&);
+ bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool find(const framing::SequenceNumber&, QueuedMessage&);
+ bool next(const framing::SequenceNumber&, QueuedMessage&);
+
+ QueuedMessage& front();
+ void pop();
+ bool pop(QueuedMessage&);
+ bool push(const QueuedMessage& added, QueuedMessage& removed);
+
+ void foreach(Functor);
+ void removeIf(Predicate);
+ static uint getPriority(const QueuedMessage&);
+ protected:
+ typedef std::deque<QueuedMessage> Deque;
+ typedef std::vector<Deque> PriorityLevels;
+ virtual bool findFrontLevel(uint& p, PriorityLevels&);
+
+ const int levels;
+ private:
+ PriorityLevels messages;
+ uint frontLevel;
+ bool haveFront;
+ bool cached;
+
+ bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
+ uint getPriorityLevel(const QueuedMessage&) const;
+ void clearCache();
+ bool checkFront();
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PRIORITYQUEUE_H*/
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e59857462c..43d1a2b27c 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -23,7 +23,11 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/LegacyLVQ.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageMap.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
@@ -66,6 +70,7 @@ const std::string qpidMaxCount("qpid.max_count");
const std::string qpidNoLocal("no-local");
const std::string qpidTraceIdentity("qpid.trace.id");
const std::string qpidTraceExclude("qpid.trace.exclude");
+const std::string qpidLastValueQueueKey("qpid.last_value_queue_key");
const std::string qpidLastValueQueue("qpid.last_value_queue");
const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
const std::string qpidPersistLastNode("qpid.persist_last_node");
@@ -92,10 +97,9 @@ Queue::Queue(const string& _name, bool _autodelete,
consumerCount(0),
exclusive(0),
noLocal(false),
- lastValueQueue(false),
- lastValueQueueNoBrowse(false),
persistLastNode(false),
inLastNodeFailure(false),
+ messages(new MessageDeque()),
persistenceId(0),
policyExceeded(false),
mgmtObject(0),
@@ -212,7 +216,7 @@ void Queue::requeue(const QueuedMessage& msg){
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg);
+ messages->reinsert(msg);
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
@@ -227,57 +231,23 @@ void Queue::requeue(const QueuedMessage& msg){
copy.notify();
}
-void Queue::clearLVQIndex(const QueuedMessage& msg){
- assertClusterSafe();
- const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0;
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
-}
-
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
-
- Messages::iterator i = findAt(position);
- if (i != messages.end() ) {
- message = *i;
- if (lastValueQueue) {
- clearLVQIndex(*i);
- }
- QPID_LOG(debug,
- "Acquired message at " << i->position << " from " << name);
- messages.erase(i);
+ if (messages->remove(position, message)) {
+ QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
- }
- QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
- return false;
+ } else {
+ QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
+ return false;
+ }
}
bool Queue::acquire(const QueuedMessage& msg) {
- Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
-
- QPID_LOG(debug, "attempting to acquire " << msg.position);
- Messages::iterator i = findAt(msg.position);
- if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
- (!lastValueQueue ||
- (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
- ) {
-
- clearLVQIndex(msg);
- QPID_LOG(debug,
- "Match found, acquire succeeded: " <<
- i->position << " == " << msg.position);
- messages.erase(i);
- return true;
- }
-
- QPID_LOG(debug, "Acquire failed for " << msg.position);
- return false;
+ QueuedMessage copy = msg;
+ return acquireMessageAt(msg.position, copy);
}
void Queue::notifyListener()
@@ -286,7 +256,7 @@ void Queue::notifyListener()
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
- if (messages.size()) {
+ if (messages->size()) {
listeners.populate(set);
}
}
@@ -315,12 +285,12 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
+ if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
} else {
- QueuedMessage msg = getFront();
+ QueuedMessage msg = messages->front();
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
popAndDequeue();
@@ -330,7 +300,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- popMsg(msg);
+ pop();
return CONSUMED;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -356,11 +326,6 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
//consumer wants the message
c->position = msg.position;
m = msg;
- if (!lastValueQueueNoBrowse) clearLVQIndex(msg);
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) m.payload = replacement;
- }
return true;
} else {
//browser hasn't got enough credit for the message
@@ -382,7 +347,7 @@ void Queue::removeListener(Consumer::shared_ptr c)
{
Mutex::ScopedLock locker(messageLock);
listeners.removeListener(c);
- if (messages.size()) {
+ if (messages->size()) {
listeners.populate(set);
}
}
@@ -403,52 +368,20 @@ bool Queue::dispatch(Consumer::shared_ptr c)
// Find the next message
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > c->position) {
- if (c->position < getFront().position) {
- msg = getFront();
- return true;
- } else {
- Messages::iterator pos = findAt(c->position);
- if (pos != messages.end() && pos+1 != messages.end()) {
- msg = *(pos+1);
- return true;
- }
- }
+ if (messages->next(c->position, msg)) {
+ return true;
+ } else {
+ listeners.addListener(c);
+ return false;
}
- listeners.addListener(c);
- return false;
}
-Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
-
- if(!messages.empty()){
- QueuedMessage compM;
- compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
-
- Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i!= messages.end() && i->position == pos)
- return i;
- }
- return messages.end(); // no match found.
-}
-
-
QueuedMessage Queue::find(SequenceNumber pos) const {
Mutex::ScopedLock locker(messageLock);
- if(!messages.empty()){
- QueuedMessage compM;
- compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
-
- Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i != messages.end())
- return *i;
- }
- return QueuedMessage();
+ QueuedMessage msg;
+ messages->find(pos, msg);
+ return msg;
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
@@ -482,12 +415,18 @@ void Queue::cancel(Consumer::shared_ptr c){
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
+ messages->pop(msg);
+ return msg;
+}
- if(!messages.empty()){
- msg = getFront();
- popMsg(msg);
+bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+{
+ if (message.payload->hasExpired()) {
+ expired.push_back(message);
+ return true;
+ } else {
+ return false;
}
- return msg;
}
void Queue::purgeExpired()
@@ -496,37 +435,11 @@ void Queue::purgeExpired()
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
- //Note: This method is currently called periodically on the timer
- //thread. In a clustered broker this means that the purging does
- //not occur on the cluster event dispatch thread and consequently
- //that is not totally ordered w.r.t other events (including
- //publication of messages). However the cluster does ensure that
- //the actual expiration of messages (as distinct from the removing
- //of those expired messages from the queue) *is* consistently
- //ordered w.r.t. cluster events. This means that delivery of
- //messages is in general consistent across the cluster inspite of
- //any non-determinism in the triggering of a purge. However at
- //present purging a last value queue could potentially cause
- //inconsistencies in the cluster (as the order w.r.t publications
- //can affect the order in which messages appear in the
- //queue). Consequently periodic purging of an LVQ is not enabled
- //(expired messages will be removed on delivery and consolidated
- //by key as part of normal LVQ operation).
-
- if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) {
- Messages expired;
+ if (dequeueTracker.sampleRatePerSecond() < 1) {
+ std::deque<QueuedMessage> expired;
{
Mutex::ScopedLock locker(messageLock);
- for (Messages::iterator i = messages.begin(); i != messages.end();) {
- //Re-introduce management of LVQ-specific state here
- //if purging is renabled for that case (see note above)
- if (i->payload->hasExpired()) {
- expired.push_back(*i);
- i = messages.erase(i);
- } else {
- ++i;
- }
- }
+ messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
@@ -552,13 +465,13 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages.empty()) {
+ while((!purge_request || purge_count--) && !messages->empty()) {
if (dest.get()) {
//
// If there is a destination exchange, stage the messages onto a reroute queue
// so they don't wind up getting purged more than once.
//
- DeliverableMessage msg(getFront().payload);
+ DeliverableMessage msg(messages->front().payload);
rerouteQueue.push_back(msg);
}
popAndDequeue();
@@ -584,64 +497,37 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
- while((!qty || move_count--) && !messages.empty()) {
- QueuedMessage qmsg = getFront();
+ while((!qty || move_count--) && !messages->empty()) {
+ QueuedMessage qmsg = messages->front();
boost::intrusive_ptr<Message> msg = qmsg.payload;
destq->deliver(msg); // deliver message to the destination queue
- popMsg(qmsg);
+ pop();
dequeue(0, qmsg);
count++;
}
return count;
}
-void Queue::popMsg(QueuedMessage& qmsg)
+void Queue::pop()
{
assertClusterSafe();
- const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
- messages.pop_front();
+ messages->pop();
++dequeueTracker;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
QueueListeners::NotificationSet copy;
+ QueuedMessage removed;
+ bool dequeueRequired = false;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
- LVQ::iterator i;
- const framing::FieldTable* ft = msg->getApplicationHeaders();
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
-
- i = lvq.find(key);
- if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
- messages.push_back(qm);
- listeners.populate(copy);
- lvq[key] = msg;
- }else {
- boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
- if (!old) old = i->second;
- i->second->setReplacementMessage(msg,this);
- if (isRecovery) {
- //can't issue new requests for the store until
- //recovery is complete
- pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
- } else {
- Mutex::ScopedUnlock u(messageLock);
- dequeue(0, QueuedMessage(qm.queue, old, qm.position));
- }
- }
- }else {
- messages.push_back(qm);
- listeners.populate(copy);
- }
+ dequeueRequired = messages->push(qm, removed);
+ listeners.populate(copy);
+
if (eventMode) {
if (eventMgr) eventMgr->enqueued(qm);
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
@@ -651,32 +537,20 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
}
}
copy.notify();
-}
-
-QueuedMessage Queue::getFront()
-{
- QueuedMessage msg = messages.front();
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) msg.payload = replacement;
+ if (dequeueRequired) {
+ if (isRecovery) {
+ //can't issue new requests for the store until
+ //recovery is complete
+ pendingDequeues.push_back(removed);
+ } else {
+ dequeue(0, removed);
+ }
}
- return msg;
}
-QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
+void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
{
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) {
- const framing::FieldTable* ft = replacement->getApplicationHeaders();
- if (ft) {
- string key = ft->getAsString(qpidVQMatchProperty);
- if (lvq.find(key) != lvq.end()){
- lvq[key] = replacement;
- }
- }
- msg.payload = replacement;
- }
- return msg;
+ if (message.payload->isEnqueueComplete()) (*result)++;
}
/** function only provided for unit tests, or code not in critical message path */
@@ -684,20 +558,14 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
uint32_t count = 0;
- for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
- //NOTE: don't need to use checkLvqReplace() here as it
- //is only relevant for LVQ which does not support persistence
- //so the enqueueComplete check has no effect
- if ( i->payload->isEnqueueComplete() ) count ++;
- }
-
+ messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
return count;
}
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
- return messages.size();
+ return messages->size();
}
uint32_t Queue::getConsumerCount() const
@@ -717,21 +585,22 @@ void Queue::clearLastNodeFailure()
inLastNodeFailure = false;
}
+void Queue::forcePersistent(QueuedMessage& message)
+{
+ if(!message.payload->isStoredOnQueue(shared_from_this())) {
+ message.payload->forcePersistent();
+ if (message.payload->isForcedPersistent() ){
+ enqueue(0, message.payload);
+ }
+ }
+}
+
void Queue::setLastNodeFailure()
{
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
try {
- for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
- if (lastValueQueue) checkLvqReplace(*i);
- // don't force a message twice to disk.
- if(!i->payload->isStoredOnQueue(shared_from_this())) {
- i->payload->forcePersistent();
- if (i->payload->isForcedPersistent() ){
- enqueue(0, i->payload);
- }
- }
- }
+ messages->foreach(boost::bind(&Queue::forcePersistent, this, _1));
} catch (const std::exception& e) {
// Could not go into last node standing (for example journal not large enough)
QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
@@ -748,7 +617,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
if (!u.acquired) return false;
if (policy.get() && !suppressPolicyCheck) {
- Messages dequeues;
+ std::deque<QueuedMessage> dequeues;
{
Mutex::ScopedLock locker(messageLock);
policy->tryEnqueue(msg);
@@ -835,8 +704,8 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
*/
void Queue::popAndDequeue()
{
- QueuedMessage msg = getFront();
- popMsg(msg);
+ QueuedMessage msg = messages->front();
+ pop();
dequeue(0, msg);
}
@@ -885,13 +754,22 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
- lastValueQueue= _settings.get(qpidLastValueQueue);
- if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
-
- lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
- if (lastValueQueueNoBrowse){
- QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
- lastValueQueue = lastValueQueueNoBrowse;
+ std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
+ if (lvqKey.size()) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
+ messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+ } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
+ messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+ } else if (_settings.get(qpidLastValueQueue)) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
+ messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+ } else {
+ std::auto_ptr<Messages> m = Fairshare::create(_settings);
+ if (m.get()) {
+ messages = m;
+ QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
+ }
}
persistLastNode= _settings.get(qpidPersistLastNode);
@@ -919,8 +797,8 @@ void Queue::destroy()
{
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
- while(!messages.empty()){
- DeliverableMessage msg(getFront().payload);
+ while(!messages->empty()){
+ DeliverableMessage msg(messages->front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
popAndDequeue();
@@ -1198,6 +1076,8 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
}
QueueListeners& Queue::getListeners() { return listeners; }
+Messages& Queue::getMessages() { return *messages; }
+const Messages& Queue::getMessages() const { return *messages; }
void Queue::checkNotDeleted()
{
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 96c79d1b92..66e4c5fa22 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -26,6 +26,7 @@
#include "qpid/broker/OwnershipToken.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/Messages.h"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueBindings.h"
@@ -85,10 +86,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
~ScopedUse() { if (acquired) barrier.release(); }
};
- typedef std::deque<QueuedMessage> Messages;
- typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
+
const std::string name;
const bool autodelete;
MessageStore* store;
@@ -96,16 +96,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t consumerCount;
OwnershipToken* exclusive;
bool noLocal;
- bool lastValueQueue;
- bool lastValueQueueNoBrowse;
bool persistLastNode;
bool inLastNodeFailure;
std::string traceId;
std::vector<std::string> traceExclude;
QueueListeners listeners;
- Messages messages;
- Messages pendingDequeues;//used to avoid dequeuing during recovery
- LVQ lvq;
+ std::auto_ptr<Messages> messages;
+ std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Monitor messageLock;
mutable qpid::sys::Mutex ownershipLock;
@@ -140,11 +137,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isExcluded(boost::intrusive_ptr<Message>& msg);
void dequeued(const QueuedMessage& msg);
- void popMsg(QueuedMessage& qmsg);
+ void pop();
void popAndDequeue();
QueuedMessage getFront();
- QueuedMessage& checkLvqReplace(QueuedMessage& msg);
- void clearLVQIndex(const QueuedMessage& msg);
+ void forcePersistent(QueuedMessage& msg);
inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
{
@@ -169,7 +165,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
}
}
- Messages::iterator findAt(framing::SequenceNumber pos);
void checkNotDeleted();
public:
@@ -320,13 +315,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** Apply f to each Message on the queue. */
template <class F> void eachMessage(F f) {
sys::Mutex::ScopedLock l(messageLock);
- if (lastValueQueue) {
- for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) {
- f(checkLvqReplace(*i));
- }
- } else {
- std::for_each(messages.begin(), messages.end(), f);
- }
+ messages->foreach(f);
}
/** Apply f to each QueueBinding on the queue */
@@ -352,6 +341,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
// For cluster update
QueueListeners& getListeners();
+ Messages& getMessages();
+ const Messages& getMessages() const;
/**
* Reserve space in policy for an enqueued message that
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index f311ea8321..4168221ad0 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/PriorityQueue.h"
#include "qpid/Exception.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -213,7 +214,10 @@ RingQueuePolicy::RingQueuePolicy(const std::string& _name,
bool before(const QueuedMessage& a, const QueuedMessage& b)
{
- return a.position < b.position;
+ int priorityA = PriorityQueue::getPriority(a);
+ int priorityB = PriorityQueue::getPriority(b);
+ if (priorityA == priorityB) return a.position < b.position;
+ else return priorityA < priorityB;
}
void RingQueuePolicy::enqueued(const QueuedMessage& m)
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index c7689577a7..0582945a9c 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -32,6 +32,7 @@
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Queue.h"
@@ -548,6 +549,13 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi
findQueue(qname)->setPosition(position);
}
+void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count)
+{
+ if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) {
+ QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies.");
+ }
+}
+
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index d90cdd898b..7ee85bf1aa 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -152,6 +152,7 @@ class Connection :
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
+ void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
void expiryId(uint64_t);
void txStart();
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 4f6488a28a..8f751add9b 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -32,6 +32,7 @@
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Fairshare.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/LinkRegistry.h"
@@ -352,6 +353,10 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
+ uint priority, count;
+ if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
+ ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
+ }
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 903a20ec28..f2ccd0ba84 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -1191,5 +1191,41 @@ QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) {
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
}
+QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ FieldTable arguments;
+ arguments.setInt("x-qpid-priorities", 10);
+ arguments.setInt("x-qpid-fairshare", 5);
+ c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments);
+
+ //send messages of different priorities
+ for (int i = 0; i < 20; i++) {
+ Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag);
+ msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5);
+ c0.session.messageTransfer(arg::content=msg);
+ }
+
+ //pull off a couple of the messages (first four should be the top priority messages
+ for (int i = 0; i < 4; i++) {
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData());
+ }
+
+ // Add another member
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ //pull off some more messages
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData());
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData());
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData());
+
+ //check queue has same content on both nodes
+ BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp
index 28e229ca27..012d544a2e 100644
--- a/cpp/src/tests/qpid-receive.cpp
+++ b/cpp/src/tests/qpid-receive.cpp
@@ -206,6 +206,7 @@ int main(int argc, char ** argv)
if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl;
if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl;
+ if (msg.getPriority()) std::cout << "Priority: " << msg.getPriority() << std::endl;
if (msg.getDurable()) std::cout << "Durable: true" << std::endl;
if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl;
std::cout << "Properties: " << msg.getProperties() << std::endl;
diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp
index c71cb83f9a..3824a870bf 100644
--- a/cpp/src/tests/qpid-send.cpp
+++ b/cpp/src/tests/qpid-send.cpp
@@ -56,6 +56,7 @@ struct Options : public qpid::Options
uint sendEos;
bool durable;
uint ttl;
+ uint priority;
std::string userid;
std::string correlationid;
string_vector properties;
@@ -84,6 +85,7 @@ struct Options : public qpid::Options
sendEos(0),
durable(false),
ttl(0),
+ priority(0),
contentString(),
contentSize(0),
contentStdin(false),
@@ -110,6 +112,7 @@ struct Options : public qpid::Options
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+ ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
@@ -266,6 +269,9 @@ int main(int argc, char ** argv)
if (opts.ttl) {
msg.setTtl(Duration(opts.ttl));
}
+ if (opts.priority) {
+ msg.setPriority(opts.priority);
+ }
if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto));
if (!opts.userid.empty()) msg.setUserId(opts.userid);
if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid);