diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueGuard.cpp')
-rw-r--r-- | cpp/src/qpid/ha/QueueGuard.cpp | 85 |
1 files changed, 48 insertions, 37 deletions
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp index a30ab1f73c..77e1f81a38 100644 --- a/cpp/src/qpid/ha/QueueGuard.cpp +++ b/cpp/src/qpid/ha/QueueGuard.cpp @@ -39,10 +39,10 @@ class QueueGuard::QueueObserver : public broker::QueueObserver { public: QueueObserver(QueueGuard& g) : guard(g) {} - void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); } - void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); } - void acquired(const broker::QueuedMessage&) {} - void requeued(const broker::QueuedMessage&) {} + void enqueued(const broker::Message& m) { guard.enqueued(m); } + void dequeued(const broker::Message& m) { guard.dequeued(m); } + void acquired(const broker::Message&) {} + void requeued(const broker::Message&) {} private: QueueGuard& guard; }; @@ -64,39 +64,47 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) QueueGuard::~QueueGuard() { cancel(); } // NOTE: Called with message lock held. -void QueueGuard::enqueued(const QueuedMessage& qm) { - assert(qm.queue == &queue); +void QueueGuard::enqueued(const Message& m) { // Delay completion - QPID_LOG(trace, logPrefix << "Delayed completion of " << qm); - qm.payload->getIngressCompletion().startCompleter(); + QPID_LOG(trace, logPrefix << "Delayed completion of " << m); + m.getIngressCompletion()->startCompleter(); { Mutex::ScopedLock l(lock); - assert(!delayed.contains(qm.position)); - delayed += qm.position; + if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) { + QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence()); + assert(false); + } } } // NOTE: Called with message lock held. -void QueueGuard::dequeued(const QueuedMessage& qm) { - assert(qm.queue == &queue); - QPID_LOG(trace, logPrefix << "Dequeued " << qm); +void QueueGuard::dequeued(const Message& m) { + QPID_LOG(trace, logPrefix << "Dequeued " << m); ReplicatingSubscription* rs=0; { Mutex::ScopedLock l(lock); rs = subscription; } - if (rs) rs->dequeued(qm); - complete(qm); + if (rs) rs->dequeued(m); + complete(m.getSequence()); +} + +void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) { + for (Delayed::iterator i = begin; i != end; ++i) { + QPID_LOG(trace, logPrefix << "Completed " << i->first); + i->second->finishCompleter(); + } } void QueueGuard::cancel() { queue.removeObserver(observer); + Delayed removed; { Mutex::ScopedLock l(lock); if (delayed.empty()) return; // No need if no delayed messages. + delayed.swap(removed); } - // FIXME aconway 2012-06-15: optimize, only messages in delayed set. - queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1)); + completeRange(removed.begin(), removed.end()); } void QueueGuard::attach(ReplicatingSubscription& rs) { @@ -104,36 +112,39 @@ void QueueGuard::attach(ReplicatingSubscription& rs) { subscription = &rs; } -namespace { -void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) { - if (qm.position <= position) guard->complete(qm); -} -} - bool QueueGuard::subscriptionStart(SequenceNumber position) { - // Complete any messages before or at the ReplicatingSubscription start position. - // Those messages are already on the backup. - if (!delayed.empty() && delayed.front() <= position) { - // FIXME aconway 2012-06-15: queue iteration, only messages in delayed - queue.eachMessage(boost::bind(&completeBefore, this, position, _1)); + Delayed removed; + { + Mutex::ScopedLock l(lock); + // Complete any messages before or at the ReplicatingSubscription start position. + // Those messages are already on the backup. + for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) { + removed.insert(*i); + delayed.erase(i++); + } } + completeRange(removed.begin(), removed.end()); return position >= range.back; } -void QueueGuard::complete(const QueuedMessage& qm) { - assert(qm.queue == &queue); +void QueueGuard::complete(SequenceNumber sequence) { + boost::intrusive_ptr<broker::AsyncCompletion> m; { Mutex::ScopedLock l(lock); // The same message can be completed twice, by // ReplicatingSubscription::acknowledged and dequeued. Remove it - // from the set so we only call finishCompleter() once - if (delayed.contains(qm.position)) - delayed -= qm.position; - else - return; + // from the map so we only call finishCompleter() once + Delayed::iterator i = delayed.find(sequence); + if (i != delayed.end()) { + m = i->second; + delayed.erase(i); + } + + } + if (m) { + QPID_LOG(trace, logPrefix << "Completed " << sequence); + m->finishCompleter(); } - QPID_LOG(trace, logPrefix << "Completed " << qm); - qm.payload->getIngressCompletion().finishCompleter(); } }} // namespaces qpid::ha |