summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueGuard.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueGuard.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueGuard.cpp85
1 files changed, 48 insertions, 37 deletions
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp
index a30ab1f73c..77e1f81a38 100644
--- a/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/cpp/src/qpid/ha/QueueGuard.cpp
@@ -39,10 +39,10 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
{
public:
QueueObserver(QueueGuard& g) : guard(g) {}
- void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); }
- void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); }
- void acquired(const broker::QueuedMessage&) {}
- void requeued(const broker::QueuedMessage&) {}
+ void enqueued(const broker::Message& m) { guard.enqueued(m); }
+ void dequeued(const broker::Message& m) { guard.dequeued(m); }
+ void acquired(const broker::Message&) {}
+ void requeued(const broker::Message&) {}
private:
QueueGuard& guard;
};
@@ -64,39 +64,47 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
QueueGuard::~QueueGuard() { cancel(); }
// NOTE: Called with message lock held.
-void QueueGuard::enqueued(const QueuedMessage& qm) {
- assert(qm.queue == &queue);
+void QueueGuard::enqueued(const Message& m) {
// Delay completion
- QPID_LOG(trace, logPrefix << "Delayed completion of " << qm);
- qm.payload->getIngressCompletion().startCompleter();
+ QPID_LOG(trace, logPrefix << "Delayed completion of " << m);
+ m.getIngressCompletion()->startCompleter();
{
Mutex::ScopedLock l(lock);
- assert(!delayed.contains(qm.position));
- delayed += qm.position;
+ if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) {
+ QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence());
+ assert(false);
+ }
}
}
// NOTE: Called with message lock held.
-void QueueGuard::dequeued(const QueuedMessage& qm) {
- assert(qm.queue == &queue);
- QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+void QueueGuard::dequeued(const Message& m) {
+ QPID_LOG(trace, logPrefix << "Dequeued " << m);
ReplicatingSubscription* rs=0;
{
Mutex::ScopedLock l(lock);
rs = subscription;
}
- if (rs) rs->dequeued(qm);
- complete(qm);
+ if (rs) rs->dequeued(m);
+ complete(m.getSequence());
+}
+
+void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
+ for (Delayed::iterator i = begin; i != end; ++i) {
+ QPID_LOG(trace, logPrefix << "Completed " << i->first);
+ i->second->finishCompleter();
+ }
}
void QueueGuard::cancel() {
queue.removeObserver(observer);
+ Delayed removed;
{
Mutex::ScopedLock l(lock);
if (delayed.empty()) return; // No need if no delayed messages.
+ delayed.swap(removed);
}
- // FIXME aconway 2012-06-15: optimize, only messages in delayed set.
- queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
+ completeRange(removed.begin(), removed.end());
}
void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -104,36 +112,39 @@ void QueueGuard::attach(ReplicatingSubscription& rs) {
subscription = &rs;
}
-namespace {
-void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) {
- if (qm.position <= position) guard->complete(qm);
-}
-}
-
bool QueueGuard::subscriptionStart(SequenceNumber position) {
- // Complete any messages before or at the ReplicatingSubscription start position.
- // Those messages are already on the backup.
- if (!delayed.empty() && delayed.front() <= position) {
- // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
- queue.eachMessage(boost::bind(&completeBefore, this, position, _1));
+ Delayed removed;
+ {
+ Mutex::ScopedLock l(lock);
+ // Complete any messages before or at the ReplicatingSubscription start position.
+ // Those messages are already on the backup.
+ for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) {
+ removed.insert(*i);
+ delayed.erase(i++);
+ }
}
+ completeRange(removed.begin(), removed.end());
return position >= range.back;
}
-void QueueGuard::complete(const QueuedMessage& qm) {
- assert(qm.queue == &queue);
+void QueueGuard::complete(SequenceNumber sequence) {
+ boost::intrusive_ptr<broker::AsyncCompletion> m;
{
Mutex::ScopedLock l(lock);
// The same message can be completed twice, by
// ReplicatingSubscription::acknowledged and dequeued. Remove it
- // from the set so we only call finishCompleter() once
- if (delayed.contains(qm.position))
- delayed -= qm.position;
- else
- return;
+ // from the map so we only call finishCompleter() once
+ Delayed::iterator i = delayed.find(sequence);
+ if (i != delayed.end()) {
+ m = i->second;
+ delayed.erase(i);
+ }
+
+ }
+ if (m) {
+ QPID_LOG(trace, logPrefix << "Completed " << sequence);
+ m->finishCompleter();
}
- QPID_LOG(trace, logPrefix << "Completed " << qm);
- qm.payload->getIngressCompletion().finishCompleter();
}
}} // namespaces qpid::ha