summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-22 19:28:20 +0000
committerAlan Conway <aconway@apache.org>2012-06-22 19:28:20 +0000
commit5fa99445dc43cb0af8aeed08dd32f803c2329252 (patch)
tree64b4b63f6db7440e95cdc6704ce5ea1e813f3b89
parent62b928632b4779ec841070bfe0b7e9c50506a0c1 (diff)
downloadqpid-python-5fa99445dc43cb0af8aeed08dd32f803c2329252.tar.gz
QPID-4078: Fix primary self-connections in long running test.
Assert to detect self-connection were triggered in log runs of ha_tests.py test_failover_send_receive. Fix: - HaBroker close backup link before removing broker-info for outgoing link. - HaBroker removes own address from failover addresses. - Link.cpp: Skip ioThreadProcessing and maintenanceVisit on a link that is closed. - Minor improvements to log messages and comments. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1352999 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp34
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp32
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h2
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py2
9 files changed, 46 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 1f6e6ac856..1be388b989 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -445,7 +445,7 @@ void Link::ioThreadProcessing()
{
Mutex::ScopedLock mutex(lock);
- if (state != STATE_OPERATIONAL)
+ if (state != STATE_OPERATIONAL || closing)
return;
// check for bridge session errors and recover
@@ -482,7 +482,7 @@ void Link::ioThreadProcessing()
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
-
+ if (closing) return;
if (state == STATE_WAITING)
{
visitCount++;
@@ -500,7 +500,7 @@ void Link::maintenanceVisit ()
}
else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
- }
+}
void Link::reconnectLH(const Address& a)
{
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 8ed64c6767..90c615aaf5 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -56,25 +56,24 @@ bool Backup::isSelf(const Address& a) const {
a.port == haBroker.getBroker().getPort(a.protocol);
}
-Url Backup::linkUrl(const Url& brokers) const {
- return brokers;
- /** FIXME aconway 2012-05-29: Problems with self-test, false positives.
- // linkUrl contains only the addresses of *
- other* brokers, not this one.
+// Remove my own address from the URL if possible.
+// This isn't 100% reliable given the many ways to specify a host,
+// but should work in most cases. We have additional measures to prevent
+// self-connection in ConnectionObserver
+Url Backup::removeSelf(const Url& brokers) const {
Url url;
for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (!isSelf(*i)) url.push_back(*i);
- if (url.empty()) throw Url::Invalid("HA Backup failover URL is empty");
- QPID_LOG(debug, logPrefix << " failover URL (excluding self): " << url);
+ if (url.empty())
+ throw Url::Invalid(logPrefix+"Failover URL is empty");
+ QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url);
return url;
- */
}
void Backup::initialize(const Url& brokers) {
if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
QPID_LOG(info, logPrefix << "Initialized, broker URL: " << brokers);
- sys::Mutex::ScopedLock l(lock);
- Url url = linkUrl(brokers);
+ Url url = removeSelf(brokers);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
types::Uuid uuid(true);
// Declare the link
@@ -93,8 +92,6 @@ void Backup::initialize(const Url& brokers) {
Backup::~Backup() {
if (link) link->close();
- // FIXME aconway 2012-05-30: race: may have outstanding initializeBridge calls
- // pointing to this.
if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
replicator.reset();
}
@@ -103,12 +100,15 @@ Backup::~Backup() {
void Backup::setBrokerUrl(const Url& url) {
// Ignore empty URLs seen during start-up for some tests.
if (url.empty()) return;
- sys::Mutex::ScopedLock l(lock);
- if (link) {
- QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
- link->setUrl(linkUrl(url));
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (link) {
+ QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
+ link->setUrl(removeSelf(url));
+ return;
+ }
}
- else initialize(url); // Deferred initialization
+ initialize(url); // Deferred initialization
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index ca3c2a02d0..c3c4fbbbfc 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -53,7 +53,7 @@ class Backup
private:
bool isSelf(const Address& a) const;
- Url linkUrl(const Url&) const;
+ Url removeSelf(const Url&) const;
void initialize(const Url&);
std::string logPrefix;
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index d510d13b31..bff7a188b1 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -170,8 +170,9 @@ Variant::Map asMapVoid(const Variant& value) {
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix("Backup configuration: "), replicationTest(hb.getReplicationTest()),
- haBroker(hb), broker(hb.getBroker()), link(l)
+ logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
+ haBroker(hb), broker(hb.getBroker()), link(l),
+ initialized(false)
{}
void BrokerReplicator::initialize() {
@@ -202,28 +203,32 @@ BrokerReplicator::~BrokerReplicator() { link->close(); }
// This is called in the connection IO thread when the bridge is started.
void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ qpid::Address primary;
+ link->getRemoteAddress(primary);
+ string queueName = bridge.getQueueName();
+
+ QPID_LOG(info, logPrefix << (initialized ? "Connecting" : "Failing-over")
+ << " to primary " << primary
+ << " status:" << printable(haBroker.getStatus()));
+ initialized = true;
switch (haBroker.getStatus()) {
case JOINING:
haBroker.setStatus(CATCHUP);
+ break;
case CATCHUP:
- // FIXME aconway 2012-04-27: distinguish catchup case, below.
break;
case READY:
- // FIXME aconway 2012-04-27: distinguish ready case, reconnect to other backup.
break;
case RECOVERING:
case ACTIVE:
- // FIXME aconway 2012-04-27: link is connected to self!
- // Promotion should close the link before allowing connections.
+ assert(0); // Primary does not reconnect.
return;
- break;
case STANDALONE:
return;
}
framing::AMQP_ServerProxy peer(sessionHandler.out);
- string queueName = bridge.getQueueName();
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
//declare and bind an event queue
@@ -243,9 +248,9 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
- qpid::Address primary;
- link->getRemoteAddress(primary);
- QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << queueName << ")");
+
+ QPID_LOG(debug, logPrefix << "Connected to primary " << primary
+ << "(" << queueName << ")" << " status:" << printable(haBroker.getStatus()));
}
void BrokerReplicator::route(Deliverable& msg) {
@@ -571,9 +576,4 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
-void BrokerReplicator::ready() {
- assert(haBroker.getStatus() == CATCHUP);
- haBroker.setStatus(READY);
-}
-
}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 9788e4b647..f7439fe892 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -94,13 +94,13 @@ class BrokerReplicator : public broker::Exchange,
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- void ready();
std::string logPrefix;
ReplicationTest replicationTest;
HaBroker& haBroker;
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
+ bool initialized;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index da03a04013..3967a45515 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -110,8 +110,11 @@ HaBroker::~HaBroker() {
}
void HaBroker::recover(Mutex::ScopedLock&) {
+ // No longer replicating, close link. Note: link must be closed before we
+ // setStatus(RECOVERING) as that will remove our broker info from the
+ // outgoing link properties so we won't recognize self-connects.
+ backup.reset();
setStatus(RECOVERING);
- backup.reset(); // No longer replicating, close link.
BrokerInfo::Set backups = membership.otherBackups();
membership.reset(brokerInfo);
// Drop the lock, new Primary may call back on activate.
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index d36789e565..aa15944259 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -56,7 +56,7 @@ class ConnectionObserver;
class Primary;
/**
- * HA state and actions associated with a broker.
+ * HA state and actions associated with a HA broker. Holds all the management info.
*
* THREAD SAFE: may be called in arbitrary broker IO or timer threads.
*/
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index a874559655..1fa51b6f68 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -70,13 +70,13 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
assert(instance == 0);
instance = this; // Let queue replicators find us.
if (expect.empty()) {
- QPID_LOG(debug, logPrefix << "Expected backups: none");
+ QPID_LOG(debug, logPrefix << "Promoted, no expected backups");
}
else {
// NOTE: RemoteBackups must be created before we set the ConfigurationObserver
// orr ConnectionObserver so that there is no client activity while
// the QueueGuards are created.
- QPID_LOG(debug, logPrefix << "Expected backups: " << expect);
+ QPID_LOG(debug, logPrefix << "Promoted, expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
bool guard = true; // Create queue guards immediately for expected backups.
boost::shared_ptr<RemoteBackup> backup(
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 7338136bfd..4d07d386f9 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -87,7 +87,7 @@ class HaBroker(Broker):
# Ignore ConnectionError, the broker may not be up yet.
try: return self.ha_status() == status;
except ConnectionError: return False
- assert retry(try_get_status, timeout=20), "%s, %r != %r"%(self, self.ha_status(), status)
+ assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status)
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):