diff options
author | Alan Conway <aconway@apache.org> | 2012-01-19 23:08:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-19 23:08:30 +0000 |
commit | 6df9229322558736ccb92af4f61f62a5798fd9df (patch) | |
tree | 6f4afd68f8e7f5530a666c42275fa58106c38868 | |
parent | 6fd3663ff0c697584bbba5bcba5d61c3fefa9115 (diff) | |
download | qpid-python-6df9229322558736ccb92af4f61f62a5798fd9df.tar.gz |
QPID-3603: Handle backup crash/shutdown.
If a backup crashes or shuts down, any messages whose
completion has been delayed for that backup must be marked
complete to avoid the primary hanging.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233681 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 57 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.h | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 98 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/tests/DeliveryRecordTest.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 1 |
11 files changed, 139 insertions, 87 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index d13d8075a9..3330e3918f 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -34,10 +34,12 @@ class QueueListeners; /** * Base class for consumers which represent a subscription to a queue. */ -class Consumer { +class Consumer +{ const bool acquires; - // inListeners allows QueueListeners to efficiently track if this instance is registered - // for notifications without having to search its containers + // inListeners allows QueueListeners to efficiently track if this + // instance is registered for notifications without having to + // search its containers bool inListeners; // the name is generated by broker and is unique within broker scope. It is not // provided or known by the remote Consumer. @@ -61,7 +63,12 @@ class Consumer { virtual bool accept(boost::intrusive_ptr<Message>) { return true; } virtual OwnershipToken* getSession() = 0; virtual void cancel() = 0; - virtual bool isDelayedCompletion() const { return false; } + + /** Called when the peer has acknowledged receipt of the message. + * Not to be confused with accept() above, which is asking if + * this consumer will consume/browse the message. + */ + virtual void acknowledged(const QueuedMessage&) = 0; protected: framing::SequenceNumber position; diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index f17795743b..0181a88840 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ #include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/SemanticState.h" +#include "qpid/broker/Consumer.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Queue.h" #include "qpid/log/Statement.h" @@ -31,23 +32,25 @@ using namespace qpid; using namespace qpid::broker; using std::string; -DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, - const Queue::shared_ptr& _queue, +DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, + const Queue::shared_ptr& _queue, const std::string& _tag, + const boost::shared_ptr<Consumer>& _consumer, bool _acquired, - bool accepted, + bool accepted, bool _windowing, - uint32_t _credit, bool _isDelayedCompletion) : msg(_msg), - queue(_queue), - tag(_tag), - acquired(_acquired), - acceptExpected(!accepted), - cancelled(false), - completed(false), - ended(accepted && acquired), - windowing(_windowing), - credit(msg.payload ? msg.payload->getRequiredCredit() : _credit), - isDelayedCompletion(_isDelayedCompletion) + uint32_t _credit): + msg(_msg), + queue(_queue), + tag(_tag), + consumer(_consumer), + acquired(_acquired), + acceptExpected(!accepted), + cancelled(false), + completed(false), + ended(accepted && acquired), + windowing(_windowing), + credit(msg.payload ? 
msg.payload->getRequiredCredit() : _credit) {} bool DeliveryRecord::setEnded() @@ -95,7 +98,7 @@ void DeliveryRecord::requeue() const } } -void DeliveryRecord::release(bool setRedelivered) +void DeliveryRecord::release(bool setRedelivered) { if (acquired && !ended) { if (setRedelivered) msg.payload->redeliver(); @@ -108,19 +111,13 @@ void DeliveryRecord::release(bool setRedelivered) } void DeliveryRecord::complete() { - completed = true; + completed = true; } bool DeliveryRecord::accept(TransactionContext* ctxt) { if (!ended) { - if (acquired) { - queue->dequeue(ctxt, msg); - } else if (isDelayedCompletion) { - // FIXME aconway 2011-12-05: This should be done in HA code. - msg.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(debug, "Completed " << msg.queue->getName() - << "[" << msg.position << "]"); - } + consumer->acknowledged(getMessage()); + if (acquired) queue->dequeue(ctxt, msg); setEnded(); QPID_LOG(debug, "Accepted " << id); } @@ -137,8 +134,8 @@ void DeliveryRecord::committed() const{ queue->dequeueCommitted(msg); } -void DeliveryRecord::reject() -{ +void DeliveryRecord::reject() +{ if (acquired && !ended) { Exchange::shared_ptr alternate = queue->getAlternateExchange(); if (alternate) { @@ -173,7 +170,7 @@ void DeliveryRecord::acquire(DeliveryIds& results) { } } -void DeliveryRecord::cancel(const std::string& cancelledTag) +void DeliveryRecord::cancel(const std::string& cancelledTag) { if (tag == cancelledTag) cancelled = true; @@ -192,7 +189,7 @@ AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, D namespace qpid { namespace broker { -std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) +std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) { out << "{" << "id=" << r.id.getValue(); out << ", tag=" << r.tag << "}"; diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index ea33ed5461..21074d4274 100644 --- 
a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -38,6 +38,7 @@ namespace broker { class TransactionContext; class SemanticState; struct AckRange; +class Consumer; /** * Record of a delivery for which an ack is outstanding. @@ -47,6 +48,7 @@ class DeliveryRecord QueuedMessage msg; mutable boost::shared_ptr<Queue> queue; std::string tag; // name of consumer + boost::shared_ptr<Consumer> consumer; DeliveryId id; bool acquired : 1; bool acceptExpected : 1; @@ -63,21 +65,20 @@ class DeliveryRecord * after that). */ uint32_t credit; - bool isDelayedCompletion; public: QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg, const boost::shared_ptr<Queue>& queue, const std::string& tag, + const boost::shared_ptr<Consumer>& consumer, bool acquired, bool accepted, bool windowing, - uint32_t credit=0, // Only used if msg is empty. - bool isDelayedCompletion=false + uint32_t credit=0 // Only used if msg is empty. 
); - + bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); } - + void dequeue(TransactionContext* ctxt = 0) const; void requeue() const; void release(bool setRedelivered); @@ -97,7 +98,7 @@ class DeliveryRecord bool isAccepted() const { return !acceptExpected; } bool isEnded() const { return ended; } bool isWindowing() const { return windowing; } - + uint32_t getCredit() const; const std::string& getTag() const { return tag; } @@ -134,7 +135,7 @@ typedef DeliveryRecord::DeliveryRecords DeliveryRecords; struct AckRange { DeliveryRecords::iterator start; - DeliveryRecords::iterator end; + DeliveryRecords::iterator end; AckRange(DeliveryRecords::iterator _start, DeliveryRecords::iterator _end) : start(_start), end(_end) {} }; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 775c4cd862..e7d2259c80 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -341,7 +341,8 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, credit.isWindowMode(), 0, isDelayedCompletion()); + DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), + shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); @@ -364,7 +365,7 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { assertClusterSafe(); - // FIXME aconway 2009-06-08: if we have byte & message credit but + // TODO aconway 2009-06-08: if we have byte & message credit but // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages // 
in future. diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 2be78e7233..5a83fd0fb3 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -149,14 +149,11 @@ class SemanticState : private boost::noncopyable { SemanticState& getParent() { return *parent; } const SemanticState& getParent() const { return *parent; } - // Manageable entry points + void acknowledged(const broker::QueuedMessage&) {} + + // manageable entry points management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - - /** This consumer wants delayed completion. - * Overridden by ConsumerImpl subclasses. - */ - virtual bool isDelayedCompletion() const { return false; } }; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index c16ab72876..00a343d71e 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -549,7 +549,7 @@ void Connection::deliveryRecord(const string& qname, } else { // Message at original position in original queue queue->find(position, m); } - // FIXME aconway 2011-08-19: removed: + // NOTE: removed: // if (!m.payload) // throw Exception(QPID_MSG("deliveryRecord no update message")); // @@ -561,7 +561,8 @@ void Connection::deliveryRecord(const string& qname, // } - broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit); + broker::DeliveryRecord dr(m, queue, tag, semanticState().find(tag), + acquired, accepted, windowing, credit); dr.setId(id); if (cancelled) dr.cancel(dr.getTag()); if (completed) dr.complete(); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 36859909df..9b2598b3bf 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ 
b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -70,6 +70,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) boost::shared_ptr<ReplicatingSubscription::Factory>( new ReplicatingSubscription::Factory())); } + HaBroker::~HaBroker() {} Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 6c33002b5c..0070118102 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -132,20 +132,68 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) { ReplicatingSubscription::~ReplicatingSubscription() {} + +// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg + +// Mark a message completed. May be called by acknowledge or dequeued +void ReplicatingSubscription::complete( + const QueuedMessage& qm, const sys::Mutex::ScopedLock&) +{ + // Handle completions for the subscribed queue, not the internal event queue. + if (qm.queue && qm.queue == getQueue().get()) { + QPID_LOG(trace, logPrefix << "Completed message " << qm.position); + Delayed::iterator i= delayed.find(qm.position); + // The same message can be completed twice, by acknowledged and + // dequeued, remove it from the set so it only gets completed + // once. + if (i != delayed.end()) { + assert(i->second.payload == qm.payload); + qm.payload->getIngressCompletion().finishCompleter(); + delayed.erase(i); + } + } +} + +// Called before we get notified of the message being available and +// under the message lock in the queue. Called in arbitrary connection thread. 
+void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { + sys::Mutex::ScopedLock l(lock); + // Delay completion + QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position); + qm.payload->getIngressCompletion().startCompleter(); + assert(delayed.find(qm.position) == delayed.end()); + delayed[qm.position] = qm; +} + + +// Function to complete a delayed message, called by cancel() +void ReplicatingSubscription::cancelComplete( + const Delayed::value_type& v, const sys::Mutex::ScopedLock&) +{ + QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position); + v.second.payload->getIngressCompletion().finishCompleter(); +} + // Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { - QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); getQueue()->removeObserver( boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); + { + sys::Mutex::ScopedLock l(lock); + QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); + for_each(delayed.begin(), delayed.end(), + boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); + delayed.clear(); + } ConsumerImpl::cancel(); } -// Called before we get notified of the message being available and -// under the message lock in the queue. Called in arbitrary connection thread. -void ReplicatingSubscription::enqueued(const QueuedMessage& m) { - //delay completion - m.payload->getIngressCompletion().startCompleter(); +// Called on primary in the backups IO thread. +void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { + sys::Mutex::ScopedLock l(lock); + // Finish completion of message, it has been acknowledged by the backup. + complete(msg, l); } // Called with lock held. Called in subscription's connection thread. 
@@ -160,6 +208,21 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); } +// Called after the message has been removed from the deque and under +// the messageLock in the queue. Called in arbitrary connection threads. +void ReplicatingSubscription::dequeued(const QueuedMessage& qm) +{ + { + sys::Mutex::ScopedLock l(lock); + QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position); + dequeues.add(qm.position); + // If we have not yet sent this message to the backup, then + // complete it now as it will never be accepted. + if (qm.position > position) complete(qm, l); + } + notify(); // Ensure a call to doDispatch +} + // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendPositionEvent( SequenceNumber position, const sys::Mutex::ScopedLock&l ) @@ -205,28 +268,6 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& events->dispatch(consumer); } -// Called after the message has been removed from the deque and under -// the messageLock in the queue. Called in arbitrary connection threads. -void ReplicatingSubscription::dequeued(const QueuedMessage& m) -{ - { - sys::Mutex::ScopedLock l(lock); - dequeues.add(m.position); - // If we have not yet sent this message to the backup, then - // complete it now as it will never be accepted. - - // FIXME aconway 2012-01-05: suspect use of position in - // foreign connection thread. Race with deliver() which is - // not under the message lock? - if (m.position > position) { - m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(trace, logPrefix << "Dequeued and completed message " << m.position << " early"); - } - else - QPID_LOG(trace, logPrefix << "Dequeued message " << m.position); - } - notify(); // Ensure a call to doDispatch -} // Called in subscription's connection thread. 
bool ReplicatingSubscription::doDispatch() @@ -244,7 +285,6 @@ bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { re void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } -void ReplicatingSubscription::DelegatingConsumer::cancel() {} OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 147c40ee6d..8af273e4d8 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -85,17 +85,21 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, // Consumer overrides. 
void cancel(); - bool isDelayedCompletion() const { return true; } + void acknowledged(const broker::QueuedMessage&); protected: bool doDispatch(); private: + typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed; std::string logPrefix; boost::shared_ptr<broker::Queue> events; boost::shared_ptr<broker::Consumer> consumer; - qpid::framing::SequenceSet dequeues; + Delayed delayed; + framing::SequenceSet dequeues; framing::SequenceNumber backupPosition; + void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&); + void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&); void sendDequeueEvent(const sys::Mutex::ScopedLock&); void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&); void sendEvent(const std::string& key, framing::Buffer&, @@ -110,9 +114,11 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, void notify(); bool filter(boost::intrusive_ptr<broker::Message>); bool accept(boost::intrusive_ptr<broker::Message>); - void cancel(); - bool isDelayedCompletion() const { return false; } + void cancel() {} + void acknowledged(const broker::QueuedMessage&) {} + broker::OwnershipToken* getSession(); + private: ReplicatingSubscription& delegate; }; diff --git a/qpid/cpp/src/tests/DeliveryRecordTest.cpp b/qpid/cpp/src/tests/DeliveryRecordTest.cpp index f7013014ff..fb7bd2f727 100644 --- a/qpid/cpp/src/tests/DeliveryRecordTest.cpp +++ b/qpid/cpp/src/tests/DeliveryRecordTest.cpp @@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort) list<DeliveryRecord> records; for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) { - DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", false, false, false); + DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false); r.setId(*i); records.push_back(r); } diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 
9a76bb28e1..bb4f7b9f4b 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -67,6 +67,7 @@ public: }; void notify() {} void cancel() {} + void acknowledged(const QueuedMessage&) {} OwnershipToken* getSession() { return 0; } }; |