diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/QueueGuard.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 24 |
1 files changed, 18 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index b391a5257b..6293f640e1 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/QueuedMessage.h" #include "qpid/broker/QueueObserver.h" #include "qpid/log/Statement.h" +#include <boost/bind.hpp> #include <sstream> namespace qpid { @@ -51,16 +52,13 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) : queue(q), subscription(0) { - // NOTE: There is no activity on the queue while QueueGuard constructor is - // running It is called either from Primary before client connections are - // allowed or from ConfigurationObserver::queueCreate before the queue is - // visible. std::ostringstream os; os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); queue.addObserver(observer); - firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost error + // Set after addObserver to ensure we dont miss an enqueue. + firstSafe = queue.getPosition() + 1; // Next message will be protected by the guard. } QueueGuard::~QueueGuard() { cancel(); } @@ -101,9 +99,23 @@ void QueueGuard::cancel() { queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1)); } +namespace { +void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) { + if (qm.position <= position) guard->complete(qm); +} +} + void QueueGuard::attach(ReplicatingSubscription& rs) { - Mutex::ScopedLock l(lock); + // NOTE: attach is called before the ReplicatingSubscription is active so + // it's position is not changing. assert(firstSafe >= rs.getPosition()); + // Complete any messages before or at the ReplicatingSubscription position. + if (!delayed.empty() && delayed.front() <= rs.getPosition()) { + // FIXME aconway 2012-06-15: queue iteration, only messages in delayed + queue.eachMessage(boost::bind(&completeBefore, this, rs.getPosition(), _1)); + } + Mutex::ScopedLock l(lock); + // FIXME aconway 2012-06-15: complete messages before rs.getPosition subscription = &rs; } |