diff options
author | Alan Conway <aconway@apache.org> | 2012-01-25 18:48:46 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-25 18:48:46 +0000 |
commit | 1986c2fa3d11de7856458d5eae34fdc3f80a42b0 (patch) | |
tree | e025df507357546bf6465e22e9ea287881ff0ee6 | |
parent | fe4d3dd164c2685cb7483c65e2e915f93d13e7e6 (diff) | |
download | qpid-python-1986c2fa3d11de7856458d5eae34fdc3f80a42b0.tar.gz |
QPID-3603: Use client-properties to signal admin connections.
qpid-ha-admin sets a client-property to indicate an admin connection.
Added support for setting client-property in python clients.
Added support for getting client-property in a broker.
ConnectionExcluder checks admin property.
Removed old user-based checks.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1235870 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/ha.mk | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionObserver.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp | 44 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionExcluder.h | 33 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 41 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaPlugin.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Settings.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/management-schema.xml | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 46 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/driver.py | 3 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-ha-status | 5 |
15 files changed, 141 insertions, 60 deletions
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index 6874449cc5..8a2cee30c7 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -35,6 +35,7 @@ ha_la_SOURCES = \ qpid/ha/ReplicatingSubscription.cpp \ qpid/ha/BrokerReplicator.cpp \ qpid/ha/BrokerReplicator.h \ + qpid/ha/ConnectionExcluder.cpp \ qpid/ha/ConnectionExcluder.h ha_la_LIBADD = libqpidbroker.la diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 6186c06a3c..855172bc43 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -165,6 +165,9 @@ class Connection : public sys::ConnectionInputHandler, // Used by cluster during catch-up, see cluster::OutputInterceptor void doIoCallbacks(); + void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; } + const framing::FieldTable& getClientProperties() const { return clientProperties; } + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator; @@ -186,6 +189,8 @@ class Connection : public sys::ConnectionInputHandler, ErrorListener* errorListener; uint64_t objectId; bool shadow; + framing::FieldTable clientProperties; + /** * Chained ConnectionOutputHandler that allows outgoing frames to be * tracked (for updating mgmt stats). diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 6048a46f79..f1d43c5cdb 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -158,6 +158,8 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) throw; } const framing::FieldTable& clientProperties = body.getClientProperties(); + connection.setClientProperties(clientProperties); + connection.setFederationLink(clientProperties.get(QPID_FED_LINK)); if (clientProperties.isSet(QPID_FED_TAG)) { connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG)); diff --git a/qpid/cpp/src/qpid/broker/ConnectionObserver.h b/qpid/cpp/src/qpid/broker/ConnectionObserver.h index 12aa8549fd..eea2981185 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionObserver.h +++ b/qpid/cpp/src/qpid/broker/ConnectionObserver.h @@ -22,6 +22,8 @@ * */ +#include <string> + namespace qpid { namespace broker { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 6b877314e0..bbf3f4ae73 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -118,12 +118,10 @@ const string S_NONE="none"; const string S_WIRING="wiring"; const string S_ALL="all"; -ReplicateLevel replicateLevel(const string& str) { - string value(str.size(), '\0'); - transform(str.begin(), str.end(), value.begin(), &tolower); +ReplicateLevel replicateLevel(const string& level) { ReplicateLevel rl = RL_NONE; - if (value == S_WIRING) rl = RL_WIRING; - else if (value == S_ALL) rl = RL_ALL; + if (level == S_WIRING) rl = RL_WIRING; + else if (level == S_ALL) rl = RL_ALL; return rl; } diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp new file mode 100644 index 0000000000..396217e0ff --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConnectionExcluder.h" +#include "qpid/broker/Connection.h" +#include <boost/function.hpp> +#include <sstream> + +namespace qpid { +namespace ha { + +ConnectionExcluder::ConnectionExcluder(PrimaryTest isPrimary_) : isPrimary(isPrimary_) {} + +void ConnectionExcluder::opened(broker::Connection& connection) { + if (!isPrimary() && !connection.isLink() + && !connection.getClientProperties().isSet(ADMIN_TAG)) + throw Exception( + QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId())); + else + QPID_LOG(debug, "HA: Backup broker accepted connection" << connection.getMgmtId()); +} + +const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin"; + +}} // namespace qpid::ha + diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h index 2ea75efffd..e6c299884e 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h @@ -23,46 +23,35 @@ */ #include "qpid/broker/ConnectionObserver.h" -#include "qpid/broker/Connection.h" #include <boost/function.hpp> -#include <sstream> namespace qpid { + +namespace broker { +class Connection; +} + namespace ha { /** * Exclude normal connections to a backup broker. - * Connections as ha-admin user are allowed. + * Admin connections are identified by a special flag in client-properties + * during connection negotiation. */ class ConnectionExcluder : public broker::ConnectionObserver { public: typedef boost::function<bool()> PrimaryTest; - ConnectionExcluder(string adminUser_, PrimaryTest isPrimary_) - : adminUser(adminUser_), isPrimary(isPrimary_) {} + ConnectionExcluder(PrimaryTest isPrimary_); - void opened(broker::Connection& connection) { - if (!isPrimary() && !connection.isLink() - && !connection.isAuthenticatedUser(adminUser)) - { - throw Exception( - QPID_MSG( - "HA: Backup broker rejected connection " - << connection.getMgmtId() << " by user " << connection.getUserId() - << ". Only " << adminUser << " can connect to a backup.")); - } - else { - QPID_LOG(debug, "HA: Backup broker accepted connection" - << connection.getMgmtId() << " by user " - << connection.getUserId()); - } - } + void opened(broker::Connection& connection); private: - string adminUser; + static const std::string ADMIN_TAG; PrimaryTest isPrimary; }; + }} // namespace qpid::ha #endif /*!QPID_HA_CONNECTIONEXCLUDER_H*/ diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 21d918bed9..a1350e492a 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -44,24 +44,21 @@ Url url(const std::string& s, const std::string& id) { throw Exception(Msg() << "Invalid URL for " << id << ": '" << s << "'"); } } + +const std::string PRIMARY="primary"; +const std::string BACKUP="backup"; + } // namespace HaBroker::HaBroker(broker::Broker& b, const Settings& s) : broker(b), + settings(s), clientUrl(url(s.clientUrl, "ha-client-url")), brokerUrl(url(s.brokerUrl, "ha-broker-url")), mgmtObject(0) { - ManagementAgent* ma = broker.getManagementAgent(); - if (ma) { - _qmf::Package packageInit(ma); - mgmtObject = new _qmf::HaBroker(ma, this); - // FIXME aconway 2011-11-11: Placeholder - initialize cluster role. - mgmtObject->set_status("solo"); - ma->addObject(mgmtObject); - } // FIXME aconway 2011-11-22: temporary hack to identify primary. - bool primary = (s.brokerUrl == "primary"); + bool primary = (settings.brokerUrl == PRIMARY); QPID_LOG(notice, "HA: " << (primary ? "Primary" : "Backup") << " initialized: client-url=" << clientUrl << " broker-url=" << brokerUrl); @@ -73,8 +70,16 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) // Register a connection excluder broker.getConnectionObservers().add( boost::shared_ptr<broker::ConnectionObserver>( - new ConnectionExcluder( - s.adminUser, boost::bind(&HaBroker::isPrimary, this)))); + new ConnectionExcluder(boost::bind(&HaBroker::isPrimary, this)))); + + ManagementAgent* ma = broker.getManagementAgent(); + if (ma) { + _qmf::Package packageInit(ma); + mgmtObject = new _qmf::HaBroker(ma, this); + // FIXME aconway 2011-11-11: Placeholder - initialize cluster role. + mgmtObject->set_status(isPrimary() ? PRIMARY : BACKUP); + ma->addObject(mgmtObject); + } } HaBroker::~HaBroker() {} @@ -83,7 +88,19 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, switch (methodId) { case _qmf::HaBroker::METHOD_SETSTATUS: { std::string status = dynamic_cast<_qmf::ArgsHaBrokerSetStatus&>(args).i_status; - // FIXME aconway 2011-11-11: placeholder, validate & execute status change. + if (status == PRIMARY) { + if (!isPrimary()) { + backup.reset(); + QPID_LOG(notice, "HA Primary: promoted from backup"); + } + } else if (status == BACKUP) { + if (isPrimary()) { + backup.reset(new Backup(broker, settings)); + QPID_LOG(notice, "HA Backup: demoted from primary."); + } + } else { + QPID_LOG(error, "Attempt to set invalid HA status: " << status); + } mgmtObject->set_status(status); break; } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 18e6156850..aff518aa8d 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -22,6 +22,7 @@ * */ +#include "Settings.h" #include "qpid/Url.h" #include "qmf/org/apache/qpid/ha/HaBroker.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h" @@ -33,7 +34,6 @@ namespace broker { class Broker; } namespace ha { -class Settings; class Backup; /** @@ -55,6 +55,7 @@ class HaBroker : public management::Manageable bool isPrimary() const; private: broker::Broker& broker; + Settings settings; Url clientUrl, brokerUrl; std::auto_ptr<Backup> backup; qmf::org::apache::qpid::ha::HaBroker* mgmtObject; diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index b7ab5ada2d..fc9e48411d 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -37,7 +37,6 @@ struct Options : public qpid::Options { ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers") ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers") ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers") - ("ha-admin-user", optValue(settings.adminUser, "USER"), "User allowed to perform HA administration tasks") ; } }; diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index 9d8821c571..a2d2e89d82 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -35,12 +35,11 @@ using std::string; class Settings { public: - Settings() : enabled(false), adminUser("qpid-ha-admin") {} + Settings() : enabled(false) {} bool enabled; string clientUrl; string brokerUrl; string username, password, mechanism; - string adminUser; private: }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml index bb06e77a69..0fd18a10d4 100644 --- a/qpid/cpp/src/qpid/ha/management-schema.xml +++ b/qpid/cpp/src/qpid/ha/management-schema.xml @@ -21,9 +21,9 @@ <!-- Monitor and control HA status of a broker. --> <class name="HaBroker"> - <property name="status" type="sstr" desc="HA status: PRIMARY, BACKUP, SOLO"/> + <property name="status" type="sstr" desc="HA status: primary, backup"/> - <method name="setStatus" desc="Set HA status: PRIMARY, BACKUP, SOLO"> + <method name="setStatus" desc="Set HA status: primary, backup"> <arg name="status" type="sstr" dir="I"/> </method> diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index bf044765b5..f3c45ba7a3 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -19,7 +19,7 @@ # import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil -from qpid.messaging import Message, NotFound, ConnectionError +from qpid.messaging import Message, NotFound, ConnectionError, Connection from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger @@ -48,15 +48,18 @@ class ShortTests(BrokerTest): except NotFound: return False assert retry(check), "Timed out waiting for %s"%(address) - def assert_missing(self,session, address): + def set_ha_status(self, address, status): + os.system("qpid-ha-status %s %s"%(address, status)) + + def assert_missing(self, session, address): try: session.receiver(address) self.fail("Should not have been replicated: %s"%(address)) except NotFound: pass def connect_admin(self, backup, **kwargs): - """Connect to a backup broker as the admin user""" - return backup.connect(username="qpid-ha-admin", password="dummy", mechanism="PLAIN", **kwargs) + """Connect to a backup broker as an admin connection""" + return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) def test_replication(self): """Test basic replication of wiring and messages before and @@ -113,6 +116,7 @@ class ShortTests(BrokerTest): primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary p = primary.connect().session() + # Create config, send messages before starting the backup, to test catch-up replication. setup(p, "1", primary) backup = self.ha_broker(name="backup", broker_url=primary.host_port()) @@ -120,10 +124,9 @@ class ShortTests(BrokerTest): setup(p, "2", primary) # Verify the data on the backup - b = self.connect_admin(backup, ).session() + b = self.connect_admin(backup).session() verify(b, "1", p) verify(b, "2", p) - # Test a series of messages, enqueue all then dequeue all. s = p.sender(queue("foo","all")) self.wait(b, "foo") @@ -174,6 +177,7 @@ class ShortTests(BrokerTest): self.assert_browse_retry(b2, "q", msgs) def test_send_receive(self): + """Verify sequence numbers of messages sent by qpid-send""" primary = self.ha_broker(name="primary", broker_url="primary") backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port()) backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port()) @@ -204,17 +208,35 @@ class ShortTests(BrokerTest): print self.browse(self.connect_admin(backup2).session(), "q", transform=sn) raise - def test_exclude(self): - """Verify that backup rejects connections""" - primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary + def test_failover(self): + """Verify that backups rejects connections and that fail-over works""" + primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary backup = self.ha_broker(name="backup", broker_url=primary.host_port()) - # Admin is allowed - self.connect_admin(backup) - # Others are not + # Check that backup rejects normal connections try: backup.connect() self.fail("Expected connection to backup to fail") except ConnectionError: pass + # Check that admin connections are allowed to backup. + self.connect_admin(backup).close() + + # Test discovery: should connect to primary after reject by backup + c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) + s = c.session() + s.sender("q;{create:always,%s}"%(self.qpid_replicate())).send("foo", sync=True) + # FIXME aconway 2012-01-23: we shouldn't need the wait and retry here, + # send(sync=True) shouldn't return till the backup acknowledges. + bs = self.connect_admin(backup).session() + self.wait(bs, "q") + self.assert_browse_retry(bs, "q", ["foo"]) + bs.connection.close() + + primary.kill() + # Promote the backup + self.set_ha_status(backup.host_port(), "primary") + # FIXME aconway 2012-01-23: should re-use session s below + self.assert_browse_retry(c.session(), "q", ["foo"]) + c.close() if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index 608791927f..ff657653e0 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -533,7 +533,8 @@ class Driver: log.warn("reconnect succeeded: %s:%s", host, port) self._next_retry = None self._attempts = 0 - self._host = 0 + # FIXME aconway 2012-01-23: is this correct +# self._host = 0 self._delay = self.connection.reconnect_interval_min self._retrying = False self.schedule() diff --git a/qpid/tools/src/py/qpid-ha-status b/qpid/tools/src/py/qpid-ha-status index c70e4c9af3..b4234cc051 100755 --- a/qpid/tools/src/py/qpid-ha-status +++ b/qpid/tools/src/py/qpid-ha-status @@ -27,7 +27,7 @@ Usage: qpid-ha-status [broker-address] [status] If status is specified, sets the HA status of the broker. Otherwise prints the current HA status. Status must be one of: primary, backup, solo. """ -STATUS_VALUES=["primary", "backup", "solo"] +STATUS_VALUES=["primary", "backup"] def is_valid_status(value): return value in STATUS_VALUES @@ -39,7 +39,8 @@ class HaBroker: def __init__(self, broker, session): self.session = session try: - self.qmf_broker = self.session.addBroker(broker) + self.qmf_broker = self.session.addBroker( + broker, client_properties={"qpid.ha-admin":1}) except Exception, e: raise Exception("Can't connect to %s: %s"%(broker,e)) ha_brokers=self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha") |