diff options
author | Alan Conway <aconway@apache.org> | 2012-07-24 13:33:52 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-07-24 13:33:52 +0000 |
commit | 95d7e7dce5a56465cb8948fda8e969997c475ac7 (patch) | |
tree | e34de075c7d354e279d494bd1d5024947b6522f8 | |
parent | 6056bae55c25c6ecdbc6ca58fc9a5ae7d0d74a48 (diff) | |
download | qpid-python-95d7e7dce5a56465cb8948fda8e969997c475ac7.tar.gz |
NO-JIRA: Fix typos, update comments, update log messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1365046 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaPlugin.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.h | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 9 |
7 files changed, 32 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 682c75ed4f..64073621be 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -54,7 +54,9 @@ class Consumer bool preAcquires() const { return acquires; } const std::string& getName() const { return name; } + /**@return the position of the last message seen by this consumer */ virtual framing::SequenceNumber getPosition() const { return position; } + virtual void setPosition(framing::SequenceNumber pos) { position = pos; } virtual bool deliver(QueuedMessage& msg) = 0; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 14b5218198..4cddbcd657 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -71,6 +71,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) // otherwise there's a window for a client to connect before we get to // initialize() if (settings.cluster) { + QPID_LOG(debug, logPrefix << "Rejecting client connections."); observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>( new BackupConnectionExcluder)); broker.getConnectionObservers().add(observer); diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index 73f4ed5e8c..f7fe553d9b 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -66,7 +66,7 @@ struct HaPlugin : public Plugin { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); if (broker) { // Must create the HaBroker in earlyInitialize so it can set up its - // connection observer before clients start conneting. + // connection observer before clients start connecting. haBroker.reset(new ha::HaBroker(*broker, settings)); broker->addFinalizer(boost::bind(&HaPlugin::finalize, this)); } diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 2a3eb86b64..fecb7a1205 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -129,11 +129,15 @@ void Primary::checkReady(Mutex::ScopedLock&) { void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { if (i != backups.end() && i->second->isReady()) { BrokerInfo info = i->second->getBrokerInfo(); - QPID_LOG(info, "Expected backup is ready: " << info); info.setStatus(READY); + QPID_LOG(info, "Expected backup is ready: " << info); haBroker.addBroker(info); - expectedBackups.erase(i->second); - checkReady(l); + if (expectedBackups.erase(i->second)) { + QPID_LOG(info, logPrefix << "Expected backup is ready: " << info); + checkReady(l); + } + else + QPID_LOG(info, logPrefix << "Backup is ready: " << info); } } @@ -147,7 +151,7 @@ void Primary::timeoutExpectedBackups() { boost::shared_ptr<RemoteBackup> rb = *i; if (!rb->isConnected()) { BrokerInfo info = rb->getBrokerInfo(); - QPID_LOG(error, "Expected backup timed out: " << info); + QPID_LOG(error, logPrefix << "Expected backup timed out: " << info); expectedBackups.erase(i++); backups.erase(info.getSystemId()); rb->cancel(); diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 85feadd2ab..a30ab1f73c 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -101,7 +101,7 @@ void QueueGuard::cancel() { void QueueGuard::attach(ReplicatingSubscription& rs) { Mutex::ScopedLock l(lock); - subscription = &rs; + subscription = &rs; } namespace { diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h index 8cc2055381..bc8f40b65f 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.h +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -79,13 +79,20 @@ class QueueGuard { void attach(ReplicatingSubscription&); /** - * Return the queue range at the time the QueueGuard was created. The - * QueueGuard is created before the queue becomes active: either when a - * backup is promoted, or when a new queue is created on the primary. + * Return the un-guarded queue range at the time the QueueGuard was created. + * + * The first position guaranteed to be protected by the guard is + * getRange().getBack()+1. It is possible that the guard has protected some + * messages before that point. Any such messages are dealt with in subscriptionStart + * + * The QueueGuard is created in 3 situations + * - when a backup is promoted, guards are created for expected backups. + * - when a new queue is created on the primary + * - when a new backup joins. + * + * In the last situation the queue is active while the guard is being + * created. * - * NOTE: The first position guaranteed to be protected by the guard is - * getRange().getBack()+1. It is possible that the guard has protected - * some messages before that point. */ const QueueRange& getRange() const { return range; } // range is immutable, no lock needed. diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 629014b215..0c4e61ba6d 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -175,13 +175,13 @@ ReplicatingSubscription::ReplicatingSubscription( logPrefix = os.str(); // NOTE: Once the guard is attached we can have concurrent - // calls to dequeued so we need to lock use of this->deques. + // calls to dequeued so we need to lock use of this->dequeues. // // However we must attach the guard _before_ we scan for // initial dequeues to be sure we don't miss any dequeues // between the scan and attaching the guard. if (Primary::get()) guard = Primary::get()->getGuard(queue, info); - if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo())); + if (!guard) guard.reset(new QueueGuard(*queue, info)); guard->attach(*this); QueueRange backup(arguments); // The remote backup state. @@ -213,6 +213,9 @@ ReplicatingSubscription::ReplicatingSubscription( scan.finish(); position = backup.back; } + // NOTE: we are assuming that the messages that are on the backup are + // consistent with those on the primary. If the backup is a replica + // queue and hasn't been tampered with then that will be the case. QPID_LOG(debug, logPrefix << "Subscribed:" << " backup:" << backup @@ -332,7 +335,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { assert (qm.queue == getQueue().get()); - QPID_LOG(trace, logPrefix << "Dequeued " << qm); + QPID_LOG(trace, logPrefix << "Dequeued " << qm); { Mutex::ScopedLock l(lock); dequeues.add(qm.position); |