summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-24 13:33:52 +0000
committerAlan Conway <aconway@apache.org>2012-07-24 13:33:52 +0000
commit95d7e7dce5a56465cb8948fda8e969997c475ac7 (patch)
treee34de075c7d354e279d494bd1d5024947b6522f8
parent6056bae55c25c6ecdbc6ca58fc9a5ae7d0d74a48 (diff)
downloadqpid-python-95d7e7dce5a56465cb8948fda8e969997c475ac7.tar.gz
NO-JIRA: Fix typos, update comments, update log messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1365046 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h19
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp9
7 files changed, 32 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 682c75ed4f..64073621be 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -54,7 +54,9 @@ class Consumer
bool preAcquires() const { return acquires; }
const std::string& getName() const { return name; }
+ /**@return the position of the last message seen by this consumer */
virtual framing::SequenceNumber getPosition() const { return position; }
+
virtual void setPosition(framing::SequenceNumber pos) { position = pos; }
virtual bool deliver(QueuedMessage& msg) = 0;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 14b5218198..4cddbcd657 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -71,6 +71,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
+ QPID_LOG(debug, logPrefix << "Rejecting client connections.");
observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>(
new BackupConnectionExcluder));
broker.getConnectionObservers().add(observer);
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index 73f4ed5e8c..f7fe553d9b 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -66,7 +66,7 @@ struct HaPlugin : public Plugin {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker) {
// Must create the HaBroker in earlyInitialize so it can set up its
- // connection observer before clients start conneting.
+ // connection observer before clients start connecting.
haBroker.reset(new ha::HaBroker(*broker, settings));
broker->addFinalizer(boost::bind(&HaPlugin::finalize, this));
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 2a3eb86b64..fecb7a1205 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -129,11 +129,15 @@ void Primary::checkReady(Mutex::ScopedLock&) {
void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
if (i != backups.end() && i->second->isReady()) {
BrokerInfo info = i->second->getBrokerInfo();
- QPID_LOG(info, "Expected backup is ready: " << info);
info.setStatus(READY);
+ QPID_LOG(info, "Expected backup is ready: " << info);
haBroker.addBroker(info);
- expectedBackups.erase(i->second);
- checkReady(l);
+ if (expectedBackups.erase(i->second)) {
+ QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
+ checkReady(l);
+ }
+ else
+ QPID_LOG(info, logPrefix << "Backup is ready: " << info);
}
}
@@ -147,7 +151,7 @@ void Primary::timeoutExpectedBackups() {
boost::shared_ptr<RemoteBackup> rb = *i;
if (!rb->isConnected()) {
BrokerInfo info = rb->getBrokerInfo();
- QPID_LOG(error, "Expected backup timed out: " << info);
+ QPID_LOG(error, logPrefix << "Expected backup timed out: " << info);
expectedBackups.erase(i++);
backups.erase(info.getSystemId());
rb->cancel();
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 85feadd2ab..a30ab1f73c 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -101,7 +101,7 @@ void QueueGuard::cancel() {
void QueueGuard::attach(ReplicatingSubscription& rs) {
Mutex::ScopedLock l(lock);
- subscription = &rs;
+ subscription = &rs;
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 8cc2055381..bc8f40b65f 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -79,13 +79,20 @@ class QueueGuard {
void attach(ReplicatingSubscription&);
/**
- * Return the queue range at the time the QueueGuard was created. The
- * QueueGuard is created before the queue becomes active: either when a
- * backup is promoted, or when a new queue is created on the primary.
+ * Return the un-guarded queue range at the time the QueueGuard was created.
+ *
+ * The first position guaranteed to be protected by the guard is
+ * getRange().getBack()+1. It is possible that the guard has protected some
+ * messages before that point. Any such messages are dealt with in subscriptionStart
+ *
+ * The QueueGuard is created in 3 situations
+ * - when a backup is promoted, guards are created for expected backups.
+ * - when a new queue is created on the primary
+ * - when a new backup joins.
+ *
+ * In the last situation the queue is active while the guard is being
+ * created.
*
- * NOTE: The first position guaranteed to be protected by the guard is
- * getRange().getBack()+1. It is possible that the guard has protected
- * some messages before that point.
*/
const QueueRange& getRange() const { return range; } // range is immutable, no lock needed.
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 629014b215..0c4e61ba6d 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -175,13 +175,13 @@ ReplicatingSubscription::ReplicatingSubscription(
logPrefix = os.str();
// NOTE: Once the guard is attached we can have concurrent
- // calls to dequeued so we need to lock use of this->deques.
+ // calls to dequeued so we need to lock use of this->dequeues.
//
// However we must attach the guard _before_ we scan for
// initial dequeues to be sure we don't miss any dequeues
// between the scan and attaching the guard.
if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
- if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
+ if (!guard) guard.reset(new QueueGuard(*queue, info));
guard->attach(*this);
QueueRange backup(arguments); // The remote backup state.
@@ -213,6 +213,9 @@ ReplicatingSubscription::ReplicatingSubscription(
scan.finish();
position = backup.back;
}
+ // NOTE: we are assuming that the messages that are on the backup are
+ // consistent with those on the primary. If the backup is a replica
+ // queue and hasn't been tampered with then that will be the case.
QPID_LOG(debug, logPrefix << "Subscribed:"
<< " backup:" << backup
@@ -332,7 +335,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
assert (qm.queue == getQueue().get());
- QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+ QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
Mutex::ScopedLock l(lock);
dequeues.add(qm.position);