diff options
author | Alan Conway <aconway@apache.org> | 2012-06-22 19:28:20 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-06-22 19:28:20 +0000 |
commit | 5fa99445dc43cb0af8aeed08dd32f803c2329252 (patch) | |
tree | 64b4b63f6db7440e95cdc6704ce5ea1e813f3b89 | |
parent | 62b928632b4779ec841070bfe0b7e9c50506a0c1 (diff) | |
download | qpid-python-5fa99445dc43cb0af8aeed08dd32f803c2329252.tar.gz |
QPID-4078: Fix primary self-connections in long running test.
Asserts to detect self-connection were triggered in long runs of ha_tests.py
test_failover_send_receive. Fix:
- HaBroker close backup link before removing broker-info for outgoing link.
- HaBroker removes own address from failover addresses.
- Link.cpp: Skip ioThreadProcessing and maintenanceVisit on a link that is closed.
- Minor improvements to log messages and comments.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1352999 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 34 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 2 |
9 files changed, 46 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 1f6e6ac856..1be388b989 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -445,7 +445,7 @@ void Link::ioThreadProcessing() { Mutex::ScopedLock mutex(lock); - if (state != STATE_OPERATIONAL) + if (state != STATE_OPERATIONAL || closing) return; // check for bridge session errors and recover @@ -482,7 +482,7 @@ void Link::ioThreadProcessing() void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); - + if (closing) return; if (state == STATE_WAITING) { visitCount++; @@ -500,7 +500,7 @@ void Link::maintenanceVisit () } else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); - } +} void Link::reconnectLH(const Address& a) { diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 8ed64c6767..90c615aaf5 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -56,25 +56,24 @@ bool Backup::isSelf(const Address& a) const { a.port == haBroker.getBroker().getPort(a.protocol); } -Url Backup::linkUrl(const Url& brokers) const { - return brokers; - /** FIXME aconway 2012-05-29: Problems with self-test, false positives. - // linkUrl contains only the addresses of * - other* brokers, not this one. +// Remove my own address from the URL if possible. +// This isn't 100% reliable given the many ways to specify a host, +// but should work in most cases. 
We have additional measures to prevent +// self-connection in ConnectionObserver +Url Backup::removeSelf(const Url& brokers) const { Url url; for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (!isSelf(*i)) url.push_back(*i); - if (url.empty()) throw Url::Invalid("HA Backup failover URL is empty"); - QPID_LOG(debug, logPrefix << " failover URL (excluding self): " << url); + if (url.empty()) + throw Url::Invalid(logPrefix+"Failover URL is empty"); + QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url); return url; - */ } void Backup::initialize(const Url& brokers) { if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); QPID_LOG(info, logPrefix << "Initialized, broker URL: " << brokers); - sys::Mutex::ScopedLock l(lock); - Url url = linkUrl(brokers); + Url url = removeSelf(brokers); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; types::Uuid uuid(true); // Declare the link @@ -93,8 +92,6 @@ void Backup::initialize(const Url& brokers) { Backup::~Backup() { if (link) link->close(); - // FIXME aconway 2012-05-30: race: may have outstanding initializeBridge calls - // pointing to this. if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); replicator.reset(); } @@ -103,12 +100,15 @@ Backup::~Backup() { void Backup::setBrokerUrl(const Url& url) { // Ignore empty URLs seen during start-up for some tests. 
if (url.empty()) return; - sys::Mutex::ScopedLock l(lock); - if (link) { - QPID_LOG(info, logPrefix << "Broker URL set to: " << url); - link->setUrl(linkUrl(url)); + { + sys::Mutex::ScopedLock l(lock); + if (link) { + QPID_LOG(info, logPrefix << "Broker URL set to: " << url); + link->setUrl(removeSelf(url)); + return; + } } - else initialize(url); // Deferred initialization + initialize(url); // Deferred initialization } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index ca3c2a02d0..c3c4fbbbfc 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -53,7 +53,7 @@ class Backup private: bool isSelf(const Address& a) const; - Url linkUrl(const Url&) const; + Url removeSelf(const Url&) const; void initialize(const Url&); std::string logPrefix; diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index d510d13b31..bff7a188b1 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -170,8 +170,9 @@ Variant::Map asMapVoid(const Variant& value) { BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), - logPrefix("Backup configuration: "), replicationTest(hb.getReplicationTest()), - haBroker(hb), broker(hb.getBroker()), link(l) + logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), + haBroker(hb), broker(hb.getBroker()), link(l), + initialized(false) {} void BrokerReplicator::initialize() { @@ -202,28 +203,32 @@ BrokerReplicator::~BrokerReplicator() { link->close(); } // This is called in the connection IO thread when the bridge is started. void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + qpid::Address primary; + link->getRemoteAddress(primary); + string queueName = bridge.getQueueName(); + + QPID_LOG(info, logPrefix << (initialized ? 
"Connecting" : "Failing-over") + << " to primary " << primary + << " status:" << printable(haBroker.getStatus())); + initialized = true; switch (haBroker.getStatus()) { case JOINING: haBroker.setStatus(CATCHUP); + break; case CATCHUP: - // FIXME aconway 2012-04-27: distinguish catchup case, below. break; case READY: - // FIXME aconway 2012-04-27: distinguish ready case, reconnect to other backup. break; case RECOVERING: case ACTIVE: - // FIXME aconway 2012-04-27: link is connected to self! - // Promotion should close the link before allowing connections. + assert(0); // Primary does not reconnect. return; - break; case STANDALONE: return; } framing::AMQP_ServerProxy peer(sessionHandler.out); - string queueName = bridge.getQueueName(); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); //declare and bind an event queue @@ -243,9 +248,9 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); - qpid::Address primary; - link->getRemoteAddress(primary); - QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << queueName << ")"); + + QPID_LOG(debug, logPrefix << "Connected to primary " << primary + << "(" << queueName << ")" << " status:" << printable(haBroker.getStatus())); } void BrokerReplicator::route(Deliverable& msg) { @@ -571,9 +576,4 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } -void BrokerReplicator::ready() { - assert(haBroker.getStatus() == CATCHUP); - haBroker.setStatus(READY); -} - }} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 9788e4b647..f7439fe892 100644 --- 
a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -94,13 +94,13 @@ class BrokerReplicator : public broker::Exchange, QueueReplicatorPtr findQueueReplicator(const std::string& qname); void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); - void ready(); std::string logPrefix; ReplicationTest replicationTest; HaBroker& haBroker; broker::Broker& broker; boost::shared_ptr<broker::Link> link; + bool initialized; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index da03a04013..3967a45515 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -110,8 +110,11 @@ HaBroker::~HaBroker() { } void HaBroker::recover(Mutex::ScopedLock&) { + // No longer replicating, close link. Note: link must be closed before we + // setStatus(RECOVERING) as that will remove our broker info from the + // outgoing link properties so we won't recognize self-connects. + backup.reset(); setStatus(RECOVERING); - backup.reset(); // No longer replicating, close link. BrokerInfo::Set backups = membership.otherBackups(); membership.reset(brokerInfo); // Drop the lock, new Primary may call back on activate. diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index d36789e565..aa15944259 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -56,7 +56,7 @@ class ConnectionObserver; class Primary; /** - * HA state and actions associated with a broker. + * HA state and actions associated with a HA broker. Holds all the management info. * * THREAD SAFE: may be called in arbitrary broker IO or timer threads. 
*/ diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index a874559655..1fa51b6f68 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -70,13 +70,13 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : assert(instance == 0); instance = this; // Let queue replicators find us. if (expect.empty()) { - QPID_LOG(debug, logPrefix << "Expected backups: none"); + QPID_LOG(debug, logPrefix << "Promoted, no expected backups"); } else { // NOTE: RemoteBackups must be created before we set the ConfigurationObserver // orr ConnectionObserver so that there is no client activity while // the QueueGuards are created. - QPID_LOG(debug, logPrefix << "Expected backups: " << expect); + QPID_LOG(debug, logPrefix << "Promoted, expected backups: " << expect); for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { bool guard = true; // Create queue guards immediately for expected backups. boost::shared_ptr<RemoteBackup> backup( diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 7338136bfd..4d07d386f9 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -87,7 +87,7 @@ class HaBroker(Broker): # Ignore ConnectionError, the broker may not be up yet. try: return self.ha_status() == status; except ConnectionError: return False - assert retry(try_get_status, timeout=20), "%s, %r != %r"%(self, self.ha_status(), status) + assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status) # FIXME aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): |