summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-14 16:09:34 +0000
committerAlan Conway <aconway@apache.org>2012-02-14 16:09:34 +0000
commitb98cf8dfa84a636614a3b36beba9228f0a38e3c0 (patch)
tree054899b12a85b7c9fb9cee1de03d1986efca0bca
parent67c80caf7a9d4841ecb053057c8418608f243e52 (diff)
downloadqpid-python-b98cf8dfa84a636614a3b36beba9228f0a38e3c0.tar.gz
QPID-3603: Get rid of broker_url="primary" hack, promote primaries via management.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244086 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionObservers.h17
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h7
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.h5
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp34
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h3
-rw-r--r--qpid/cpp/src/tests/brokertest.py2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py23
-rwxr-xr-xqpid/tools/src/py/qpid-ha-status7
10 files changed, 68 insertions, 52 deletions
diff --git a/qpid/cpp/src/qpid/broker/ConnectionObservers.h b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
index b36097cdbb..07e515f3c9 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionObservers.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
@@ -23,6 +23,9 @@
*/
#include "ConnectionObserver.h"
+#include "qpid/sys/Mutex.h"
+#include <set>
+#include <algorithm>
namespace qpid {
namespace broker {
@@ -30,12 +33,18 @@ namespace broker {
/**
* A collection of connection observers.
* Calling a ConnectionObserver function will call that function on each observer.
+ * THREAD SAFE.
*/
class ConnectionObservers : public ConnectionObserver {
public:
- // functions for managing the collection of observers
void add(boost::shared_ptr<ConnectionObserver> observer) {
- observers.push_back(observer);
+ sys::Mutex::ScopedLock l(lock);
+ observers.insert(observer);
+ }
+
+ void remove(boost::shared_ptr<ConnectionObserver> observer) {
+ sys::Mutex::ScopedLock l(lock);
+ observers.erase(observer);
}
void connection(Connection& c) {
@@ -55,10 +64,12 @@ class ConnectionObservers : public ConnectionObserver {
}
private:
- typedef std::vector<boost::shared_ptr<ConnectionObserver> > Observers;
+ typedef std::set<boost::shared_ptr<ConnectionObserver> > Observers;
+ sys::Mutex lock;
Observers observers;
template <class F> void each(F f) {
+ sys::Mutex::ScopedLock l(lock);
std::for_each(observers.begin(), observers.end(), f);
}
};
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 3476af3fe3..6e61ed0b46 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -22,6 +22,7 @@
#include "Settings.h"
#include "BrokerReplicator.h"
#include "ReplicatingSubscription.h"
+#include "ConnectionExcluder.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
@@ -42,11 +43,12 @@ using namespace broker;
using types::Variant;
using std::string;
-Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
+Backup::Backup(broker::Broker& b, const Settings& s) :
+ broker(b), settings(s), excluder(new ConnectionExcluder())
+{
Url url(s.brokerUrl);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
- // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
url[0].host, url[0].port, protocol,
@@ -54,9 +56,16 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
s.mechanism, s.username, s.password);
assert(result.second); // FIXME aconway 2011-11-23: error handling
link = result.first;
- boost::shared_ptr<BrokerReplicator> wr(new BrokerReplicator(link));
- broker.getExchanges().registerExchange(wr);
+
+ replicator.reset(new BrokerReplicator(link));
+ broker.getExchanges().registerExchange(replicator);
+
+ broker.getConnectionObservers().add(excluder);
}
+Backup::~Backup() {
+ broker.getExchanges().destroy(replicator->getName());
+ broker.getConnectionObservers().remove(excluder); // Allows client connections.
+}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index b4183a4dba..135363c714 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -27,6 +27,7 @@
#include <boost/shared_ptr.hpp>
namespace qpid {
+
namespace broker {
class Broker;
class Link;
@@ -34,6 +35,8 @@ class Link;
namespace ha {
class Settings;
+class ConnectionExcluder;
+class BrokerReplicator;
/**
* State associated with a backup broker. Manages connections to primary.
@@ -45,12 +48,16 @@ class Backup
{
public:
Backup(broker::Broker&, const Settings&);
+ ~Backup();
private:
broker::Broker& broker;
Settings settings;
boost::shared_ptr<broker::Link> link;
+ boost::shared_ptr<BrokerReplicator> replicator;
+ boost::shared_ptr<ConnectionExcluder> excluder;
};
+
}} // namespace qpid::ha
#endif /*!QPID_HA_BACKUP_H*/
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
index df50f1da19..67ad7202d6 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
@@ -27,11 +27,10 @@
namespace qpid {
namespace ha {
-ConnectionExcluder::ConnectionExcluder(PrimaryTest isPrimary_) : isPrimary(isPrimary_) {}
+ConnectionExcluder::ConnectionExcluder() {}
void ConnectionExcluder::opened(broker::Connection& connection) {
- if (!isPrimary() && !connection.isLink()
- && !connection.getClientProperties().isSet(ADMIN_TAG))
+ if (!connection.isLink() && !connection.getClientProperties().isSet(ADMIN_TAG))
throw Exception(
QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId()));
}
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
index e6c299884e..f8f2843a0c 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
@@ -41,15 +41,12 @@ namespace ha {
class ConnectionExcluder : public broker::ConnectionObserver
{
public:
- typedef boost::function<bool()> PrimaryTest;
-
- ConnectionExcluder(PrimaryTest isPrimary_);
+ ConnectionExcluder();
void opened(broker::Connection& connection);
private:
static const std::string ADMIN_TAG;
- PrimaryTest isPrimary;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 60b4b69581..0e342bd17c 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -55,22 +55,16 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
settings(s),
clientUrl(url(s.clientUrl, "ha-client-url")),
brokerUrl(url(s.brokerUrl, "ha-broker-url")),
+ backup(new Backup(b, s)),
mgmtObject(0)
{
- // FIXME aconway 2011-11-22: temporary hack to identify primary.
- bool primary = (settings.brokerUrl == PRIMARY);
- QPID_LOG(notice, "HA: " << (primary ? "Primary" : "Backup")
- << " initialized: client-url=" << clientUrl
+ // Note all HA brokers start out in backup mode.
+ QPID_LOG(notice, "HA: Backup initialized: client-url=" << clientUrl
<< " broker-url=" << brokerUrl);
- if (!primary) backup.reset(new Backup(broker, s));
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
boost::shared_ptr<ReplicatingSubscription::Factory>(
new ReplicatingSubscription::Factory()));
- // Register a connection excluder
- broker.getConnectionObservers().add(
- boost::shared_ptr<broker::ConnectionObserver>(
- new ConnectionExcluder(boost::bind(&HaBroker::isPrimary, this))));
ManagementAgent* ma = broker.getManagementAgent();
if (!ma)
@@ -78,8 +72,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
if (ma) {
_qmf::Package packageInit(ma);
mgmtObject = new _qmf::HaBroker(ma, this);
- // FIXME aconway 2011-11-11: Placeholder - initialize cluster role.
- mgmtObject->set_status(isPrimary() ? PRIMARY : BACKUP);
+ mgmtObject->set_status(BACKUP);
ma->addObject(mgmtObject);
}
}
@@ -87,21 +80,22 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
HaBroker::~HaBroker() {}
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
+ sys::Mutex::ScopedLock l(lock);
switch (methodId) {
case _qmf::HaBroker::METHOD_SETSTATUS: {
std::string status = dynamic_cast<_qmf::ArgsHaBrokerSetStatus&>(args).i_status;
if (status == PRIMARY) {
- if (!isPrimary()) {
+ if (backup.get()) {
+ // FIXME aconway 2012-01-26: create primary state before resetting backup
+ // as it allows client connections.
backup.reset();
- QPID_LOG(notice, "HA Primary: promoted from backup");
+ QPID_LOG(notice, "HA: Primary promoted from backup");
}
} else if (status == BACKUP) {
- if (isPrimary()) {
- backup.reset(new Backup(broker, settings));
- QPID_LOG(notice, "HA Backup: demoted from primary.");
- }
+ if (!backup.get())
+ throw Exception("HA: Primary cannot be demoted");
} else {
- QPID_LOG(error, "Attempt to set invalid HA status: " << status);
+ throw Exception("Invalid HA status: "+status);
}
mgmtObject->set_status(status);
break;
@@ -112,8 +106,4 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
return Manageable::STATUS_OK;
}
-bool HaBroker::isPrimary() const {
- return !backup.get(); // TODO aconway 2012-01-18: temporary test.
-}
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index aff518aa8d..8300de0ea8 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -24,6 +24,7 @@
#include "Settings.h"
#include "qpid/Url.h"
+#include "qpid/sys/Mutex.h"
#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h"
#include "qpid/management/Manageable.h"
@@ -52,8 +53,8 @@ class HaBroker : public management::Manageable
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
- bool isPrimary() const;
private:
+ sys::Mutex lock;
broker::Broker& broker;
Settings settings;
Url clientUrl, brokerUrl;
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index b2b4d89b0f..f7f446b893 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -513,7 +513,7 @@ class BrokerTest(TestCase):
finally: r.close()
return contents
- def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda d:m.content):
+ def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 7ccea16a08..bba3bc223a 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -56,8 +56,8 @@ class ShortTests(BrokerTest):
self.wait(bs, address)
bs.connection.close()
- def set_ha_status(self, address, status):
- os.system("qpid-ha-status %s %s"%(address, status))
+ def promote(self, broker):
+ os.system("qpid-ha-status %s primary"%(broker.host_port()))
def assert_missing(self, session, address):
try:
@@ -122,7 +122,8 @@ class ShortTests(BrokerTest):
b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
self.assert_browse_retry(b, prefix+"q4", ["6","7"])
- primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+ primary = self.ha_broker(name="primary")
+ self.promote(primary)
p = primary.connect().session()
# Create config, send messages before starting the backup, to test catch-up replication.
@@ -164,7 +165,8 @@ class ShortTests(BrokerTest):
def test_sync(self):
def queue(name, replicate):
return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
- primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+ primary = self.ha_broker(name="primary")
+ self.promote(primary)
p = primary.connect().session()
s = p.sender(queue("q","messages"))
for m in [str(i) for i in range(0,10)]: s.send(m)
@@ -186,7 +188,8 @@ class ShortTests(BrokerTest):
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
- primary = self.ha_broker(name="primary", broker_url="primary")
+ primary = self.ha_broker(name="primary")
+ self.promote(primary)
backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
sender = self.popen(
@@ -219,7 +222,8 @@ class ShortTests(BrokerTest):
def test_failover(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover
- primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
+ primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL)
+ self.promote(primary)
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
# Check that backup rejects normal connections
try:
@@ -237,12 +241,13 @@ class ShortTests(BrokerTest):
sender.send("foo")
primary.kill()
assert retry(lambda: not is_running(primary.pid))
- self.set_ha_status(backup.host_port(), "primary") # Promote the backup
+ self.promote(backup)
self.assert_browse_retry(s, "q", ["foo"])
c.close()
def test_failover_cpp(self):
- primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
+ primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL)
+ self.promote(primary)
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
@@ -257,7 +262,7 @@ class ShortTests(BrokerTest):
primary.kill()
assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
- self.set_ha_status(backup.host_port(), "primary")
+ self.promote(backup)
n = receiver.received # Make sure we are still running
assert retry(lambda: receiver.received > n + 10)
sender.stop()
diff --git a/qpid/tools/src/py/qpid-ha-status b/qpid/tools/src/py/qpid-ha-status
index b4234cc051..20804b678d 100755
--- a/qpid/tools/src/py/qpid-ha-status
+++ b/qpid/tools/src/py/qpid-ha-status
@@ -38,11 +38,8 @@ def validate_status(value):
class HaBroker:
def __init__(self, broker, session):
self.session = session
- try:
- self.qmf_broker = self.session.addBroker(
- broker, client_properties={"qpid.ha-admin":1})
- except Exception, e:
- raise Exception("Can't connect to %s: %s"%(broker,e))
+ self.qmf_broker = self.session.addBroker(
+ broker, client_properties={"qpid.ha-admin":1})
ha_brokers=self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha")
if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
self.ha_broker = ha_brokers[0];