diff options
author | Alan Conway <aconway@apache.org> | 2012-01-19 23:00:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-19 23:00:38 +0000 |
commit | 26161156a17164d62010dd9d531a96906b73ce0e (patch) | |
tree | fcdb5c40a42565bbb393617098643771838d8c39 | |
parent | 5ea482321f8ed09ab7c5fd26634d0174ad5ef6fd (diff) | |
download | qpid-python-26161156a17164d62010dd9d531a96906b73ce0e.tar.gz |
QPID-3603: Prototype of replicating browser.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233624 13f79535-47bb-0310-9956-ffa450edef68
28 files changed, 760 insertions, 277 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index c5d2a45f69..d4596f55eb 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1007,6 +1007,7 @@ set (qpidbroker_SOURCES qpid/broker/QueueListeners.cpp qpid/broker/FifoDistributor.cpp qpid/broker/MessageGroupManager.cpp + qpid/broker/QueueReplicator.cpp qpid/broker/PersistableMessage.cpp qpid/broker/Bridge.cpp qpid/broker/Connection.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index fb26251da0..c1e42d382a 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -625,6 +625,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueuedMessage.h \ qpid/broker/QueueFlowLimit.h \ qpid/broker/QueueFlowLimit.cpp \ + qpid/broker/QueueReplicator.h \ + qpid/broker/QueueReplicator.cpp \ qpid/broker/RateFlowcontrol.h \ qpid/broker/RecoverableConfig.h \ qpid/broker/RecoverableExchange.h \ diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 12c2194381..340e53c542 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/QueueReplicator.h" #include "qpid/broker/SessionState.h" #include "qpid/management/ManagementAgent.h" @@ -96,8 +97,11 @@ void Bridge::create(Connection& c) } if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); - if (args.i_srcIsQueue) { - peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); + if (args.i_srcIsQueue) { + //TODO: something other than this which is nasty... + bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options); + + peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, isReplicatingLink ? 1 : 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 0b8fe95d5e..43ca1ae04b 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -37,7 +37,7 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, bool _acquired, bool accepted, bool _windowing, - uint32_t _credit) : msg(_msg), + uint32_t _credit, bool _delayedCompletion) : msg(_msg), queue(_queue), tag(_tag), acquired(_acquired), @@ -46,7 +46,8 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, completed(false), ended(accepted && acquired), windowing(_windowing), - credit(msg.payload ? msg.payload->getRequiredCredit() : _credit) + credit(msg.payload ? msg.payload->getRequiredCredit() : _credit), + delayedCompletion(_delayedCompletion) {} bool DeliveryRecord::setEnded() @@ -111,8 +112,14 @@ void DeliveryRecord::complete() { } bool DeliveryRecord::accept(TransactionContext* ctxt) { - if (acquired && !ended) { - queue->dequeue(ctxt, msg); + if (!ended) { + if (acquired) { + queue->dequeue(ctxt, msg); + } else if (delayedCompletion) { + //TODO: this is a nasty way to do this; change it + msg.payload->getIngressCompletion().finishCompleter(); + QPID_LOG(debug, "Completed " << msg.payload.get()); + } setEnded(); QPID_LOG(debug, "Accepted " << id); } diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index 5a331357be..90e72aaf0d 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -63,6 +63,7 @@ class DeliveryRecord * after that). */ uint32_t credit; + bool delayedCompletion; public: QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg, @@ -71,7 +72,8 @@ class DeliveryRecord bool acquired, bool accepted, bool windowing, - uint32_t credit=0 // Only used if msg is empty. + uint32_t credit=0, // Only used if msg is empty. + bool delayedCompletion=false ); bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); } diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp index eb1f0a402e..074c2b9a9d 100644 --- a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp +++ b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp @@ -30,11 +30,7 @@ FifoDistributor::FifoDistributor(Messages& container) bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next ) { - if (!messages.empty()) { - next = messages.front(); // by default, consume oldest msg - return true; - } - return false; + return messages.consume(next); } bool FifoDistributor::allocate(const std::string&, const QueuedMessage& ) @@ -46,9 +42,7 @@ bool FifoDistributor::allocate(const std::string&, const QueuedMessage& ) bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { - if (!messages.empty() && messages.next(c->getPosition(), next)) - return true; - return false; + return messages.browse(c->getPosition(), next, false); } void FifoDistributor::query(qpid::types::Variant::Map&) const diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp index 3262e343a3..49c0a32c19 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -32,7 +32,7 @@ void LegacyLVQ::setNoBrowse(bool b) noBrowse = b; } -bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message) +bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); if (i != messages.end() && i->second.payload == message.payload) { @@ -44,9 +44,9 @@ bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& m } } -bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message) +bool LegacyLVQ::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { - if (MessageMap::next(position, message)) { + if (MessageMap::browse(position, message, unacquired)) { if (!noBrowse) index.erase(getKey(message)); return true; } else { diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h index dd0fd7aaec..695e51131d 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h @@ -40,8 +40,8 @@ class LegacyLVQ : public MessageMap { public: LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0); - bool remove(const framing::SequenceNumber&, QueuedMessage&); - bool next(const framing::SequenceNumber&, QueuedMessage&); + bool acquire(const framing::SequenceNumber&, QueuedMessage&); + bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); bool push(const QueuedMessage& added, QueuedMessage& removed); void removeIf(Predicate); void setNoBrowse(bool); diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp index 24b8f6f895..c9e91495c8 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp +++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp @@ -20,121 +20,155 @@ */ #include "qpid/broker/MessageDeque.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { -size_t MessageDeque::size() -{ - return messages.size(); -} - -bool MessageDeque::empty() -{ - return messages.empty(); -} +MessageDeque::MessageDeque() : available(0), head(0) {} -void MessageDeque::reinsert(const QueuedMessage& message) +size_t MessageDeque::index(const framing::SequenceNumber& position) { - messages.insert(lower_bound(messages.begin(), messages.end(), message), message); -} - -MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position) -{ - if (!messages.empty()) { - QueuedMessage comp; - comp.position = position; - unsigned long diff = position.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - return lower_bound(messages.begin(),messages.begin()+maxEnd,comp); - } else { - return messages.end(); - } + //assuming a monotonic sequence, with no messages removed except + //from the ends of the deque, we can use the position to determin + //an index into the deque + if (messages.empty() || position < messages.front().position) return 0; + return position - messages.front().position; } -bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +bool MessageDeque::deleted(const QueuedMessage& m) { - Deque::iterator i = seek(position); - if (i != messages.end() && i->position == position) { - message = *i; - if (remove) messages.erase(i); + size_t i = index(m.position); + if (i < messages.size()) { + messages[i].status = QueuedMessage::DELETED; + clean(); return true; } else { return false; } } -bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message) +size_t MessageDeque::size() { - return find(position, message, true); + return available; } -bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message) +void MessageDeque::release(const QueuedMessage& message) { - return find(position, message, false); + size_t i = index(message.position); + if (i < messages.size()) { + QueuedMessage& m = messages[i]; + if (m.status == QueuedMessage::ACQUIRED) { + if (head > i) head = i; + m.status = QueuedMessage::AVAILABLE; + ++available; + } + } else { + QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")"); + } } -bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message) +bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { - if (messages.empty()) { - return false; - } else if (position < front().position) { - message = front(); - return true; - } else { - Deque::iterator i = seek(position+1); - if (i != messages.end()) { - message = *i; + size_t i = index(position); + if (i < messages.size()) { + QueuedMessage& temp = messages[i]; + if (temp.status == QueuedMessage::AVAILABLE) { + temp.status = QueuedMessage::ACQUIRED; + --available; + message = temp; return true; - } else { - return false; } } + return false; } -QueuedMessage& MessageDeque::front() +bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message) { - return messages.front(); + size_t i = index(position); + if (i < messages.size()) { + message = messages[i]; + return true; + } else { + return false; + } } -void MessageDeque::pop() +bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { - if (!messages.empty()) { - messages.pop_front(); + //get first message that is greater than position + size_t i = index(position + 1); + while (i < messages.size()) { + QueuedMessage& m = messages[i++]; + if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) { + message = m; + return true; + } } + return false; } -bool MessageDeque::pop(QueuedMessage& out) +bool MessageDeque::consume(QueuedMessage& message) { - if (messages.empty()) { - return false; - } else { - out = front(); - messages.pop_front(); - return true; + while (head < messages.size()) { + QueuedMessage& i = messages[head++]; + if (i.status == QueuedMessage::AVAILABLE) { + i.status = QueuedMessage::ACQUIRED; + --available; + message = i; + return true; + } } + return false; } bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { + //add padding to prevent gaps in sequence, which break the index + //calculation (needed for queue replication) + while (messages.size() && (added.position - messages.back().position) > 1) { + QueuedMessage dummy; + dummy.position = messages.back().position + 1; + dummy.status = QueuedMessage::DELETED; + messages.push_back(dummy); + QPID_LOG(debug, "Adding padding at " << dummy.position << ", between " << messages.back().position << " and " << added.position); + } messages.push_back(added); + messages.back().status = QueuedMessage::AVAILABLE; + if (head >= messages.size()) head = messages.size() - 1; + ++available; return false;//adding a message never causes one to be removed for deque } +void MessageDeque::clean() +{ + while (messages.size() && messages.front().status == QueuedMessage::DELETED) { + messages.pop_front(); + if (head) --head; + } +} + void MessageDeque::foreach(Functor f) { - std::for_each(messages.begin(), messages.end(), f); + for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->status == QueuedMessage::AVAILABLE) { + f(*i); + } + } } void MessageDeque::removeIf(Predicate p) { - for (Deque::iterator i = messages.begin(); i != messages.end();) { - if (p(*i)) { - i = messages.erase(i); - } else { - ++i; + for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->status == QueuedMessage::AVAILABLE && p(*i)) { + //Use special status for this as messages are not yet + //dequeued, but should not be considered on the queue + //either (used for purging and moving) + i->status = QueuedMessage::REMOVED; + --available; } } + clean(); } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h index 0e1aef2986..4d3a5dcdd5 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.h +++ b/qpid/cpp/src/qpid/broker/MessageDeque.h @@ -34,17 +34,14 @@ namespace broker { class MessageDeque : public Messages { public: + MessageDeque(); size_t size(); - bool empty(); - - void reinsert(const QueuedMessage&); - bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool deleted(const QueuedMessage&); + void release(const QueuedMessage&); + bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); - bool next(const framing::SequenceNumber&, QueuedMessage&); - - QueuedMessage& front(); - void pop(); - bool pop(QueuedMessage&); + bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); + bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void foreach(Functor); @@ -53,9 +50,11 @@ class MessageDeque : public Messages private: typedef std::deque<QueuedMessage> Deque; Deque messages; + size_t available; + size_t head; - Deque::iterator seek(const framing::SequenceNumber&); - bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove); + size_t index(const framing::SequenceNumber&); + void clean(); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 7054ef0310..77f8a0b5df 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -204,7 +204,7 @@ MessageGroupManager::~MessageGroupManager() } bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { - if (messages.empty()) + if (!messages.size()) return false; next.position = c->getPosition(); @@ -216,15 +216,16 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued } } - while (messages.next( next.position, next )) { + while (messages.browse( next.position, next, true )) { GroupState& group = findGroup(next); if (!group.owned()) { - if (group.members.front() == next.position) { // only take from head! + //TODO: make acquire more efficient when we already have the message in question + if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head! return true; } QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group << "'s head message still pending. pos=" << group.members.front()); - } else if (group.owner == c->getName()) { + } else if (group.owner == c->getName() && messages.acquire(next.position, next)) { return true; } } @@ -249,9 +250,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { // browse: allow access to any available msg, regardless of group ownership (?ok?) - if (!messages.empty() && messages.next(c->getPosition(), next)) - return true; - return false; + return messages.browse(c->getPosition(), next, false); } void MessageGroupManager::query(qpid::types::Variant::Map& status) const diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp index 39e23df533..048df45434 100644 --- a/qpid/cpp/src/qpid/broker/MessageMap.cpp +++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp @@ -27,6 +27,8 @@ namespace { const std::string EMPTY; } +bool MessageMap::deleted(const QueuedMessage&) { return true; } + std::string MessageMap::getKey(const QueuedMessage& message) { const framing::FieldTable* ft = message.payload->getApplicationHeaders(); @@ -44,7 +46,7 @@ bool MessageMap::empty() return messages.empty(); } -void MessageMap::reinsert(const QueuedMessage& message) +void MessageMap::release(const QueuedMessage& message) { std::string key = getKey(message); Index::iterator i = index.find(key); @@ -54,7 +56,7 @@ void MessageMap::reinsert(const QueuedMessage& message) } //else message has already been replaced } -bool MessageMap::remove(const framing::SequenceNumber& position, QueuedMessage& message) +bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); if (i != messages.end()) { @@ -77,38 +79,22 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me } } -bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message) +bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool) { - if (!messages.empty() && position < front().position) { - message = front(); + Ordering::iterator i = messages.lower_bound(position+1); + if (i != messages.end()) { + message = i->second; return true; } else { - Ordering::iterator i = messages.lower_bound(position+1); - if (i != messages.end()) { - message = i->second; - return true; - } else { - return false; - } + return false; } } -QueuedMessage& MessageMap::front() -{ - return messages.begin()->second; -} - -void MessageMap::pop() -{ - QueuedMessage dummy; - pop(dummy); -} - -bool MessageMap::pop(QueuedMessage& out) +bool MessageMap::consume(QueuedMessage& message) { Ordering::iterator i = messages.begin(); if (i != messages.end()) { - out = i->second; + message = i->second; erase(i); return true; } else { diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h index 1128a1d54a..d1b8217f9b 100644 --- a/qpid/cpp/src/qpid/broker/MessageMap.h +++ b/qpid/cpp/src/qpid/broker/MessageMap.h @@ -43,14 +43,12 @@ class MessageMap : public Messages size_t size(); bool empty(); - void reinsert(const QueuedMessage&); - virtual bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool deleted(const QueuedMessage&); + void release(const QueuedMessage&); + virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); - virtual bool next(const framing::SequenceNumber&, QueuedMessage&); - - QueuedMessage& front(); - void pop(); - bool pop(QueuedMessage&); + virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); + bool consume(QueuedMessage&); virtual bool push(const QueuedMessage& added, QueuedMessage& removed); void foreach(Functor); diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h index 448f17432a..89f6d383ae 100644 --- a/qpid/cpp/src/qpid/broker/Messages.h +++ b/qpid/cpp/src/qpid/broker/Messages.h @@ -46,22 +46,21 @@ class Messages * @return the number of messages available for delivery. */ virtual size_t size() = 0; + /** - * @return true if there are no messages for delivery, false otherwise + * Called when a message is deleted from the queue. */ - virtual bool empty() = 0; - + virtual bool deleted(const QueuedMessage&) = 0; /** - * Re-inserts a message back into its original position - used - * when requeing released messages. + * Releases an acquired message, making it available again. */ - virtual void reinsert(const QueuedMessage&) = 0; + virtual void release(const QueuedMessage&) = 0; /** - * Remove the message at the specified position, returning true if - * found, false otherwise. The removed message is passed back via - * the second parameter. + * Acquire the message at the specified position, returning true + * if found, false otherwise. The acquired message is passed back + * via the second parameter. */ - virtual bool remove(const framing::SequenceNumber&, QueuedMessage&) = 0; + virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&) = 0; /** * Find the message at the specified position, returning true if * found, false otherwise. The matched message is passed back via @@ -69,30 +68,22 @@ class Messages */ virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0; /** - * Return the next message to be given to a browsing subscrption - * that has reached the specified poisition. The next messages is - * passed back via the second parameter. + * Retrieve the next message to be given to a browsing + * subscription that has reached the specified position. The next + * message is passed back via the second parameter. + * + * @param unacquired, if true, will only browse unacquired messages * * @return true if there is another message, false otherwise. */ - virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0; + virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool unacquired) = 0; /** - * Note: Caller is responsible for ensuring that there is a front - * (e.g. empty() returns false) + * Retrieve the next message available for a consuming + * subscription. * - * @return the next message to be delivered - */ - virtual QueuedMessage& front() = 0; - /** - * Removes the front message - */ - virtual void pop() = 0; - /** - * @return true if there is a mesage to be delivered - in which - * case that message will be returned via the parameter and - * removed - otherwise false. + * @return true if there is such a message, false otherwise. */ - virtual bool pop(QueuedMessage&) = 0; + virtual bool consume(QueuedMessage&) = 0; /** * Pushes a message to the back of the 'queue'. For some types of * queue this may cause another message to be removed; if that is diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp index e07e73d323..d807ef22b1 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp @@ -32,6 +32,8 @@ PriorityQueue::PriorityQueue(int l) : messages(levels, Deque()), frontLevel(0), haveFront(false), cached(false) {} +bool PriorityQueue::deleted(const QueuedMessage&) { return true; } + size_t PriorityQueue::size() { size_t total(0); @@ -41,15 +43,7 @@ size_t PriorityQueue::size() return total; } -bool PriorityQueue::empty() -{ - for (int i = 0; i < levels; ++i) { - if (!messages[i].empty()) return false; - } - return true; -} - -void PriorityQueue::reinsert(const QueuedMessage& message) +void PriorityQueue::release(const QueuedMessage& message) { uint p = getPriorityLevel(message); messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message); @@ -78,7 +72,7 @@ bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& return false; } -bool PriorityQueue::remove(const framing::SequenceNumber& position, QueuedMessage& message) +bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { return find(position, message, true); } @@ -88,7 +82,7 @@ bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& return find(position, message, false); } -bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message) +bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool) { QueuedMessage match; match.position = position+1; @@ -112,16 +106,7 @@ bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& return found; } -QueuedMessage& PriorityQueue::front() -{ - if (checkFront()) { - return messages[frontLevel].front(); - } else { - throw qpid::framing::InternalErrorException(QPID_MSG("No message available")); - } -} - -bool PriorityQueue::pop(QueuedMessage& message) +bool PriorityQueue::consume(QueuedMessage& message) { if (checkFront()) { message = messages[frontLevel].front(); @@ -133,12 +118,6 @@ bool PriorityQueue::pop(QueuedMessage& message) } } -void PriorityQueue::pop() -{ - QueuedMessage dummy; - pop(dummy); -} - bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { messages[getPriorityLevel(added)].push_back(added); diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h index 4bf9d26a9d..67c31468d2 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.h +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h @@ -40,16 +40,13 @@ class PriorityQueue : public Messages PriorityQueue(int levels); virtual ~PriorityQueue() {} size_t size(); - bool empty(); - void reinsert(const QueuedMessage&); - bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool deleted(const QueuedMessage&); + void release(const QueuedMessage&); + bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); - bool next(const framing::SequenceNumber&, QueuedMessage&); - - QueuedMessage& front(); - void pop(); - bool pop(QueuedMessage&); + bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); + bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void foreach(Functor); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 329fd1cb8c..b34bc65ec5 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -213,7 +213,7 @@ void Queue::requeue(const QueuedMessage& msg){ { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; - messages->reinsert(msg); + messages->release(msg); listeners.populate(copy); // for persistLastNode - don't force a message twice to disk, but force it if no force before @@ -296,46 +296,41 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { + QueuedMessage msg; while (true) { Mutex::ScopedLock locker(messageLock); - QueuedMessage msg; - - if (!allocator->nextConsumableMessage(c, msg)) { // no next available - QPID_LOG(debug, "No messages available to dispatch to consumer " << - c->getName() << " on queue '" << name << "'"); - listeners.addListener(c); - return NO_MESSAGES; - } - - if (msg.payload->hasExpired()) { - QPID_LOG(debug, "Message expired from queue '" << name << "'"); - c->setPosition(msg.position); - acquire( msg.position, msg, locker); - dequeue( 0, msg ); - continue; - } - - // a message is available for this consumer - can the consumer use it? + if (allocator->nextConsumableMessage(c, msg)) { + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + c->setPosition(msg.position) + ; + dequeue(0, msg); + continue; + } - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { - bool ok = allocator->allocate( c->getName(), msg ); // inform allocator - (void) ok; assert(ok); - ok = acquire( msg.position, msg, locker); - (void) ok; assert(ok); - m = msg; - c->setPosition(m.position); - return CONSUMED; + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { + bool ok = allocator->allocate( c->getName(), msg ); // inform allocator + (void) ok; assert(ok); + observeAcquire(msg, locker); + m = msg; + return CONSUMED; + } else { + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + messages->release(msg); + return CANT_CONSUME; + } } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + //consumer will never want this message + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + messages->release(msg); return CANT_CONSUME; } } else { - //consumer will never want this message - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - c->setPosition(msg.position); - return CANT_CONSUME; + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + listeners.addListener(c); + return NO_MESSAGES; } } } @@ -398,7 +393,6 @@ bool Queue::dispatch(Consumer::shared_ptr c) } bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const { - Mutex::ScopedLock locker(messageLock); if (messages->find(pos, msg)) return true; @@ -460,7 +454,7 @@ void Queue::cancel(Consumer::shared_ptr c){ QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - if (messages->pop(msg)) + if (messages->consume(msg)) observeAcquire(msg, locker); return msg; } @@ -632,6 +626,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> // Update observers and message state: observeAcquire(*qmsg, locker); dequeue(0, *qmsg); + QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName()); // now reroute if necessary if (dest.get()) { assert(qmsg->payload); @@ -663,24 +658,11 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, return c.matches.size(); } -/** Acquire the front (oldest) message from the in-memory queue. - * assumes messageLock held by caller - */ -void Queue::pop(const Mutex::ScopedLock& locker) -{ - assertClusterSafe(); - QueuedMessage msg; - if (messages->pop(msg)) { - observeAcquire(msg, locker); - ++dequeueSincePurge; - } -} - /** Acquire the message at the given position, return true and msg if acquire succeeds */ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, const Mutex::ScopedLock& locker) { - if (messages->remove(position, msg)) { + if (messages->acquire(position, msg)) { observeAcquire(msg, locker); ++dequeueSincePurge; return true; @@ -867,12 +849,13 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) * Removes the first (oldest) message from the in-memory delivery queue as well dequeing * it from the logical (and persistent if applicable) queue */ -void Queue::popAndDequeue(const Mutex::ScopedLock& held) +bool Queue::popAndDequeue(QueuedMessage& msg) { - if (!messages->empty()) { - QueuedMessage msg = messages->front(); - pop(held); + if (messages->consume(msg)) { dequeue(0, msg); + return true; + } else { + return false; } } @@ -884,6 +867,7 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); + messages->deleted(msg); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->dequeued(msg); @@ -1070,10 +1054,10 @@ void Queue::destroyed() unbind(broker->getExchanges()); if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); - while(!messages->empty()){ - DeliverableMessage msg(messages->front().payload); + QueuedMessage m; + while(popAndDequeue(m)){ + DeliverableMessage msg(m.payload); alternateExchange->routeWithAlternate(msg); - popAndDequeue(locker); } alternateExchange->decAlternateUsers(); } @@ -1336,6 +1320,7 @@ void Queue::query(qpid::types::Variant::Map& results) const void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; + QPID_LOG(info, "Set position to " << sequence << " on " << getName()); } SequenceNumber Queue::getPosition() { @@ -1421,6 +1406,12 @@ void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) observers.insert(observer); } +void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer) +{ + Mutex::ScopedLock locker(messageLock); + observers.erase(observer); +} + void Queue::flush() { ScopedUse u(barrier); @@ -1452,6 +1443,38 @@ void Queue::setDequeueSincePurge(uint32_t value) { dequeueSincePurge = value; } +namespace{ +class FindLowest +{ + public: + FindLowest() : init(false) {} + void process(const QueuedMessage& message) { + QPID_LOG(debug, "FindLowest processing: " << message.position); + if (!init || message.position < lowest) lowest = message.position; + init = true; + } + bool getLowest(qpid::framing::SequenceNumber& result) { + if (init) { + result = lowest; + return true; + } else { + return false; + } + } + private: + bool init; + qpid::framing::SequenceNumber lowest; +}; +} + +bool Queue::getOldest(qpid::framing::SequenceNumber& oldest) +{ + //Horribly inefficient, but saves modifying Messages interface and + //all its implementations at present: + FindLowest f; + eachMessage(boost::bind(&FindLowest::process, &f, _1)); + return f.getLowest(oldest); +} Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 59ae41e768..b66600ef43 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -148,10 +148,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); - - /** modify the Queue's message container - assumes messageLock held */ - void pop(const sys::Mutex::ScopedLock& held); // acquire front msg - void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg + bool popAndDequeue(QueuedMessage&); // acquire message @ position, return true and set msg if acquire succeeds bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, const sys::Mutex::ScopedLock& held); @@ -386,6 +383,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, */ QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); void addObserver(boost::shared_ptr<QueueObserver>); + void removeObserver(boost::shared_ptr<QueueObserver>); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); /** * Notify queue that recovery has completed. @@ -409,6 +407,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } void setDequeueSincePurge(uint32_t value); + bool getOldest(framing::SequenceNumber& result); }; } } diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp b/qpid/cpp/src/qpid/broker/QueueReplicator.cpp new file mode 100644 index 0000000000..01c0c8e272 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueueReplicator.cpp @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/QueueReplicator.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { + +QueueReplicator::QueueReplicator(const std::string& name, boost::shared_ptr<Queue> q) : Exchange(name, 0, 0), queue(q), current(queue->getPosition()) {} +QueueReplicator::~QueueReplicator() {} + +namespace { +const std::string DEQUEUE_EVENT("dequeue-event"); +const std::string REPLICATOR("qpid.replicator-"); +} + +void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/) +{ + if (key == DEQUEUE_EVENT) { + std::string content; + msg.getMessage().getFrames().getContent(content); + qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size()); + qpid::framing::SequenceSet latest; + latest.decode(buffer); + + //TODO: should be able to optimise the following + for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) { + if (current < *i) { + //haven't got that far yet, record the dequeue + dequeued.add(*i); + QPID_LOG(debug, "Recording dequeue of message at " << *i << " from " << queue->getName()); + } else { + QueuedMessage message; + if (queue->acquireMessageAt(*i, message)) { + queue->dequeue(0, message); + QPID_LOG(info, "Dequeued message at " << *i << " from " << queue->getName()); + } else { + QPID_LOG(error, "Unable to dequeue message at " << *i << " from " << queue->getName()); + } + } + } + } else { + //take account of any gaps in sequence created by messages + //dequeued before our subscription reached them + while (dequeued.contains(++current)) { + dequeued.remove(current); + QPID_LOG(debug, "Skipping dequeued message at " << current << " from " << queue->getName()); + queue->setPosition(current); + } + QPID_LOG(info, "Enqueued message on " << queue->getName() << "; currently at " << current); + msg.deliverTo(queue); + } +} + +bool QueueReplicator::isReplicatingLink(const std::string& name) +{ + return name.find(REPLICATOR) == 0; +} + +boost::shared_ptr<Exchange> QueueReplicator::create(const std::string& target, QueueRegistry& queues) +{ + boost::shared_ptr<Exchange> exchange; + if (isReplicatingLink(target)) { + std::string queueName = target.substr(REPLICATOR.size()); + boost::shared_ptr<Queue> queue = queues.find(queueName); + if (!queue) { + QPID_LOG(warning, "Unable to create replicator, can't find " << queueName); + } else { + //TODO: need to cache the replicator + QPID_LOG(info, "Creating replicator for " << queueName); + exchange.reset(new QueueReplicator(target, queue)); + } + } + return exchange; +} + +bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry& queues, qpid::framing::FieldTable& settings) +{ + if (isReplicatingLink(target)) { + std::string queueName = target.substr(REPLICATOR.size()); + boost::shared_ptr<Queue> queue = queues.find(queueName); + if (queue) { + settings.setInt("qpid.replicating-subscription", 1); + settings.setInt("qpid.high_sequence_number", queue->getPosition()); + qpid::framing::SequenceNumber oldest; + if (queue->getOldest(oldest)) { + settings.setInt("qpid.low_sequence_number", oldest); + } + } + return true; + } else { + return false; + } +} + +bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; } +bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; } +bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; } + +const std::string QueueReplicator::typeName("queue-replicator"); + +std::string QueueReplicator::getType() const +{ + return typeName; +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.h b/qpid/cpp/src/qpid/broker/QueueReplicator.h new file mode 100644 index 0000000000..679aa9240d --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueueReplicator.h @@ -0,0 +1,57 @@ +#ifndef QPID_BROKER_QUEUEREPLICATOR_H +#define QPID_BROKER_QUEUEREPLICATOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Exchange.h" +#include "qpid/framing/SequenceSet.h" + +namespace qpid { +namespace broker { + +class QueueRegistry; + +/** + * Dummy exchange for processing replication messages + */ +class QueueReplicator : public Exchange +{ + public: + QueueReplicator(const std::string& name, boost::shared_ptr<Queue>); + ~QueueReplicator(); + std::string getType() const; + bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*); + bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*); + void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*); + bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const); + static bool isReplicatingLink(const std::string&); + static boost::shared_ptr<Exchange> create(const std::string&, QueueRegistry&); + static bool initReplicationSettings(const std::string&, QueueRegistry&, qpid::framing::FieldTable&); + static const std::string typeName; + private: + boost::shared_ptr<Queue> queue; + qpid::framing::SequenceNumber current; + qpid::framing::SequenceSet dequeued; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_QUEUEREPLICATOR_H*/ diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h index 35e48b11f3..051ade41ea 100644 --- a/qpid/cpp/src/qpid/broker/QueuedMessage.h +++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h @@ -32,6 +32,7 @@ struct QueuedMessage { boost::intrusive_ptr<Message> payload; framing::SequenceNumber position; + enum {AVAILABLE, ACQUIRED, DELETED, REMOVED} status; Queue* queue; QueuedMessage() : queue(0) {} diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 2b9fd247f5..86ecba7aaa 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -26,6 +26,7 @@ #include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueReplicator.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SessionOutputException.h" #include "qpid/broker/TxAccept.h" @@ -114,7 +115,7 @@ void SemanticState::consume(const string& tag, // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination). // Create a globally unique name so the broker can identify individual consumers std::string name = session.getSessionId().str() + SEPARATOR + tag; - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); + ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; } @@ -264,6 +265,224 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); +class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver +{ + public: + ReplicatingSubscription(SemanticState* parent, + const std::string& name, boost::shared_ptr<Queue> queue, + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + ~ReplicatingSubscription(); + + void init(); + void cancel(); + bool deliver(QueuedMessage& msg); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + void acquired(const QueuedMessage&) {} + void requeued(const QueuedMessage&) {} + + protected: + bool doDispatch(); + private: + boost::shared_ptr<Queue> events; + boost::shared_ptr<Consumer> consumer; + qpid::framing::SequenceSet range; + + void generateDequeueEvent(); + class DelegatingConsumer : public Consumer + { + public: + DelegatingConsumer(ReplicatingSubscription&); + ~DelegatingConsumer(); + bool deliver(QueuedMessage& msg); + void notify(); + bool filter(boost::intrusive_ptr<Message>); + bool accept(boost::intrusive_ptr<Message>); + void cancel() {} + OwnershipToken* getSession(); + private: + ReplicatingSubscription& delegate; + }; +}; + +SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent, + const string& name, + Queue::shared_ptr queue, + bool ack, + bool acquire, + bool exclusive, + const string& tag, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments) +{ + if (arguments.isSet("qpid.replicating-subscription")) { + shared_ptr result(new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); + boost::dynamic_pointer_cast<ReplicatingSubscription>(result)->init(); + return result; + } else { + return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); + } +} + +std::string mask(const std::string& in) +{ + return std::string("$") + in + std::string("_internal"); +} + +class ReplicationStateInitialiser +{ + public: + ReplicationStateInitialiser(qpid::framing::SequenceSet& results, + const qpid::framing::SequenceNumber& start, + const qpid::framing::SequenceNumber& end); + void operator()(const QueuedMessage& m) { process(m); } + private: + qpid::framing::SequenceSet& results; + const qpid::framing::SequenceNumber start; + const qpid::framing::SequenceNumber end; + void process(const QueuedMessage&); +}; + +ReplicatingSubscription::ReplicatingSubscription(SemanticState* _parent, + const string& _name, + Queue::shared_ptr _queue, + bool ack, + bool _acquire, + bool _exclusive, + const string& _tag, + const string& _resumeId, + uint64_t _resumeTtl, + const framing::FieldTable& _arguments +) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments), + events(new Queue(mask(_name))), + consumer(new DelegatingConsumer(*this)) +{ + + if (_arguments.isSet("qpid.high_sequence_number")) { + qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number"); + qpid::framing::SequenceNumber lwm; + if (_arguments.isSet("qpid.low_sequence_number")) { + lwm = _arguments.getAsInt("qpid.low_sequence_number"); + } else { + lwm = hwm; + } + qpid::framing::SequenceNumber oldest; + if (_queue->getOldest(oldest)) { + if (oldest >= hwm) { + range.add(lwm, --oldest); + } else if (oldest >= lwm) { + ReplicationStateInitialiser initialiser(range, lwm, hwm); + _queue->eachMessage(initialiser); + } else { //i.e. have older message on master than is reported to exist on replica + QPID_LOG(warning, "Replica appears to be missing message on master"); + } + } else { + //local queue (i.e. master) is empty + range.add(lwm, _queue->getPosition()); + } + QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range + << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")"); + //set position of 'cursor' + position = hwm; + } +} + +bool ReplicatingSubscription::deliver(QueuedMessage& m) +{ + return ConsumerImpl::deliver(m); +} + +void ReplicatingSubscription::init() +{ + getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); +} + +void ReplicatingSubscription::cancel() +{ + getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); +} + +ReplicatingSubscription::~ReplicatingSubscription() {} + +//called before we get notified of the message being available and +//under the message lock in the queue +void ReplicatingSubscription::enqueued(const QueuedMessage& m) +{ + QPID_LOG(debug, "Enqueued message at " << m.position); + //delay completion + m.payload->getIngressCompletion().startCompleter(); + QPID_LOG(debug, "Delayed " << m.payload.get()); +} + +class Buffer : public qpid::framing::Buffer +{ + public: + Buffer(size_t size) : qpid::framing::Buffer(new char[size], size) {} + ~Buffer() { delete[] getPointer(); } +}; + +void ReplicatingSubscription::generateDequeueEvent() +{ + Buffer buffer(range.encodedSize()); + range.encode(buffer); + range.clear(); + buffer.reset(); + + //generate event message + boost::intrusive_ptr<Message> event = new Message(); + AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody())); + content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize()); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + event->getFrames().append(method); + event->getFrames().append(header); + event->getFrames().append(content); + + DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true); + props->setRoutingKey("dequeue-event"); + + events->deliver(event); +} + +//called after the message has been removed from the deque and under +//the message lock in the queue +void ReplicatingSubscription::dequeued(const QueuedMessage& m) +{ + { + Mutex::ScopedLock l(lock); + range.add(m.position); + QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position); + } + notify(); + if (m.position > position) { + m.payload->getIngressCompletion().finishCompleter(); + QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue"); + } +} + +bool ReplicatingSubscription::doDispatch() +{ + { + Mutex::ScopedLock l(lock); + if (!range.empty()) { + generateDequeueEvent(); + } + } + bool r1 = events->dispatch(consumer); + bool r2 = ConsumerImpl::doDispatch(); + return r1 || r2; +} + SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const string& _name, Queue::shared_ptr _queue, @@ -332,7 +551,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, credit.isWindowMode()); + DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, credit.isWindowMode(), 0, dynamic_cast<const ReplicatingSubscription*>(this)); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); @@ -340,7 +559,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) parent->record(record); } if (acquire && !ackExpected) { // auto acquire && auto accept - queue->dequeue(0 /*ctxt*/, msg); + msg.queue->dequeue(0, msg); record.setEnded(); } if (mgmtObject) { mgmtObject->inc_delivered(); } @@ -455,8 +674,10 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->computeExpiration(getSession().getBroker().getExpiryPolicy()); std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) - cacheExchange = session.getBroker().getExchanges().get(exchangeName); + if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) { + cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues()); + if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName); + } cacheExchange->setProperties(msg); /* verify the userid if specified: */ @@ -646,9 +867,14 @@ bool SemanticState::ConsumerImpl::haveCredit() } } +bool SemanticState::ConsumerImpl::doDispatch() +{ + return queue->dispatch(shared_from_this()); +} + void SemanticState::ConsumerImpl::flush() { - while(haveCredit() && queue->dispatch(shared_from_this())) + while(haveCredit() && doDispatch()) ; credit.cancel(); } @@ -710,7 +936,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) bool SemanticState::ConsumerImpl::doOutput() { try { - return haveCredit() && queue->dispatch(shared_from_this()); + return haveCredit() && doDispatch(); } catch (const SessionException& e) { throw SessionOutputException(e, parent->session.getChannel()); } @@ -820,4 +1046,35 @@ void SemanticState::detached() } } +ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} +ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} +bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) +{ + return delegate.deliver(m); +} +void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } +bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } +bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } +OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } + +ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r, + const qpid::framing::SequenceNumber& s, + const qpid::framing::SequenceNumber& e) + : results(r), start(s), end(e) +{ + results.add(start, end); +} + +void ReplicationStateInitialiser::process(const QueuedMessage& message) +{ + if (message.position < start) { + //replica does not have a message that should still be on the queue + QPID_LOG(warning, "Replica appears to be missing message at " << message.position); + } else if (message.position >= start && message.position <= end) { + //i.e. message is within the intial range and has not been dequeued, so remove it from the results + results.remove(message.position); + } //else message has not been seen by replica yet so can be ignored here + +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 26fd815424..ec4bcb756c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -30,6 +30,7 @@ #include "qpid/broker/DtxBuffer.h" #include "qpid/broker/DtxManager.h" #include "qpid/broker/NameGenerator.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/TxBuffer.h" #include "qpid/framing/FrameHandler.h" @@ -74,7 +75,9 @@ class SemanticState : private boost::noncopyable { public boost::enable_shared_from_this<ConsumerImpl>, public management::Manageable { + protected: mutable qpid::sys::Mutex lock; + private: SemanticState* const parent; const boost::shared_ptr<Queue> queue; const bool ackExpected; @@ -95,17 +98,20 @@ class SemanticState : private boost::noncopyable { void allocateCredit(boost::intrusive_ptr<Message>& msg); bool haveCredit(); + protected: + virtual bool doDispatch(); + size_t unacked() { return parent->unacked.size(); } + public: typedef boost::shared_ptr<ConsumerImpl> shared_ptr; ConsumerImpl(SemanticState* parent, const std::string& name, boost::shared_ptr<Queue> queue, bool ack, bool acquire, bool exclusive, - const std::string& tag, const std::string& resumeId, - uint64_t resumeTtl, const framing::FieldTable& arguments); - ~ConsumerImpl(); + const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + virtual ~ConsumerImpl(); OwnershipToken* getSession(); - bool deliver(QueuedMessage& msg); + virtual bool deliver(QueuedMessage& msg); bool filter(boost::intrusive_ptr<Message> msg); bool accept(boost::intrusive_ptr<Message> msg); void cancel() {} @@ -142,9 +148,16 @@ class SemanticState : private boost::noncopyable { SemanticState& getParent() { return *parent; } const SemanticState& getParent() const { return *parent; } + // Manageable entry points management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + + static shared_ptr create(SemanticState* parent, + const std::string& name, boost::shared_ptr<Queue> queue, + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + }; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 715376fd8d..2d05755fc7 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -275,7 +275,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m populate(*message, *command); } const MessageTransferBody* transfer = command->as<MessageTransferBody>(); - if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { + if (transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { sys::Mutex::ScopedLock l(lock); acceptTracker.delivered(transfer->getDestination(), command->getId()); } diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 0b1b4cc59e..9a76bb28e1 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -711,7 +711,7 @@ namespace { const std::string& expectedGroup, const int expectedId ) { - queue->dispatch(c); + BOOST_CHECK(queue->dispatch(c)); results.push_back(c->last); std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); @@ -1026,6 +1026,11 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 6u); + /** + * TODO: Fix or replace the following test which incorrectly requeues a + * message that was never on the queue in the first place. This relied on + * internal details not part of the queue abstraction. + // check requeue 1 intrusive_ptr<Message> msg4 = create_message("e", "C"); intrusive_ptr<Message> msg5 = create_message("e", "D"); @@ -1047,6 +1052,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue2->clearLastNodeFailure(); queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 8u); + */ } QPID_AUTO_TEST_CASE(testLastNodeRecoverAndFail){ diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index db5ec03df2..935db54458 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -886,9 +886,11 @@ class ReceiverTests(Base): rc = self.ssn.receiver('test-receiver-queue; {mode: consume}') self.drain(rb, expected=msgs) self.drain(rc, expected=msgs) - rb2 = self.ssn.receiver(rb.source) - self.assertEmpty(rb2) + rc2 = self.ssn.receiver(rc.source) + self.assertEmpty(rc2) self.drain(self.rcv, expected=[]) + rb2 = self.ssn.receiver(rb.source) + self.drain(rb2, expected=msgs) # XXX: need testUnsettled() diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py index 204b6ebd23..c6095a0579 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py @@ -1033,8 +1033,7 @@ class MessageTests(TestBase010): #release all even messages session.message_release(RangedSet(msg.id)) - #browse: - session.message_subscribe(queue="q", destination="b", acquire_mode=1) + session.message_subscribe(queue="q", destination="b", acquire_mode=0) b = session.incoming("b") b.start() for i in [2, 4, 6, 8, 10]: diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py index 99d11151e8..ef6734f136 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py @@ -202,6 +202,10 @@ class MultiConsumerMsgGroupTests(Base): ## Queue = A-0, B-1, A-2, b-3, C-4 ## Owners= ^C1, ---, +C1, ---, --- + m2 = b1.fetch(0); + assert m2.properties['THE-GROUP'] == 'A' + assert m2.content['index'] == 0 + m2 = b1.fetch(0) assert m2.properties['THE-GROUP'] == 'B' assert m2.content['index'] == 1 @@ -713,6 +717,7 @@ class MultiConsumerMsgGroupTests(Base): assert rc.status == 0 queue.update() queue.msgDepth == 4 # the pending acquired A still counts! + s1.acknowledge() # verify all other A's removed.... s2 = self.setup_session() @@ -782,7 +787,7 @@ class MultiConsumerMsgGroupTests(Base): except Empty: pass assert count == 3 # non-A's - assert a_count == 1 # and one is an A + assert a_count == 2 # pending acquired message included in browse results s1.acknowledge() # ack the consumed A-0 self.qmf_session.delBroker(self.qmf_broker) @@ -829,7 +834,7 @@ class MultiConsumerMsgGroupTests(Base): # verify all other A's removed from msg-group-q s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) + b1 = s2.receiver("msg-group-q", options={"capacity":0}) count = 0 try: while True: @@ -963,7 +968,7 @@ class MultiConsumerMsgGroupTests(Base): # verify all other A's removed.... s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) + b1 = s2.receiver("msg-group-q", options={"capacity":0}) count = 0 try: while True: |