summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueGuard.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueGuard.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueGuard.cpp94
1 files changed, 39 insertions, 55 deletions
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp
index 77e1f81a38..d06d88ca29 100644
--- a/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/cpp/src/qpid/ha/QueueGuard.cpp
@@ -50,10 +50,10 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0)
+ : cancelled(false), queue(q), subscription(0)
{
std::ostringstream os;
- os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
+ os << "Primary guard " << queue.getName() << "@" << info << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
@@ -66,45 +66,31 @@ QueueGuard::~QueueGuard() { cancel(); }
// NOTE: Called with message lock held.
void QueueGuard::enqueued(const Message& m) {
// Delay completion
- QPID_LOG(trace, logPrefix << "Delayed completion of " << m);
+ QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence());
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return; // Don't record enqueues after we are cancelled.
+ assert(delayed.find(m.getSequence()) == delayed.end());
+ delayed[m.getSequence()] = m.getIngressCompletion();
m.getIngressCompletion()->startCompleter();
- {
- Mutex::ScopedLock l(lock);
- if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) {
- QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence());
- assert(false);
- }
- }
}
// NOTE: Called with message lock held.
void QueueGuard::dequeued(const Message& m) {
QPID_LOG(trace, logPrefix << "Dequeued " << m);
- ReplicatingSubscription* rs=0;
- {
- Mutex::ScopedLock l(lock);
- rs = subscription;
- }
- if (rs) rs->dequeued(m);
- complete(m.getSequence());
-}
-
-void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
- for (Delayed::iterator i = begin; i != end; ++i) {
- QPID_LOG(trace, logPrefix << "Completed " << i->first);
- i->second->finishCompleter();
- }
+ Mutex::ScopedLock l(lock);
+ if (subscription) subscription->dequeued(m);
+ complete(m.getSequence(), l);
}
void QueueGuard::cancel() {
queue.removeObserver(observer);
- Delayed removed;
- {
- Mutex::ScopedLock l(lock);
- if (delayed.empty()) return; // No need if no delayed messages.
- delayed.swap(removed);
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return;
+ cancelled = true;
+ for (Delayed::iterator i = delayed.begin(); i != delayed.end();) {
+ complete(i, l);
+ delayed.erase(i++);
}
- completeRange(removed.begin(), removed.end());
}
void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -113,38 +99,36 @@ void QueueGuard::attach(ReplicatingSubscription& rs) {
}
bool QueueGuard::subscriptionStart(SequenceNumber position) {
- Delayed removed;
- {
- Mutex::ScopedLock l(lock);
- // Complete any messages before or at the ReplicatingSubscription start position.
- // Those messages are already on the backup.
- for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) {
- removed.insert(*i);
- delayed.erase(i++);
- }
+ // Complete any messages before or at the ReplicatingSubscription start position.
+ // Those messages are already on the backup.
+ Mutex::ScopedLock l(lock);
+ Delayed::iterator i = delayed.begin();
+ while(i != delayed.end() && i->first <= position) {
+ complete(i, l);
+ delayed.erase(i++);
}
- completeRange(removed.begin(), removed.end());
return position >= range.back;
}
void QueueGuard::complete(SequenceNumber sequence) {
- boost::intrusive_ptr<broker::AsyncCompletion> m;
- {
- Mutex::ScopedLock l(lock);
- // The same message can be completed twice, by
- // ReplicatingSubscription::acknowledged and dequeued. Remove it
- // from the map so we only call finishCompleter() once
- Delayed::iterator i = delayed.find(sequence);
- if (i != delayed.end()) {
- m = i->second;
- delayed.erase(i);
- }
+ Mutex::ScopedLock l(lock);
+ complete(sequence, l);
+}
+void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) {
+ // The same message can be completed twice, by
+ // ReplicatingSubscription::acknowledged and dequeued. Remove it
+ // from the map so we only call finishCompleter() once
+ Delayed::iterator i = delayed.find(sequence);
+ if (i != delayed.end()) {
+ complete(i, l);
+ delayed.erase(i);
}
- if (m) {
- QPID_LOG(trace, logPrefix << "Completed " << sequence);
- m->finishCompleter();
- }
+}
+
+void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Completed " << i->first);
+ i->second->finishCompleter();
}
}} // namespaces qpid::ha