diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-12 21:21:09 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-12 21:21:09 +0000 |
| commit | ab9698ad8182d3b9369de7170a38cda044cba0cf (patch) | |
| tree | 4d7310a2adf744f28b78ed3731e043986cf25a72 /cpp/src/qpid/ha/QueueGuard.cpp | |
| parent | 1c3d8a039f101e09e3d4c4c0f8d19cc3619d5584 (diff) | |
| download | qpid-python-ab9698ad8182d3b9369de7170a38cda044cba0cf.tar.gz | |
QPID-3603: HA bug fixes around transition to ready status
- Simplify QueueGuard::firstSafe calculation.
- Fix error in setting initial queues - was not checking if replicated.
- Send ready status to backups. Tests hang, deadlock in opened()->RemoteBackup on primary?
- Fix deadlock in QueueGuard.
- Don't start guards inside ConnectionObserver::opened.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1349547 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/QueueGuard.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/QueueGuard.cpp | 23 |
1 files changed, 6 insertions, 17 deletions
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp index b577b3cfdb..40262a180c 100644 --- a/cpp/src/qpid/ha/QueueGuard.cpp +++ b/cpp/src/qpid/ha/QueueGuard.cpp @@ -49,21 +49,16 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) - : queue(q), subscription(0), isFirstSet(false) + : queue(q), subscription(0) { std::ostringstream os; os << "HA primary guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); - queue.addObserver(observer); // We can now receive concurrent calls to dequeued - sys::Mutex::ScopedLock l(lock); - // Race between this thread and enqueued thread to set first safe position. - if (!isFirstSet) { - // Must set after addObserver so we don't miss any dequeues. - firstSafe = queue.getPosition()+1; // Next message will be safe. - isFirstSet = true; - QPID_LOG(debug, logPrefix << "First position (initial): " << firstSafe); - } + // Once we call addObserver we can get calls to enqueued and dequeued + queue.addObserver(observer); + // Must set after addObserver so we don't miss any enqueues. + firstSafe = queue.getPosition()+1; // Next message will be safe. } QueueGuard::~QueueGuard() { cancel(); } @@ -78,12 +73,6 @@ void QueueGuard::enqueued(const QueuedMessage& qm) { Mutex::ScopedLock l(lock); assert(!delayed.contains(qm.position)); delayed += qm.position; - if (!isFirstSet) { - firstSafe = qm.position; - isFirstSet = true; - QPID_LOG(debug, logPrefix << "First position (enqueued): " << firstSafe); - } - assert(qm.position >= firstSafe); } } @@ -132,7 +121,7 @@ void QueueGuard::complete(const QueuedMessage& qm) { } framing::SequenceNumber QueueGuard::getFirstSafe() { - // No lock, first is immutable. + // No lock, firstSafe is immutable. return firstSafe; } |
