summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/QueueGuard.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/QueueGuard.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp24
1 files changed, 18 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index b391a5257b..6293f640e1 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -24,6 +24,7 @@
#include "qpid/broker/QueuedMessage.h"
#include "qpid/broker/QueueObserver.h"
#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
#include <sstream>
namespace qpid {
@@ -51,16 +52,13 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
: queue(q), subscription(0)
{
- // NOTE: There is no activity on the queue while QueueGuard constructor is
- // running It is called either from Primary before client connections are
- // allowed or from ConfigurationObserver::queueCreate before the queue is
- // visible.
std::ostringstream os;
os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
- firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost error
+ // Set after addObserver to ensure we dont miss an enqueue.
+ firstSafe = queue.getPosition() + 1; // Next message will be protected by the guard.
}
QueueGuard::~QueueGuard() { cancel(); }
@@ -101,9 +99,23 @@ void QueueGuard::cancel() {
queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
}
+namespace {
+void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) {
+ if (qm.position <= position) guard->complete(qm);
+}
+}
+
void QueueGuard::attach(ReplicatingSubscription& rs) {
- Mutex::ScopedLock l(lock);
+ // NOTE: attach is called before the ReplicatingSubscription is active so
+ // it's position is not changing.
assert(firstSafe >= rs.getPosition());
+ // Complete any messages before or at the ReplicatingSubscription position.
+ if (!delayed.empty() && delayed.front() <= rs.getPosition()) {
+ // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
+ queue.eachMessage(boost::bind(&completeBefore, this, rs.getPosition(), _1));
+ }
+ Mutex::ScopedLock l(lock);
+ // FIXME aconway 2012-06-15: complete messages before rs.getPosition
subscription = &rs;
}