diff options
Diffstat (limited to 'cpp/src')
24 files changed, 1422 insertions, 259 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 4ecbc53c6b..a709056899 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -956,6 +956,11 @@ set (qpidbroker_SOURCES qpid/broker/Broker.cpp qpid/broker/Exchange.cpp qpid/broker/ExpiryPolicy.cpp + qpid/broker/Fairshare.cpp + qpid/broker/LegacyLVQ.cpp + qpid/broker/MessageDeque.cpp + qpid/broker/MessageMap.cpp + qpid/broker/PriorityQueue.cpp qpid/broker/Queue.cpp qpid/broker/QueueCleaner.cpp qpid/broker/QueueListeners.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index cd30526675..c50fa5a6da 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -548,6 +548,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/ExchangeRegistry.h \ qpid/broker/ExpiryPolicy.cpp \ qpid/broker/ExpiryPolicy.h \ + qpid/broker/Fairshare.h \ + qpid/broker/Fairshare.cpp \ qpid/broker/FanOutExchange.cpp \ qpid/broker/FanOutExchange.h \ qpid/broker/FedOps.h \ @@ -556,6 +558,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/HeadersExchange.h \ qpid/broker/IncompleteMessageList.cpp \ qpid/broker/IncompleteMessageList.h \ + qpid/broker/LegacyLVQ.h \ + qpid/broker/LegacyLVQ.cpp \ qpid/broker/Link.cpp \ qpid/broker/Link.h \ qpid/broker/LinkRegistry.cpp \ @@ -566,9 +570,16 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.cpp \ qpid/broker/MessageBuilder.h \ + qpid/broker/MessageDeque.h \ + qpid/broker/MessageDeque.cpp \ + qpid/broker/MessageMap.h \ + qpid/broker/MessageMap.cpp \ + qpid/broker/Messages.h \ qpid/broker/MessageStore.h \ qpid/broker/MessageStoreModule.cpp \ qpid/broker/MessageStoreModule.h \ + qpid/broker/PriorityQueue.h \ + qpid/broker/PriorityQueue.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ qpid/broker/NullMessageStore.cpp \ diff --git a/cpp/src/qpid/broker/Fairshare.cpp b/cpp/src/qpid/broker/Fairshare.cpp new file mode 100644 index 0000000000..e6bbf86691 --- /dev/null +++ b/cpp/src/qpid/broker/Fairshare.cpp @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Fairshare.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace broker { + +Fairshare::Fairshare(size_t levels, uint limit) : + PriorityQueue(levels), + limits(levels, limit), priority(levels-1), count(0) {} + + +void Fairshare::setLimit(size_t level, uint limit) +{ + limits[level] = limit; +} + +bool Fairshare::limitReached() +{ + uint l = limits[priority]; + return l && ++count > l; +} + +uint Fairshare::currentLevel() +{ + if (limitReached()) { + return nextLevel(); + } else { + return priority; + } +} + +uint Fairshare::nextLevel() +{ + count = 1; + if (priority) --priority; + else priority = levels-1; + return priority; +} + +bool Fairshare::isNull() +{ + for (int i = 0; i < levels; i++) if (limits[i]) return false; + return true; +} + +bool Fairshare::getState(uint& p, uint& c) const +{ + p = priority; + c = count; + return true; +} + +bool Fairshare::setState(uint p, uint c) +{ + priority = p; + count = c; + return true; +} + +bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages) +{ + const uint start = p = currentLevel(); + do { + if (!messages[p].empty()) return true; + } while ((p = nextLevel()) != start); + return false; +} + + + +bool Fairshare::getState(const Messages& m, uint& priority, uint& count) +{ + const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m); + return fairshare && fairshare->getState(priority, count); +} + +bool Fairshare::setState(Messages& m, uint priority, uint count) +{ + Fairshare* fairshare = dynamic_cast<Fairshare*>(&m); + return fairshare && fairshare->setState(priority, count); +} + +int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) +{ + qpid::framing::FieldTable::ValuePtr v = settings.get(key); + if (!v) { + return 0; + } else if (v->convertsTo<int>()) { + return v->get<int>(); + } else if (v->convertsTo<std::string>()){ + std::string s = v->get<std::string>(); + try { + return boost::lexical_cast<int>(s); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); + return 0; + } + } else { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v); + return 0; + } +} + +int getSetting(const qpid::framing::FieldTable& settings, const std::string& key, int minvalue, int maxvalue) +{ + return std::max(minvalue,std::min(getIntegerSetting(settings, key), maxvalue)); +} + +std::auto_ptr<Messages> Fairshare::create(const qpid::framing::FieldTable& settings) +{ + std::auto_ptr<Messages> result; + size_t levels = getSetting(settings, "x-qpid-priorities", 1, 100); + if (levels) { + uint defaultLimit = getIntegerSetting(settings, "x-qpid-fairshare"); + std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit)); + for (uint i = 0; i < levels; i++) { + std::string key = (boost::format("x-qpid-fairshare-%1%") % i).str(); + if(settings.isSet(key)) { + fairshare->setLimit(i, getIntegerSetting(settings, key)); + } + } + + if (fairshare->isNull()) { + result = std::auto_ptr<Messages>(new PriorityQueue(levels)); + } else { + result = fairshare; + } + } + return result; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Fairshare.h b/cpp/src/qpid/broker/Fairshare.h new file mode 100644 index 0000000000..6c4b87f857 --- /dev/null +++ b/cpp/src/qpid/broker/Fairshare.h @@ -0,0 +1,61 @@ +#ifndef QPID_BROKER_FAIRSHARE_H +#define QPID_BROKER_FAIRSHARE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/PriorityQueue.h" + +namespace qpid { +namespace framing { +class FieldTable; +} +namespace broker { + +/** + * Modifies a basic prioirty queue by limiting the number of messages + * from each priority level that are dispatched before allowing + * dispatch from the next level. + */ +class Fairshare : public PriorityQueue +{ + public: + Fairshare(size_t levels, uint limit); + bool getState(uint& priority, uint& count) const; + bool setState(uint priority, uint count); + void setLimit(size_t level, uint limit); + static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings); + static bool getState(const Messages&, uint& priority, uint& count); + static bool setState(Messages&, uint priority, uint count); + private: + std::vector<uint> limits; + + uint priority; + uint count; + + uint currentLevel(); + uint nextLevel(); + bool isNull(); + bool limitReached(); + bool findFrontLevel(uint& p, PriorityLevels&); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_FAIRSHARE_H*/ diff --git a/cpp/src/qpid/broker/LegacyLVQ.cpp b/cpp/src/qpid/broker/LegacyLVQ.cpp new file mode 100644 index 0000000000..a811a86492 --- /dev/null +++ b/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/LegacyLVQ.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/QueuedMessage.h" + +namespace qpid { +namespace broker { + +LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {} + +void LegacyLVQ::setNoBrowse(bool b) +{ + noBrowse = b; +} + +bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + Ordering::iterator i = messages.find(position); + if (i != messages.end() && i->second.payload == message.payload) { + message = i->second; + erase(i); + return true; + } else { + return false; + } +} + +bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + if (MessageMap::next(position, message)) { + if (!noBrowse) index.erase(getKey(message)); + return true; + } else { + return false; + } +} + +bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed) +{ + //Hack to disable LVQ behaviour on cluster update: + if (broker && broker->isClusterUpdatee()) { + messages[added.position] = added; + return false; + } else { + return MessageMap::push(added, removed); + } +} + +const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update) +{ + //add the new message into the original position of the replaced message + Ordering::iterator i = messages.find(original.position); + i->second = update; + i->second.position = original.position; + return i->second; +} + +void LegacyLVQ::removeIf(Predicate p) +{ + //Note: This method is currently called periodically on the timer + //thread to expire messages. In a clustered broker this means that + //the purging does not occur on the cluster event dispatch thread + //and consequently that is not totally ordered w.r.t other events + //(including publication of messages). The cluster does ensure + //that the actual expiration of messages (as distinct from the + //removing of those expired messages from the queue) *is* + //consistently ordered w.r.t. cluster events. This means that + //delivery of messages is in general consistent across the cluster + //inspite of any non-determinism in the triggering of a + //purge. However at present purging a last value queue (of the + //legacy sort) could potentially cause inconsistencies in the + //cluster (as the order w.r.t publications can affect the order in + //which messages appear in the queue). Consequently periodic + //purging of an LVQ is not enabled if the broker is clustered + //(expired messages will be removed on delivery and consolidated + //by key as part of normal LVQ operation). + + //TODO: Is there a neater way to check whether broker is + //clustered? Here we assume that if the clustered timer is the + //same as the regular timer, we are not clustered: + if (!broker || &(broker->getClusterTimer()) == &(broker->getTimer())) + MessageMap::removeIf(p); +} + +std::auto_ptr<Messages> LegacyLVQ::updateOrReplace(std::auto_ptr<Messages> current, + const std::string& key, bool noBrowse, Broker* broker) +{ + LegacyLVQ* lvq = dynamic_cast<LegacyLVQ*>(current.get()); + if (lvq) { + lvq->setNoBrowse(noBrowse); + return current; + } else { + return std::auto_ptr<Messages>(new LegacyLVQ(key, noBrowse, broker)); + } +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/LegacyLVQ.h b/cpp/src/qpid/broker/LegacyLVQ.h new file mode 100644 index 0000000000..dd0fd7aaec --- /dev/null +++ b/cpp/src/qpid/broker/LegacyLVQ.h @@ -0,0 +1,59 @@ +#ifndef QPID_BROKER_LEGACYLVQ_H +#define QPID_BROKER_LEGACYLVQ_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/MessageMap.h" +#include <memory> + +namespace qpid { +namespace broker { +class Broker; + +/** + * This class encapsulates the behaviour of the old style LVQ where a + * message replacing another messages for the given key will use the + * position in the queue of the previous message. This however causes + * problems for browsing. Either browsers stop the coalescing of + * messages by key (default) or they may mis updates (if the no-browse + * option is specified). + */ +class LegacyLVQ : public MessageMap +{ + public: + LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0); + bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool next(const framing::SequenceNumber&, QueuedMessage&); + bool push(const QueuedMessage& added, QueuedMessage& removed); + void removeIf(Predicate); + void setNoBrowse(bool); + static std::auto_ptr<Messages> updateOrReplace(std::auto_ptr<Messages> current, + const std::string& key, bool noBrowse, + Broker* broker); + protected: + bool noBrowse; + Broker* broker; + + const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_LEGACYLVQ_H*/ diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 147b9e7a6a..c589669e5a 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -399,22 +399,6 @@ bool Message::hasExpired() return expiryPolicy && expiryPolicy->hasExpired(*this); } -boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const -{ - sys::Mutex::ScopedLock l(lock); - Replacement::iterator i = replacement.find(qfor); - if (i != replacement.end()){ - return i->second; - } - return empty; -} - -void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor) -{ - sys::Mutex::ScopedLock l(lock); - replacement[qfor] = msg; -} - namespace { struct ScopedSet { sys::Monitor& lock; diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index ee80657f39..f7dd2734b6 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -153,8 +153,6 @@ public: void forcePersistent(); bool isForcedPersistent(); - boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const; - void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor); /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */ void setEnqueueCompleteCallback(MessageCallback& cb); @@ -167,8 +165,6 @@ public: uint8_t getPriority() const; private: - typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; - MessageAdapter& getAdapter() const; void allEnqueuesComplete(); void allDequeuesComplete(); @@ -188,7 +184,6 @@ public: static TransferAdapter TRANSFER; - mutable Replacement replacement; mutable boost::intrusive_ptr<Message> empty; sys::Monitor callbackLock; diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp new file mode 100644 index 0000000000..24b8f6f895 --- /dev/null +++ b/cpp/src/qpid/broker/MessageDeque.cpp @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/MessageDeque.h" +#include "qpid/broker/QueuedMessage.h" + +namespace qpid { +namespace broker { + +size_t MessageDeque::size() +{ + return messages.size(); +} + +bool MessageDeque::empty() +{ + return messages.empty(); +} + +void MessageDeque::reinsert(const QueuedMessage& message) +{ + messages.insert(lower_bound(messages.begin(), messages.end(), message), message); +} + +MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position) +{ + if (!messages.empty()) { + QueuedMessage comp; + comp.position = position; + unsigned long diff = position.getValue() - messages.front().position.getValue(); + long maxEnd = diff < messages.size()? diff : messages.size(); + return lower_bound(messages.begin(),messages.begin()+maxEnd,comp); + } else { + return messages.end(); + } +} + +bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +{ + Deque::iterator i = seek(position); + if (i != messages.end() && i->position == position) { + message = *i; + if (remove) messages.erase(i); + return true; + } else { + return false; + } +} + +bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, true); +} + +bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, false); +} + +bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + if (messages.empty()) { + return false; + } else if (position < front().position) { + message = front(); + return true; + } else { + Deque::iterator i = seek(position+1); + if (i != messages.end()) { + message = *i; + return true; + } else { + return false; + } + } +} + +QueuedMessage& MessageDeque::front() +{ + return messages.front(); +} + +void MessageDeque::pop() +{ + if (!messages.empty()) { + messages.pop_front(); + } +} + +bool MessageDeque::pop(QueuedMessage& out) +{ + if (messages.empty()) { + return false; + } else { + out = front(); + messages.pop_front(); + return true; + } +} + +bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) +{ + messages.push_back(added); + return false;//adding a message never causes one to be removed for deque +} + +void MessageDeque::foreach(Functor f) +{ + std::for_each(messages.begin(), messages.end(), f); +} + +void MessageDeque::removeIf(Predicate p) +{ + for (Deque::iterator i = messages.begin(); i != messages.end();) { + if (p(*i)) { + i = messages.erase(i); + } else { + ++i; + } + } +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageDeque.h b/cpp/src/qpid/broker/MessageDeque.h new file mode 100644 index 0000000000..eb074fc8dc --- /dev/null +++ b/cpp/src/qpid/broker/MessageDeque.h @@ -0,0 +1,61 @@ +#ifndef QPID_BROKER_MESSAGEDEQUE_H +#define QPID_BROKER_MESSAGEDEQUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Messages.h" +#include <deque> + +namespace qpid { +namespace broker { + +/** + * Provides the standard FIFO queue behaviour. + */ +class MessageDeque : public Messages +{ + public: + size_t size(); + bool empty(); + + void reinsert(const QueuedMessage&); + bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool find(const framing::SequenceNumber&, QueuedMessage&); + bool next(const framing::SequenceNumber&, QueuedMessage&); + + QueuedMessage& front(); + void pop(); + bool pop(QueuedMessage&); + bool push(const QueuedMessage& added, QueuedMessage& removed); + + void foreach(Functor); + void removeIf(Predicate); + + private: + typedef std::deque<QueuedMessage> Deque; + Deque messages; + + Deque::iterator seek(const framing::SequenceNumber&); + bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGEDEQUE_H*/ diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp new file mode 100644 index 0000000000..39e23df533 --- /dev/null +++ b/cpp/src/qpid/broker/MessageMap.cpp @@ -0,0 +1,166 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/MessageMap.h" +#include "qpid/broker/QueuedMessage.h" + +namespace qpid { +namespace broker { +namespace { +const std::string EMPTY; +} + +std::string MessageMap::getKey(const QueuedMessage& message) +{ + const framing::FieldTable* ft = message.payload->getApplicationHeaders(); + if (ft) return ft->getAsString(key); + else return EMPTY; +} + +size_t MessageMap::size() +{ + return messages.size(); +} + +bool MessageMap::empty() +{ + return messages.empty(); +} + +void MessageMap::reinsert(const QueuedMessage& message) +{ + std::string key = getKey(message); + Index::iterator i = index.find(key); + if (i == index.end()) { + index[key] = message; + messages[message.position] = message; + } //else message has already been replaced +} + +bool MessageMap::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + Ordering::iterator i = messages.find(position); + if (i != messages.end()) { + message = i->second; + erase(i); + return true; + } else { + return false; + } +} + +bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message) +{ + Ordering::iterator i = messages.find(position); + if (i != messages.end()) { + message = i->second; + return true; + } else { + return false; + } +} + +bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + if (!messages.empty() && position < front().position) { + message = front(); + return true; + } else { + Ordering::iterator i = messages.lower_bound(position+1); + if (i != messages.end()) { + message = i->second; + return true; + } else { + return false; + } + } +} + +QueuedMessage& MessageMap::front() +{ + return messages.begin()->second; +} + +void MessageMap::pop() +{ + QueuedMessage dummy; + pop(dummy); +} + +bool MessageMap::pop(QueuedMessage& out) +{ + Ordering::iterator i = messages.begin(); + if (i != messages.end()) { + out = i->second; + erase(i); + return true; + } else { + return false; + } +} + +const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update) +{ + messages.erase(original.position); + messages[update.position] = update; + return update; +} + +bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) +{ + std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added)); + if (result.second) { + //there was no previous message for this key; nothing needs to + //be removed, just add the message into its correct position + messages[added.position] = added; + return false; + } else { + //there is already a message with that key which needs to be replaced + removed = result.first->second; + result.first->second = replace(result.first->second, added); + return true; + } +} + +void MessageMap::foreach(Functor f) +{ + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + f(i->second); + } +} + +void MessageMap::removeIf(Predicate p) +{ + for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) { + if (p(i->second)) { + erase(i); + } + } +} + +void MessageMap::erase(Ordering::iterator i) +{ + index.erase(getKey(i->second)); + messages.erase(i); +} + +MessageMap::MessageMap(const std::string& k) : key(k) {} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageMap.h b/cpp/src/qpid/broker/MessageMap.h new file mode 100644 index 0000000000..1128a1d54a --- /dev/null +++ b/cpp/src/qpid/broker/MessageMap.h @@ -0,0 +1,72 @@ +#ifndef QPID_BROKER_MESSAGEMAP_H +#define QPID_BROKER_MESSAGEMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Messages.h" +#include "qpid/framing/SequenceNumber.h" +#include <map> +#include <string> + +namespace qpid { +namespace broker { + +/** + * Provides a last value queue behaviour, whereby a messages replace + * any previous message with the same value for a defined property + * (i.e. the key). + */ +class MessageMap : public Messages +{ + public: + MessageMap(const std::string& key); + virtual ~MessageMap() {} + + size_t size(); + bool empty(); + + void reinsert(const QueuedMessage&); + virtual bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool find(const framing::SequenceNumber&, QueuedMessage&); + virtual bool next(const framing::SequenceNumber&, QueuedMessage&); + + QueuedMessage& front(); + void pop(); + bool pop(QueuedMessage&); + virtual bool push(const QueuedMessage& added, QueuedMessage& removed); + + void foreach(Functor); + virtual void removeIf(Predicate); + + protected: + typedef std::map<std::string, QueuedMessage> Index; + typedef std::map<framing::SequenceNumber, QueuedMessage> Ordering; + const std::string key; + Index index; + Ordering messages; + + std::string getKey(const QueuedMessage&); + virtual const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&); + void erase(Ordering::iterator); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGEMAP_H*/ diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h new file mode 100644 index 0000000000..0d75417640 --- /dev/null +++ b/cpp/src/qpid/broker/Messages.h @@ -0,0 +1,117 @@ +#ifndef QPID_BROKER_MESSAGES_H +#define QPID_BROKER_MESSAGES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/function.hpp> + +namespace qpid { +namespace framing { +class SequenceNumber; +} +namespace broker { +struct QueuedMessage; + +/** + * This interface abstracts out the access to the messages held for + * delivery by a Queue instance. + */ +class Messages +{ + public: + typedef boost::function1<void, QueuedMessage&> Functor; + typedef boost::function1<bool, QueuedMessage&> Predicate; + + virtual ~Messages() {} + /** + * @return the number of messages available for delivery. + */ + virtual size_t size() = 0; + /** + * @return true if there are no messages for delivery, false otherwise + */ + virtual bool empty() = 0; + + /** + * Re-inserts a message back into its original position - used + * when requeing released messages. + */ + virtual void reinsert(const QueuedMessage&) = 0; + /** + * Remove the message at the specified position, returning true if + * found, false otherwise. The removed message is passed back via + * the second parameter. + */ + virtual bool remove(const framing::SequenceNumber&, QueuedMessage&) = 0; + /** + * Find the message at the specified position, returning true if + * found, false otherwise. The matched message is passed back via + * the second parameter. + */ + virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0; + /** + * Return the next message to be given to a browsing subscrption + * that has reached the specified poisition. The next messages is + * passed back via the second parameter. + * + * @return true if there is another message, false otherwise. + */ + virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0; + + /** + * Note: Caller is responsible for ensuring that there is a front + * (e.g. empty() returns false) + * + * @return the next message to be delivered + */ + virtual QueuedMessage& front() = 0; + /** + * Removes the front message + */ + virtual void pop() = 0; + /** + * @return true if there is a mesage to be delivered - in which + * case that message will be returned via the parameter and + * removed - otherwise false. + */ + virtual bool pop(QueuedMessage&) = 0; + /** + * Pushes a message to the back of the 'queue'. For some types of + * queue this may cause another message to be removed; if that is + * the case the method will return true and the removed message + * will be passed out via the second parameter. + */ + virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0; + + /** + * Apply the functor to each message held + */ + virtual void foreach(Functor) = 0; + /** + * Remove every message held that for which the specified + * predicate returns true + */ + virtual void removeIf(Predicate) = 0; + private: +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGES_H*/ diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp new file mode 100644 index 0000000000..0272e968ec --- /dev/null +++ b/cpp/src/qpid/broker/PriorityQueue.cpp @@ -0,0 +1,211 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/PriorityQueue.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace broker { + +PriorityQueue::PriorityQueue(int l) : + levels(l), + messages(levels, Deque()), + frontLevel(0), haveFront(false), cached(false) {} + +size_t PriorityQueue::size() +{ + size_t total(0); + for (int i = 0; i < levels; ++i) { + total += messages[i].size(); + } + return total; +} + +bool PriorityQueue::empty() +{ + for (int i = 0; i < levels; ++i) { + if (!messages[i].empty()) return false; + } + return true; +} + +void PriorityQueue::reinsert(const QueuedMessage& message) +{ + uint p = getPriorityLevel(message); + messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message); + clearCache(); +} + +bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +{ + QueuedMessage comp; + comp.position = position; + for (int i = 0; i < levels; ++i) { + if (!messages[i].empty()) { + unsigned long diff = position.getValue() - messages[i].front().position.getValue(); + long maxEnd = diff < messages[i].size() ? diff : messages[i].size(); + Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp); + if (l != messages[i].end() && l->position == position) { + message = *l; + if (remove) { + messages[i].erase(l); + clearCache(); + } + return true; + } + } + } + return false; +} + +bool PriorityQueue::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, true); +} + +bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, false); +} + +bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + QueuedMessage match; + match.position = position+1; + Deque::iterator lowest; + bool found = false; + for (int i = 0; i < levels; ++i) { + Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match); + if (m != messages[i].end()) { + if (m->position == match.position) { + message = *m; + return true; + } else if (!found || m->position < lowest->position) { + lowest = m; + found = true; + } + } + } + if (found) { + message = *lowest; + } + return found; +} + +QueuedMessage& PriorityQueue::front() +{ + if (checkFront()) { + return messages[frontLevel].front(); + } else { + throw qpid::framing::InternalErrorException(QPID_MSG("No message available")); + } +} + +bool PriorityQueue::pop(QueuedMessage& message) +{ + if (checkFront()) { + message = messages[frontLevel].front(); + messages[frontLevel].pop_front(); + clearCache(); + return true; + } else { + return false; + } +} + +void PriorityQueue::pop() +{ + QueuedMessage dummy; + pop(dummy); +} + +bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) +{ + messages[getPriorityLevel(added)].push_back(added); + clearCache(); + return false;//adding a message never causes one to be removed for deque +} + +void PriorityQueue::foreach(Functor f) +{ + for (int i = 0; i < levels; ++i) { + std::for_each(messages[i].begin(), messages[i].end(), f); + } +} + +void PriorityQueue::removeIf(Predicate p) +{ + for (int priority = 0; priority < levels; ++priority) { + for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) { + if (p(*i)) { + i = messages[priority].erase(i); + clearCache(); + } else { + ++i; + } + } + } +} + +uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const +{ + uint priority = m.payload->getPriority(); + //Use AMQP 0-10 approach to mapping priorities to a fixed level + //(see rule priority-level-implementation) + const uint firstLevel = 5 - std::min(5.0, ceil((double) levels/2.0)); + if (priority <= firstLevel) return 0; + return std::min(priority - firstLevel, (uint)levels-1); +} + +void PriorityQueue::clearCache() +{ + cached = false; +} + +bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m) +{ + for (int p = levels-1; p >= 0; --p) { + if (!m[p].empty()) { + l = p; + return true; + } + } + return false; +} + +bool PriorityQueue::checkFront() +{ + if (!cached) { + haveFront = findFrontLevel(frontLevel, messages); + cached = true; + } + return haveFront; +} + +uint PriorityQueue::getPriority(const QueuedMessage& message) +{ + const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages())); + if (queue) return queue->getPriorityLevel(message); + else return 0; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h new file mode 100644 index 0000000000..7e97d929fb --- /dev/null +++ b/cpp/src/qpid/broker/PriorityQueue.h @@ -0,0 +1,77 @@ +#ifndef QPID_BROKER_PRIORITYQUEUE_H +#define QPID_BROKER_PRIORITYQUEUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Messages.h" +#include <deque> +#include <vector> + +namespace qpid { +namespace broker { + +/** + * Basic priority queue with a configurable number of recognised + * priority levels. This is implemented as a separate deque per + * priority level. Browsing is FIFO not priority order. + */ +class PriorityQueue : public Messages +{ + public: + PriorityQueue(int levels); + virtual ~PriorityQueue() {} + size_t size(); + bool empty(); + + void reinsert(const QueuedMessage&); + bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool find(const framing::SequenceNumber&, QueuedMessage&); + bool next(const framing::SequenceNumber&, QueuedMessage&); + + QueuedMessage& front(); + void pop(); + bool pop(QueuedMessage&); + bool push(const QueuedMessage& added, QueuedMessage& removed); + + void foreach(Functor); + void removeIf(Predicate); + static uint getPriority(const QueuedMessage&); + protected: + typedef std::deque<QueuedMessage> Deque; + typedef std::vector<Deque> PriorityLevels; + virtual bool findFrontLevel(uint& p, PriorityLevels&); + + const int levels; + private: + PriorityLevels messages; + uint frontLevel; + bool haveFront; + bool cached; + + bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove); + uint getPriorityLevel(const QueuedMessage&) const; + void clearCache(); + bool checkFront(); +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PRIORITYQUEUE_H*/ diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e59857462c..43d1a2b27c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -23,7 +23,11 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Fairshare.h" #include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/LegacyLVQ.h" +#include "qpid/broker/MessageDeque.h" +#include "qpid/broker/MessageMap.h" #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" @@ -66,6 +70,7 @@ const std::string qpidMaxCount("qpid.max_count"); const std::string qpidNoLocal("no-local"); const std::string qpidTraceIdentity("qpid.trace.id"); const std::string qpidTraceExclude("qpid.trace.exclude"); +const std::string qpidLastValueQueueKey("qpid.last_value_queue_key"); const std::string qpidLastValueQueue("qpid.last_value_queue"); const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); const std::string qpidPersistLastNode("qpid.persist_last_node"); @@ -92,10 +97,9 @@ Queue::Queue(const string& _name, bool _autodelete, consumerCount(0), exclusive(0), noLocal(false), - lastValueQueue(false), - lastValueQueueNoBrowse(false), persistLastNode(false), inLastNodeFailure(false), + messages(new MessageDeque()), persistenceId(0), policyExceeded(false), mgmtObject(0), @@ -212,7 +216,7 @@ void Queue::requeue(const QueuedMessage& msg){ Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; msg.payload->enqueueComplete(); // mark the message as enqueued - messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg); + messages->reinsert(msg); listeners.populate(copy); // for persistLastNode - don't force a message twice to disk, but force it if no force before @@ -227,57 +231,23 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -void Queue::clearLVQIndex(const QueuedMessage& msg){ - assertClusterSafe(); - const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0; - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - lvq.erase(key); - } -} - bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - - Messages::iterator i = findAt(position); - if (i != messages.end() ) { - message = *i; - if (lastValueQueue) { - clearLVQIndex(*i); - } - QPID_LOG(debug, - "Acquired message at " << i->position << " from " << name); - messages.erase(i); + if (messages->remove(position, message)) { + QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; - } - QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); - return false; + } else { + QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); + return false; + } } bool Queue::acquire(const QueuedMessage& msg) { - Mutex::ScopedLock locker(messageLock); - assertClusterSafe(); - - QPID_LOG(debug, "attempting to acquire " << msg.position); - Messages::iterator i = findAt(msg.position); - if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set - (!lastValueQueue || - (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 - ) { - - clearLVQIndex(msg); - QPID_LOG(debug, - "Match found, acquire succeeded: " << - i->position << " == " << msg.position); - messages.erase(i); - return true; - } - - QPID_LOG(debug, "Acquire failed for " << msg.position); - return false; + QueuedMessage copy = msg; + return acquireMessageAt(msg.position, copy); } void Queue::notifyListener() @@ -286,7 +256,7 @@ void Queue::notifyListener() QueueListeners::NotificationSet set; { Mutex::ScopedLock locker(messageLock); - if (messages.size()) { + if (messages->size()) { listeners.populate(set); } } @@ -315,12 +285,12 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { + if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; } else { - QueuedMessage msg = getFront(); + QueuedMessage msg = messages->front(); if (msg.payload->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); popAndDequeue(); @@ -330,7 +300,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - popMsg(msg); + pop(); return CONSUMED; } else { //message(s) are available but consumer hasn't got enough credit @@ -356,11 +326,6 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) //consumer wants the message c->position = msg.position; m = msg; - if (!lastValueQueueNoBrowse) clearLVQIndex(msg); - if (lastValueQueue) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) m.payload = replacement; - } return true; } else { //browser hasn't got enough credit for the message @@ -382,7 +347,7 @@ void Queue::removeListener(Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); listeners.removeListener(c); - if (messages.size()) { + if (messages->size()) { listeners.populate(set); } } @@ -403,52 +368,20 @@ bool Queue::dispatch(Consumer::shared_ptr c) // Find the next message bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > c->position) { - if (c->position < getFront().position) { - msg = getFront(); - return true; - } else { - Messages::iterator pos = findAt(c->position); - if (pos != messages.end() && pos+1 != messages.end()) { - msg = *(pos+1); - return true; - } - } + if (messages->next(c->position, msg)) { + return true; + } else { + listeners.addListener(c); + return false; } - listeners.addListener(c); - return false; } -Queue::Messages::iterator Queue::findAt(SequenceNumber pos) { - - if(!messages.empty()){ - QueuedMessage compM; - compM.position = pos; - unsigned long diff = pos.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - - Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); - if (i!= messages.end() && i->position == pos) - return i; - } - return messages.end(); // no match found. -} - - QueuedMessage Queue::find(SequenceNumber pos) const { Mutex::ScopedLock locker(messageLock); - if(!messages.empty()){ - QueuedMessage compM; - compM.position = pos; - unsigned long diff = pos.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - - Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); - if (i != messages.end()) - return *i; - } - return QueuedMessage(); + QueuedMessage msg; + messages->find(pos, msg); + return msg; } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ @@ -482,12 +415,18 @@ void Queue::cancel(Consumer::shared_ptr c){ QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); + messages->pop(msg); + return msg; +} - if(!messages.empty()){ - msg = getFront(); - popMsg(msg); +bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message) +{ + if (message.payload->hasExpired()) { + expired.push_back(message); + return true; + } else { + return false; } - return msg; } void Queue::purgeExpired() @@ -496,37 +435,11 @@ void Queue::purgeExpired() //bother explicitly expiring if the rate of dequeues since last //attempt is less than one per second. - //Note: This method is currently called periodically on the timer - //thread. In a clustered broker this means that the purging does - //not occur on the cluster event dispatch thread and consequently - //that is not totally ordered w.r.t other events (including - //publication of messages). However the cluster does ensure that - //the actual expiration of messages (as distinct from the removing - //of those expired messages from the queue) *is* consistently - //ordered w.r.t. cluster events. This means that delivery of - //messages is in general consistent across the cluster inspite of - //any non-determinism in the triggering of a purge. However at - //present purging a last value queue could potentially cause - //inconsistencies in the cluster (as the order w.r.t publications - //can affect the order in which messages appear in the - //queue). Consequently periodic purging of an LVQ is not enabled - //(expired messages will be removed on delivery and consolidated - //by key as part of normal LVQ operation). - - if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) { - Messages expired; + if (dequeueTracker.sampleRatePerSecond() < 1) { + std::deque<QueuedMessage> expired; { Mutex::ScopedLock locker(messageLock); - for (Messages::iterator i = messages.begin(); i != messages.end();) { - //Re-introduce management of LVQ-specific state here - //if purging is renabled for that case (see note above) - if (i->payload->hasExpired()) { - expired.push_back(*i); - i = messages.erase(i); - } else { - ++i; - } - } + messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); } for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } @@ -552,13 +465,13 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. - while((!purge_request || purge_count--) && !messages.empty()) { + while((!purge_request || purge_count--) && !messages->empty()) { if (dest.get()) { // // If there is a destination exchange, stage the messages onto a reroute queue // so they don't wind up getting purged more than once. // - DeliverableMessage msg(getFront().payload); + DeliverableMessage msg(messages->front().payload); rerouteQueue.push_back(msg); } popAndDequeue(); @@ -584,64 +497,37 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { uint32_t move_count = qty; // only comes into play if qty >0 uint32_t count = 0; // count how many were moved for returning - while((!qty || move_count--) && !messages.empty()) { - QueuedMessage qmsg = getFront(); + while((!qty || move_count--) && !messages->empty()) { + QueuedMessage qmsg = messages->front(); boost::intrusive_ptr<Message> msg = qmsg.payload; destq->deliver(msg); // deliver message to the destination queue - popMsg(qmsg); + pop(); dequeue(0, qmsg); count++; } return count; } -void Queue::popMsg(QueuedMessage& qmsg) +void Queue::pop() { assertClusterSafe(); - const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - lvq.erase(key); - } - messages.pop_front(); + messages->pop(); ++dequeueTracker; } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); QueueListeners::NotificationSet copy; + QueuedMessage removed; + bool dequeueRequired = false; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - LVQ::iterator i; - const framing::FieldTable* ft = msg->getApplicationHeaders(); - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - - i = lvq.find(key); - if (i == lvq.end() || (broker && broker->isClusterUpdatee())) { - messages.push_back(qm); - listeners.populate(copy); - lvq[key] = msg; - }else { - boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); - if (!old) old = i->second; - i->second->setReplacementMessage(msg,this); - if (isRecovery) { - //can't issue new requests for the store until - //recovery is complete - pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); - } else { - Mutex::ScopedUnlock u(messageLock); - dequeue(0, QueuedMessage(qm.queue, old, qm.position)); - } - } - }else { - messages.push_back(qm); - listeners.populate(copy); - } + dequeueRequired = messages->push(qm, removed); + listeners.populate(copy); + if (eventMode) { if (eventMgr) eventMgr->enqueued(qm); else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); @@ -651,32 +537,20 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ } } copy.notify(); -} - -QueuedMessage Queue::getFront() -{ - QueuedMessage msg = messages.front(); - if (lastValueQueue) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) msg.payload = replacement; + if (dequeueRequired) { + if (isRecovery) { + //can't issue new requests for the store until + //recovery is complete + pendingDequeues.push_back(removed); + } else { + dequeue(0, removed); + } } - return msg; } -QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) +void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) { - const framing::FieldTable* ft = replacement->getApplicationHeaders(); - if (ft) { - string key = ft->getAsString(qpidVQMatchProperty); - if (lvq.find(key) != lvq.end()){ - lvq[key] = replacement; - } - } - msg.payload = replacement; - } - return msg; + if (message.payload->isEnqueueComplete()) (*result)++; } /** function only provided for unit tests, or code not in critical message path */ @@ -684,20 +558,14 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const { Mutex::ScopedLock locker(messageLock); uint32_t count = 0; - for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { - //NOTE: don't need to use checkLvqReplace() here as it - //is only relevant for LVQ which does not support persistence - //so the enqueueComplete check has no effect - if ( i->payload->isEnqueueComplete() ) count ++; - } - + messages->foreach(boost::bind(&isEnqueueComplete, &count, _1)); return count; } uint32_t Queue::getMessageCount() const { Mutex::ScopedLock locker(messageLock); - return messages.size(); + return messages->size(); } uint32_t Queue::getConsumerCount() const @@ -717,21 +585,22 @@ void Queue::clearLastNodeFailure() inLastNodeFailure = false; } +void Queue::forcePersistent(QueuedMessage& message) +{ + if(!message.payload->isStoredOnQueue(shared_from_this())) { + message.payload->forcePersistent(); + if (message.payload->isForcedPersistent() ){ + enqueue(0, message.payload); + } + } +} + void Queue::setLastNodeFailure() { if (persistLastNode){ Mutex::ScopedLock locker(messageLock); try { - for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { - if (lastValueQueue) checkLvqReplace(*i); - // don't force a message twice to disk. - if(!i->payload->isStoredOnQueue(shared_from_this())) { - i->payload->forcePersistent(); - if (i->payload->isForcedPersistent() ){ - enqueue(0, i->payload); - } - } - } + messages->foreach(boost::bind(&Queue::forcePersistent, this, _1)); } catch (const std::exception& e) { // Could not go into last node standing (for example journal not large enough) QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what()); @@ -748,7 +617,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg if (!u.acquired) return false; if (policy.get() && !suppressPolicyCheck) { - Messages dequeues; + std::deque<QueuedMessage> dequeues; { Mutex::ScopedLock locker(messageLock); policy->tryEnqueue(msg); @@ -835,8 +704,8 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) */ void Queue::popAndDequeue() { - QueuedMessage msg = getFront(); - popMsg(msg); + QueuedMessage msg = messages->front(); + pop(); dequeue(0, msg); } @@ -885,13 +754,22 @@ void Queue::configure(const FieldTable& _settings, bool recovering) noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); - lastValueQueue= _settings.get(qpidLastValueQueue); - if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName()); - - lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse); - if (lastValueQueueNoBrowse){ - QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName()); - lastValueQueue = lastValueQueueNoBrowse; + std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey); + if (lvqKey.size()) { + QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); + messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); + } else if (_settings.get(qpidLastValueQueueNoBrowse)) { + QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); + messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); + } else if (_settings.get(qpidLastValueQueue)) { + QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); + messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); + } else { + std::auto_ptr<Messages> m = Fairshare::create(_settings); + if (m.get()) { + messages = m; + QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); + } } persistLastNode= _settings.get(qpidPersistLastNode); @@ -919,8 +797,8 @@ void Queue::destroy() { if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); - while(!messages.empty()){ - DeliverableMessage msg(getFront().payload); + while(!messages->empty()){ + DeliverableMessage msg(messages->front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); popAndDequeue(); @@ -1198,6 +1076,8 @@ bool Queue::isEnqueued(const QueuedMessage& msg) } QueueListeners& Queue::getListeners() { return listeners; } +Messages& Queue::getMessages() { return *messages; } +const Messages& Queue::getMessages() const { return *messages; } void Queue::checkNotDeleted() { diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 96c79d1b92..66e4c5fa22 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -26,6 +26,7 @@ #include "qpid/broker/OwnershipToken.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" +#include "qpid/broker/Messages.h" #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueBindings.h" @@ -85,10 +86,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, ~ScopedUse() { if (acquired) barrier.release(); } }; - typedef std::deque<QueuedMessage> Messages; - typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; + const std::string name; const bool autodelete; MessageStore* store; @@ -96,16 +96,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t consumerCount; OwnershipToken* exclusive; bool noLocal; - bool lastValueQueue; - bool lastValueQueueNoBrowse; bool persistLastNode; bool inLastNodeFailure; std::string traceId; std::vector<std::string> traceExclude; QueueListeners listeners; - Messages messages; - Messages pendingDequeues;//used to avoid dequeuing during recovery - LVQ lvq; + std::auto_ptr<Messages> messages; + std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Monitor messageLock; mutable qpid::sys::Mutex ownershipLock; @@ -140,11 +137,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isExcluded(boost::intrusive_ptr<Message>& msg); void dequeued(const QueuedMessage& msg); - void popMsg(QueuedMessage& qmsg); + void pop(); void popAndDequeue(); QueuedMessage getFront(); - QueuedMessage& checkLvqReplace(QueuedMessage& msg); - void clearLVQIndex(const QueuedMessage& msg); + void forcePersistent(QueuedMessage& msg); inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) { @@ -169,7 +165,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, } } - Messages::iterator findAt(framing::SequenceNumber pos); void checkNotDeleted(); public: @@ -320,13 +315,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** Apply f to each Message on the queue. */ template <class F> void eachMessage(F f) { sys::Mutex::ScopedLock l(messageLock); - if (lastValueQueue) { - for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) { - f(checkLvqReplace(*i)); - } - } else { - std::for_each(messages.begin(), messages.end(), f); - } + messages->foreach(f); } /** Apply f to each QueueBinding on the queue */ @@ -352,6 +341,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, // For cluster update QueueListeners& getListeners(); + Messages& getMessages(); + const Messages& getMessages() const; /** * Reserve space in policy for an enqueued message that diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index f311ea8321..4168221ad0 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/QueuePolicy.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/PriorityQueue.h" #include "qpid/Exception.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" @@ -213,7 +214,10 @@ RingQueuePolicy::RingQueuePolicy(const std::string& _name, bool before(const QueuedMessage& a, const QueuedMessage& b) { - return a.position < b.position; + int priorityA = PriorityQueue::getPriority(a); + int priorityB = PriorityQueue::getPriority(b); + if (priorityA == priorityB) return a.position < b.position; + else return priorityA < priorityB; } void RingQueuePolicy::enqueued(const QueuedMessage& m) diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index c7689577a7..0582945a9c 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -32,6 +32,7 @@ #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Fairshare.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Queue.h" @@ -548,6 +549,13 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi findQueue(qname)->setPosition(position); } +void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count) +{ + if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) { + QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies."); + } +} + void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index d90cdd898b..7ee85bf1aa 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -152,6 +152,7 @@ class Connection : uint32_t credit); void queuePosition(const std::string&, const framing::SequenceNumber&); + void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); void expiryId(uint64_t); void txStart(); diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 4f6488a28a..8f751add9b 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -32,6 +32,7 @@ #include "qpid/client/ConnectionImpl.h" #include "qpid/client/Future.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Fairshare.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/LinkRegistry.h" @@ -352,6 +353,10 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1)); ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition()); + uint priority, count; + if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { + ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); + } } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 903a20ec28..f2ccd0ba84 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -1191,5 +1191,41 @@ QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) { BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); } +QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + FieldTable arguments; + arguments.setInt("x-qpid-priorities", 10); + arguments.setInt("x-qpid-fairshare", 5); + c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments); + + //send messages of different priorities + for (int i = 0; i < 20; i++) { + Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag); + msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5); + c0.session.messageTransfer(arg::content=msg); + } + + //pull off a couple of the messages (first four should be the top priority messages + for (int i = 0; i < 4; i++) { + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData()); + } + + // Add another member + cluster.add(); + Client c1(cluster[1], "c1"); + + //pull off some more messages + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData()); + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData()); + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData()); + + //check queue has same content on both nodes + BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp index 28e229ca27..012d544a2e 100644 --- a/cpp/src/tests/qpid-receive.cpp +++ b/cpp/src/tests/qpid-receive.cpp @@ -206,6 +206,7 @@ int main(int argc, char ** argv) if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl; if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl; if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl; + if (msg.getPriority()) std::cout << "Priority: " << msg.getPriority() << std::endl; if (msg.getDurable()) std::cout << "Durable: true" << std::endl; if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl; std::cout << "Properties: " << msg.getProperties() << std::endl; diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp index c71cb83f9a..3824a870bf 100644 --- a/cpp/src/tests/qpid-send.cpp +++ b/cpp/src/tests/qpid-send.cpp @@ -56,6 +56,7 @@ struct Options : public qpid::Options uint sendEos; bool durable; uint ttl; + uint priority; std::string userid; std::string correlationid; string_vector properties; @@ -84,6 +85,7 @@ struct Options : public qpid::Options sendEos(0), durable(false), ttl(0), + priority(0), contentString(), contentSize(0), contentStdin(false), @@ -110,6 +112,7 @@ struct Options : public qpid::Options ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") + ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)") ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") @@ -266,6 +269,9 @@ int main(int argc, char ** argv) if (opts.ttl) { msg.setTtl(Duration(opts.ttl)); } + if (opts.priority) { + msg.setPriority(opts.priority); + } if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto)); if (!opts.userid.empty()) msg.setUserId(opts.userid); if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid); |