summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-12 21:20:27 +0000
committerAlan Conway <aconway@apache.org>2012-06-12 21:20:27 +0000
commit6b638b03410d1487ba34f39dcbcbd73049d5ec52 (patch)
tree05a541ce210f18cb9ee2a5dd4992ce179ef6c327
parentb8067d2ecef01588f1fe73c8159cafacd8e1e217 (diff)
downloadqpid-python-6b638b03410d1487ba34f39dcbcbd73049d5ec52.tar.gz
QPID-3603: Guard should not complete messages from the internal event queue.
ReplicatingSubscription::acknowledge was calling Guard::complete for messages on the internal event queue as well as the replicated queue. Corrected to only complete messages from the replicated queue. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349542 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp21
4 files changed, 22 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index cd731fe732..9f655ff6eb 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -69,10 +69,10 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
assert(instance == 0);
instance = this; // Let queue replicators find us.
if (expect.empty()) {
- QPID_LOG(debug, logPrefix << "No initial backups");
+ QPID_LOG(debug, logPrefix << "Expected backups: none");
}
else {
- QPID_LOG(debug, logPrefix << "Waiting for initial backups: " << expect);
+ QPID_LOG(debug, logPrefix << "Expected backups: " << expect);
for (BrokerInfo::Set::iterator i = expect.begin(); i != expect.end(); ++i) {
boost::shared_ptr<RemoteBackup> backup(
new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest()));
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 5ea4c8e6f8..b330c4b9cc 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -61,6 +61,7 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
QueueGuard::~QueueGuard() { cancel(); }
void QueueGuard::enqueued(const QueuedMessage& qm) {
+ assert(qm.queue == &queue);
// Delay completion
QPID_LOG(trace, logPrefix << "Delaying completion of " << qm);
qm.payload->getIngressCompletion().startCompleter();
@@ -74,6 +75,7 @@ void QueueGuard::enqueued(const QueuedMessage& qm) {
// FIXME aconway 2012-06-05: ERROR, must call on ReplicatingSubscription
void QueueGuard::dequeued(const QueuedMessage& qm) {
+ assert(qm.queue == &queue);
QPID_LOG(trace, logPrefix << "Dequeued " << qm);
ReplicatingSubscription* rs = 0;
{
@@ -98,17 +100,19 @@ void QueueGuard::attach(ReplicatingSubscription& rs) {
}
void QueueGuard::complete(const QueuedMessage& qm, sys::Mutex::ScopedLock&) {
- QPID_LOG(trace, logPrefix << "Completed " << qm);
+ assert(qm.queue == &queue);
// The same message can be completed twice, by acknowledged and
// dequeued, remove it from the set so we only call
// finishCompleter() once
if (delayed.contains(qm.position)) {
+ QPID_LOG(trace, logPrefix << "Completed " << qm);
qm.payload->getIngressCompletion().finishCompleter();
delayed -= qm.position;
}
}
void QueueGuard::complete(const QueuedMessage& qm) {
+ assert(qm.queue == &queue);
Mutex::ScopedLock l(lock);
complete(qm, l);
}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 8b12231453..efa45ff58c 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -128,7 +128,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
// FIXME aconway 2012-05-22: use a finite credit window
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, logPrefix << "Subscribed bridge: " << bridgeName << " " << settings);
+ QPID_LOG(debug, logPrefix << "Subscribed: " << bridgeName);
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 7438c95bc2..ebc2365664 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -233,7 +233,10 @@ ReplicatingSubscription::ReplicatingSubscription(
// Set the guard
if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo());
- if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
+ if (!guard) {
+ QPID_LOG(debug, logPrefix << "No pre-set guard found, creating one.");
+ guard.reset(new QueueGuard(*queue, getBrokerInfo()));
+ }
guard->attach(*this);
// Guard is active, dequeued can be called concurrently.
@@ -332,9 +335,12 @@ void ReplicatingSubscription::cancel()
}
// Consumer override, called on primary in the backup's IO thread.
-void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
- // Finish completion of message, it has been acknowledged by the backup.
- guard->complete(msg);
+void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
+ if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue
+ // Finish completion of message, it has been acknowledged by the backup.
+ QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
+ guard->complete(qm);
+ }
}
// Hide the "queue deleted" error for a ReplicatingSubscription when a
@@ -363,15 +369,16 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
+ assert (qm.queue == getQueue().get());
bool doComplete = false;
- QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
Mutex::ScopedLock l(lock);
+ assert(!dequeues.contains(qm.position));
dequeues.add(qm.position);
+ // If we have not yet sent this message to the backup, then
+ // complete it now as it will never be accepted.
if (qm.position > position) doComplete = true;
}
- // If we have not yet sent this message to the backup, then
- // complete it now as it will never be accepted.
if (doComplete) guard->complete(qm);
notify(); // Ensure a call to doDispatch
}