summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueGuard.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-12 21:21:09 +0000
committerAlan Conway <aconway@apache.org>2012-06-12 21:21:09 +0000
commitab9698ad8182d3b9369de7170a38cda044cba0cf (patch)
tree4d7310a2adf744f28b78ed3731e043986cf25a72 /cpp/src/qpid/ha/QueueGuard.cpp
parent1c3d8a039f101e09e3d4c4c0f8d19cc3619d5584 (diff)
downloadqpid-python-ab9698ad8182d3b9369de7170a38cda044cba0cf.tar.gz
QPID-3603: HA bug fixes around transition to ready status
- Simplify QueueGuard::firstSafe calculation. - Fix error in setting initial queues - was not checking if replicated. - Send ready status to backups. Tests hang, deadlock in opened()->RemoteBackup on primary? - Fix deadlock in QueueGuard. - Don't start guards inside ConnectionObserver::opened. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1349547 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/QueueGuard.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueGuard.cpp23
1 files changed, 6 insertions, 17 deletions
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp
index b577b3cfdb..40262a180c 100644
--- a/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/cpp/src/qpid/ha/QueueGuard.cpp
@@ -49,21 +49,16 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0), isFirstSet(false)
+ : queue(q), subscription(0)
{
std::ostringstream os;
os << "HA primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
- queue.addObserver(observer); // We can now receive concurrent calls to dequeued
- sys::Mutex::ScopedLock l(lock);
- // Race between this thread and enqueued thread to set first safe position.
- if (!isFirstSet) {
- // Must set after addObserver so we don't miss any dequeues.
- firstSafe = queue.getPosition()+1; // Next message will be safe.
- isFirstSet = true;
- QPID_LOG(debug, logPrefix << "First position (initial): " << firstSafe);
- }
+ // Once we call addObserver we can get calls to enqueued and dequeued
+ queue.addObserver(observer);
+ // Must set after addObserver so we don't miss any enqueues.
+ firstSafe = queue.getPosition()+1; // Next message will be safe.
}
QueueGuard::~QueueGuard() { cancel(); }
@@ -78,12 +73,6 @@ void QueueGuard::enqueued(const QueuedMessage& qm) {
Mutex::ScopedLock l(lock);
assert(!delayed.contains(qm.position));
delayed += qm.position;
- if (!isFirstSet) {
- firstSafe = qm.position;
- isFirstSet = true;
- QPID_LOG(debug, logPrefix << "First position (enqueued): " << firstSafe);
- }
- assert(qm.position >= firstSafe);
}
}
@@ -132,7 +121,7 @@ void QueueGuard::complete(const QueuedMessage& qm) {
}
framing::SequenceNumber QueueGuard::getFirstSafe() {
- // No lock, first is immutable.
+ // No lock, firstSafe is immutable.
return firstSafe;
}