diff options
author | Alan Conway <aconway@apache.org> | 2012-06-12 21:20:27 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-06-12 21:20:27 +0000 |
commit | 6b638b03410d1487ba34f39dcbcbd73049d5ec52 (patch) | |
tree | 05a541ce210f18cb9ee2a5dd4992ce179ef6c327 | |
parent | b8067d2ecef01588f1fe73c8159cafacd8e1e217 (diff) | |
download | qpid-python-6b638b03410d1487ba34f39dcbcbd73049d5ec52.tar.gz |
QPID-3603: Guard should not complete messages from the internal event queue.
ReplicatingSubscription::acknowledge was calling Guard::complete for messages on the
internal event queue as well as the replicated queue. Corrected to only complete
messages from the replicated queue.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349542 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 21 |
4 files changed, 22 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index cd731fe732..9f655ff6eb 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -69,10 +69,10 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : assert(instance == 0); instance = this; // Let queue replicators find us. if (expect.empty()) { - QPID_LOG(debug, logPrefix << "No initial backups"); + QPID_LOG(debug, logPrefix << "Expected backups: none"); } else { - QPID_LOG(debug, logPrefix << "Waiting for initial backups: " << expect); + QPID_LOG(debug, logPrefix << "Expected backups: " << expect); for (BrokerInfo::Set::iterator i = expect.begin(); i != expect.end(); ++i) { boost::shared_ptr<RemoteBackup> backup( new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest())); diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 5ea4c8e6f8..b330c4b9cc 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -61,6 +61,7 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) QueueGuard::~QueueGuard() { cancel(); } void QueueGuard::enqueued(const QueuedMessage& qm) { + assert(qm.queue == &queue); // Delay completion QPID_LOG(trace, logPrefix << "Delaying completion of " << qm); qm.payload->getIngressCompletion().startCompleter(); @@ -74,6 +75,7 @@ void QueueGuard::enqueued(const QueuedMessage& qm) { // FIXME aconway 2012-06-05: ERROR, must call on ReplicatingSubscription void QueueGuard::dequeued(const QueuedMessage& qm) { + assert(qm.queue == &queue); QPID_LOG(trace, logPrefix << "Dequeued " << qm); ReplicatingSubscription* rs = 0; { @@ -98,17 +100,19 @@ void QueueGuard::attach(ReplicatingSubscription& rs) { } void QueueGuard::complete(const QueuedMessage& qm, sys::Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "Completed " << qm); + assert(qm.queue == &queue); // The same message can be completed twice, by acknowledged and // dequeued, remove it from the set so we only call // finishCompleter() once if (delayed.contains(qm.position)) { + QPID_LOG(trace, logPrefix << "Completed " << qm); qm.payload->getIngressCompletion().finishCompleter(); delayed -= qm.position; } } void QueueGuard::complete(const QueuedMessage& qm) { + assert(qm.queue == &queue); Mutex::ScopedLock l(lock); complete(qm, l); } diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 8b12231453..efa45ff58c 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -128,7 +128,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa // FIXME aconway 2012-05-22: use a finite credit window peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, logPrefix << "Subscribed bridge: " << bridgeName << " " << settings); + QPID_LOG(debug, logPrefix << "Subscribed: " << bridgeName); } namespace { diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 7438c95bc2..ebc2365664 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -233,7 +233,10 @@ ReplicatingSubscription::ReplicatingSubscription( // Set the guard if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo()); - if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo())); + if (!guard) { + QPID_LOG(debug, logPrefix << "No pre-set guard found, creating one."); + guard.reset(new QueueGuard(*queue, getBrokerInfo())); + } guard->attach(*this); // Guard is active, dequeued can be called concurrently. @@ -332,9 +335,12 @@ void ReplicatingSubscription::cancel() } // Consumer override, called on primary in the backup's IO thread. -void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { - // Finish completion of message, it has been acknowledged by the backup. - guard->complete(msg); +void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) { + if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue + // Finish completion of message, it has been acknowledged by the backup. + QPID_LOG(trace, logPrefix << "Acknowledged " << qm); + guard->complete(qm); + } } // Hide the "queue deleted" error for a ReplicatingSubscription when a @@ -363,15 +369,16 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) // arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { + assert (qm.queue == getQueue().get()); bool doComplete = false; - QPID_LOG(trace, logPrefix << "Dequeued " << qm); { Mutex::ScopedLock l(lock); + assert(!dequeues.contains(qm.position)); dequeues.add(qm.position); + // If we have not yet sent this message to the backup, then + // complete it now as it will never be accepted. if (qm.position > position) doComplete = true; } - // If we have not yet sent this message to the backup, then - // complete it now as it will never be accepted. if (doComplete) guard->complete(qm); notify(); // Ensure a call to doDispatch } |