From d1e2f3cdba108b57a5305607eb4336fbfd1f2a06 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 13 Feb 2012 16:17:52 +0000 Subject: QPID-3603: Simplified Link failover. - Moved timer from LinkRegistry to Link. - Got rid of remapping code, simplified failover. - Faster interval for maintenance intervals. - Test for simple HA broker failover. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1243577 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Link.cpp | 77 ++++++--- qpid/cpp/src/qpid/broker/Link.h | 225 ++++++++++++++------------- qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 75 ++------- qpid/cpp/src/qpid/broker/LinkRegistry.h | 29 +--- qpid/cpp/src/qpid/broker/RetryList.h | 3 +- qpid/cpp/src/qpid/ha/Backup.cpp | 23 ++- qpid/cpp/src/qpid/ha/Backup.h | 2 + qpid/cpp/src/qpid/ha/HaBroker.cpp | 22 ++- qpid/cpp/src/qpid/ha/HaBroker.h | 1 - qpid/cpp/src/tests/ha_tests.py | 33 +++- qpid/cpp/src/tests/reliable_replication_test | 1 - 11 files changed, 239 insertions(+), 252 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 2c053220a4..cd5b89e1ad 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/LinkRegistry.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/sys/Timer.h" #include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h" #include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h" #include "boost/bind.hpp" @@ -31,19 +32,35 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/AclModule.h" -using namespace qpid::broker; -using qpid::framing::Buffer; -using qpid::framing::FieldTable; -using qpid::framing::UnauthorizedAccessException; -using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED; -using qpid::management::ManagementAgent; -using qpid::management::ManagementObject; -using qpid::management::Manageable; -using qpid::management::Args; -using qpid::sys::Mutex; +namespace qpid { +namespace broker { + +using framing::Buffer; +using framing::FieldTable; +using framing::UnauthorizedAccessException; +using framing::connection::CLOSE_CODE_CONNECTION_FORCED; +using management::ManagementAgent; +using management::ManagementObject; +using management::Manageable; +using management::Args; +using sys::Mutex; using std::stringstream; using std::string; -namespace _qmf = qmf::org::apache::qpid::broker; +namespace _qmf = ::qmf::org::apache::qpid::broker; + +struct LinkTimerTask : public sys::TimerTask { + LinkTimerTask(Link& l, sys::Timer& t) + : TimerTask(/*FIXME*/100*sys::TIME_MSEC, "Link retry timer"), link(l), timer(t) {} + + void fire() { + link.maintenanceVisit(); // FIXME aconway 2012-01-31: + setupNextFire(); + timer.add(this); + } + + Link& link; + sys::Timer& timer; +}; Link::Link(LinkRegistry* _links, MessageStore* _store, @@ -67,7 +84,8 @@ Link::Link(LinkRegistry* _links, reconnectNext(0), // Index of next address for reconnecting in url. channelCounter(1), connection(0), - agent(0) + agent(0), + timerTask(new LinkTimerTask(*this, broker->getTimer())) { if (parent != 0 && broker != 0) { @@ -80,10 +98,12 @@ Link::Link(LinkRegistry* _links, } setStateLH(STATE_WAITING); startConnectionLH(); + broker->getTimer().add(timerTask); } Link::~Link () { + timerTask->cancel(); if (state == STATE_OPERATIONAL && connection != 0) connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); @@ -121,7 +141,9 @@ void Link::startConnectionLH () broker->connect (host, boost::lexical_cast(port), transport, boost::bind (&Link::closed, this, _1, _2)); QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); - } catch(std::exception& e) { + } catch(const std::exception& e) { + QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: " + << e.what()); setStateLH(STATE_WAITING); if (!hideManagement()) mgmtObject->set_lastError (e.what()); @@ -156,14 +178,16 @@ void Link::setUrl(const Url& u) { void Link::opened() { Mutex::ScopedLock mutex(lock); assert(connection); - // Get default URL from known-hosts. - const std::vector& known = connection->getKnownHosts(); - // Flatten vector of URLs into a single URL listing all addresses. - url.clear(); - for(size_t i = 0; i < known.size(); ++i) - url.insert(url.end(), known[i].begin(), known[i].end()); - reconnectNext = 0; - QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); + // Get default URL from known-hosts if not already set + if (url.empty()) { + const std::vector& known = connection->getKnownHosts(); + // Flatten vector of URLs into a single URL listing all addresses. + url.clear(); + for(size_t i = 0; i < known.size(); ++i) + url.insert(url.end(), known[i].begin(), known[i].end()); + reconnectNext = 0; + QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); + } } void Link::closed(int, std::string text) @@ -176,7 +200,7 @@ void Link::closed(int, std::string text) if (state == STATE_OPERATIONAL) { stringstream addr; addr << host << ":" << port; - QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); + QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str()); if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } @@ -333,7 +357,7 @@ void Link::maintenanceVisit () connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } -void Link::reconnect(const qpid::Address& a) +void Link::reconnect(const Address& a) { Mutex::ScopedLock mutex(lock); host = a.host; @@ -347,13 +371,14 @@ void Link::reconnect(const qpid::Address& a) } } -bool Link::tryFailoverLH() { // FIXME aconway 2012-01-30: lock held? +bool Link::tryFailoverLH() { if (reconnectNext >= url.size()) reconnectNext = 0; if (url.empty()) return false; Address next = url[reconnectNext++]; if (next.host != host || next.port != port || next.protocol != transport) { links->changeAddress(Address(transport, host, port), next); - QPID_LOG(debug, "Link failing over to " << host << ":" << port); + QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port); + reconnect(next); return true; } return false; @@ -510,3 +535,5 @@ void Link::setPassive(bool passive) } } } + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 9c82eda000..9ee4837040 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -35,115 +35,122 @@ #include namespace qpid { - namespace broker { - - class LinkRegistry; - class Broker; - class Connection; - - class Link : public PersistableConfig, public management::Manageable { - private: - sys::Mutex lock; - LinkRegistry* links; - MessageStore* store; - std::string host; - uint16_t port; - std::string transport; - bool durable; - std::string authMechanism; - std::string username; - std::string password; - mutable uint64_t persistenceId; - qmf::org::apache::qpid::broker::Link* mgmtObject; - Broker* broker; - int state; - uint32_t visitCount; - uint32_t currentInterval; - bool closing; - Url url; // URL can contain many addresses. - size_t reconnectNext; // Index for next re-connect attempt - - typedef std::vector Bridges; - Bridges created; // Bridges pending creation - Bridges active; // Bridges active - Bridges cancellations; // Bridges pending cancellation - uint channelCounter; - Connection* connection; - management::ManagementAgent* agent; - - static const int STATE_WAITING = 1; - static const int STATE_CONNECTING = 2; - static const int STATE_OPERATIONAL = 3; - static const int STATE_FAILED = 4; - static const int STATE_CLOSED = 5; - static const int STATE_PASSIVE = 6; - - static const uint32_t MAX_INTERVAL = 32; - - void setStateLH (int newState); - void startConnectionLH(); // Start the IO Connection - void destroy(); // Called when mgmt deletes this link - void ioThreadProcessing(); // Called on connection's IO thread by request - bool tryFailoverLH(); // Called during maintenance visit - bool hideManagement() const; - - public: - typedef boost::shared_ptr shared_ptr; - - Link(LinkRegistry* links, - MessageStore* store, - const std::string& host, - uint16_t port, - const std::string& transport, - bool durable, - const std::string& authMechanism, - const std::string& username, - const std::string& password, - Broker* broker, - management::Manageable* parent = 0); - virtual ~Link(); - - std::string getHost() { return host; } - uint16_t getPort() { return port; } - std::string getTransport() { return transport; } - - bool isDurable() { return durable; } - void maintenanceVisit (); - uint nextChannel(); - void add(Bridge::shared_ptr); - void cancel(Bridge::shared_ptr); - void setUrl(const Url&); // Set URL for reconnection. - - void established(); // Called when connection is create - void opened(); // Called when connection is open (after create) - void closed(int, std::string); // Called when connection goes away - void setConnection(Connection*); // Set pointer to the AMQP Connection - void reconnect(const Address&); //called by LinkRegistry - void close(); // Close the link from within the broker. - - std::string getAuthMechanism() { return authMechanism; } - std::string getUsername() { return username; } - std::string getPassword() { return password; } - Broker* getBroker() { return broker; } - - void notifyConnectionForced(const std::string text); - void setPassive(bool p); - - // PersistableConfig: - void setPersistenceId(uint64_t id) const; - uint64_t getPersistenceId() const { return persistenceId; } - uint32_t encodedSize() const; - void encode(framing::Buffer& buffer) const; - const std::string& getName() const; - - static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); - - // Manageable entry points - management::ManagementObject* GetManagementObject(void) const; - management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&); - - }; - } + +namespace sys { +class TimerTask; +} + +namespace broker { + +class LinkRegistry; +class Broker; +class Connection; + +class Link : public PersistableConfig, public management::Manageable { + private: + sys::Mutex lock; + LinkRegistry* links; + MessageStore* store; + std::string host; + uint16_t port; + std::string transport; + bool durable; + std::string authMechanism; + std::string username; + std::string password; + mutable uint64_t persistenceId; + qmf::org::apache::qpid::broker::Link* mgmtObject; + Broker* broker; + int state; + uint32_t visitCount; + uint32_t currentInterval; + bool closing; + Url url; // URL can contain many addresses. + size_t reconnectNext; // Index for next re-connect attempt + + typedef std::vector Bridges; + Bridges created; // Bridges pending creation + Bridges active; // Bridges active + Bridges cancellations; // Bridges pending cancellation + uint channelCounter; + Connection* connection; + management::ManagementAgent* agent; + + boost::intrusive_ptr timerTask; + + static const int STATE_WAITING = 1; + static const int STATE_CONNECTING = 2; + static const int STATE_OPERATIONAL = 3; + static const int STATE_FAILED = 4; + static const int STATE_CLOSED = 5; + static const int STATE_PASSIVE = 6; + + static const uint32_t MAX_INTERVAL = 32; + + void setStateLH (int newState); + void startConnectionLH(); // Start the IO Connection + void destroy(); // Called when mgmt deletes this link + void ioThreadProcessing(); // Called on connection's IO thread by request + bool tryFailoverLH(); // Called during maintenance visit + bool hideManagement() const; + + public: + typedef boost::shared_ptr shared_ptr; + + Link(LinkRegistry* links, + MessageStore* store, + const std::string& host, + uint16_t port, + const std::string& transport, + bool durable, + const std::string& authMechanism, + const std::string& username, + const std::string& password, + Broker* broker, + management::Manageable* parent = 0); + virtual ~Link(); + + std::string getHost() { return host; } + uint16_t getPort() { return port; } + std::string getTransport() { return transport; } + + bool isDurable() { return durable; } + void maintenanceVisit (); + uint nextChannel(); + void add(Bridge::shared_ptr); + void cancel(Bridge::shared_ptr); + void setUrl(const Url&); // Set URL for reconnection. + + void established(); // Called when connection is create + void opened(); // Called when connection is open (after create) + void closed(int, std::string); // Called when connection goes away + void setConnection(Connection*); // Set pointer to the AMQP Connection + void reconnect(const Address&); //called by LinkRegistry + void close(); // Close the link from within the broker. + + std::string getAuthMechanism() { return authMechanism; } + std::string getUsername() { return username; } + std::string getPassword() { return password; } + Broker* getBroker() { return broker; } + + void notifyConnectionForced(const std::string text); + void setPassive(bool p); + + // PersistableConfig: + void setPersistenceId(uint64_t id) const; + uint64_t getPersistenceId() const { return persistenceId; } + uint32_t encodedSize() const; + void encode(framing::Buffer& buffer) const; + const std::string& getName() const; + + static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + + // Manageable entry points + management::ManagementObject* GetManagementObject(void) const; + management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&); + +}; +} } diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index bb21ca01ab..ef6bf95b0b 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -35,15 +35,12 @@ using boost::format; using boost::str; namespace _qmf = qmf::org::apache::qpid::broker; -#define LINK_MAINT_INTERVAL 2 - // TODO: This constructor is only used by the store unit tests - // That probably indicates that LinkRegistry isn't correctly -// factored: The persistence element and maintenance element -// should be factored separately +// factored: The persistence element should be factored separately LinkRegistry::LinkRegistry () : - broker(0), timer(0), - parent(0), store(0), passive(false), passiveChanged(false), + broker(0), + parent(0), store(0), passive(false), realm("") { } @@ -60,79 +57,32 @@ struct ConnectionObserverImpl : public ConnectionObserver { } LinkRegistry::LinkRegistry (Broker* _broker) : - broker(_broker), timer(&broker->getTimer()), - maintenanceTask(new Periodic(*this)), - parent(0), store(0), passive(false), passiveChanged(false), + broker(_broker), + parent(0), store(0), passive(false), realm(broker->getOptions().realm) { - timer->add(maintenanceTask); broker->getConnectionObservers().add( boost::shared_ptr(new ConnectionObserverImpl(*this))); } -LinkRegistry::~LinkRegistry() -{ - // This test is only necessary if the default constructor above is present - if (maintenanceTask) - maintenanceTask->cancel(); -} - -LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : - TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC),"LinkRegistry"), links(_links) {} - -void LinkRegistry::Periodic::fire () -{ - links.periodicMaintenance (); - setupNextFire(); - links.timer->add(this); -} - -void LinkRegistry::periodicMaintenance () -{ - Mutex::ScopedLock locker(lock); +LinkRegistry::~LinkRegistry() {} - linksToDestroy.clear(); - bridgesToDestroy.clear(); - if (passiveChanged) { - if (passive) { QPID_LOG(info, "Passivating links"); } - else { QPID_LOG(info, "Activating links"); } - for (LinkMap::iterator i = links.begin(); i != links.end(); i++) { - i->second->setPassive(passive); - } - passiveChanged = false; - } - for (LinkMap::iterator i = links.begin(); i != links.end(); i++) - i->second->maintenanceVisit(); - //now process any requests for re-addressing - for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++) - updateAddress(i->first, i->second); - reMappings.clear(); -} void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress) { - //done on periodic maintenance thread; hold changes in separate - //map to avoid modifying the link map that is iterated over - reMappings[createKey(oldAddress)] = newAddress; -} - -bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& newAddress) -{ + Mutex::ScopedLock locker(lock); + std::string oldKey = createKey(oldAddress); std::string newKey = createKey(newAddress); if (links.find(newKey) != links.end()) { QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use"); - return false; } else { LinkMap::iterator i = links.find(oldKey); if (i == links.end()) { QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey); - return false; } else { links[newKey] = i->second; - i->second->reconnect(newAddress); links.erase(oldKey); QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey); - return true; } } } @@ -230,7 +180,6 @@ void LinkRegistry::destroy(const string& host, const uint16_t port) { if (i->second->isDurable() && store) store->destroy(*(i->second)); - linksToDestroy[key] = i->second; links.erase(i); } } @@ -258,7 +207,6 @@ void LinkRegistry::destroy(const std::string& host, l->second->cancel(b->second); if (b->second->isDurable()) store->destroy(*(b->second)); - bridgesToDestroy[bridgeKey] = b->second; bridges.erase(b); } @@ -406,9 +354,12 @@ std::string LinkRegistry::createKey(const std::string& host, uint16_t port) { void LinkRegistry::setPassive(bool p) { Mutex::ScopedLock locker(lock); - passiveChanged = p != passive; passive = p; - //will activate or passivate links on maintenance visit + if (passive) { QPID_LOG(info, "Passivating links"); } + else { QPID_LOG(info, "Activating links"); } + for (LinkMap::iterator i = links.begin(); i != links.end(); i++) { + i->second->setPassive(passive); + } } void LinkRegistry::eachLink(boost::function)> f) { diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index e66fca7d6b..ef4871192f 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,7 +27,6 @@ #include "qpid/broker/MessageStore.h" #include "qpid/Address.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include #include @@ -40,40 +39,19 @@ namespace broker { class Broker; class Connection; class LinkRegistry { - - // Declare a timer task to manage the establishment of link connections and the - // re-establishment of lost link connections. - struct Periodic : public sys::TimerTask - { - LinkRegistry& links; - - Periodic(LinkRegistry& links); - virtual ~Periodic() {}; - void fire(); - }; - typedef std::map > LinkMap; typedef std::map BridgeMap; - typedef std::map AddressMap; LinkMap links; - LinkMap linksToDestroy; BridgeMap bridges; - BridgeMap bridgesToDestroy; - AddressMap reMappings; qpid::sys::Mutex lock; Broker* broker; - sys::Timer* timer; - boost::intrusive_ptr maintenanceTask; management::Manageable* parent; MessageStore* store; bool passive; - bool passiveChanged; std::string realm; - void periodicMaintenance (); - bool updateAddress(const std::string& oldKey, const Address& newAddress); boost::shared_ptr findLink(const std::string& key); static std::string createKey(const Address& address); static std::string createKey(const std::string& host, uint16_t port); @@ -147,6 +125,7 @@ namespace broker { * Called by links failing over to new address */ void changeAddress(const Address& oldAddress, const Address& newAddress); + /** * Called to alter passive state. In passive state the links * and bridges managed by a link registry will be recorded and @@ -155,7 +134,7 @@ namespace broker { */ void setPassive(bool); - + /** Iterate over each link in the registry. Used for cluster updates. */ void eachLink(boost::function)> f); /** Iterate over each bridge in the registry. Used for cluster updates. */ diff --git a/qpid/cpp/src/qpid/broker/RetryList.h b/qpid/cpp/src/qpid/broker/RetryList.h index 242a7d2122..9c4b779bcb 100644 --- a/qpid/cpp/src/qpid/broker/RetryList.h +++ b/qpid/cpp/src/qpid/broker/RetryList.h @@ -23,7 +23,6 @@ */ #include "qpid/broker/BrokerImportExport.h" -#include "qpid/Address.h" #include "qpid/Url.h" namespace qpid { @@ -36,7 +35,7 @@ namespace broker { class RetryList { public: - QPID_BROKER_EXTERN RetryList(); + QPID_BROKER_EXTERN RetryList(); QPID_BROKER_EXTERN void reset(const std::vector& urls); QPID_BROKER_EXTERN bool next(Address& address); private: diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 1ab24057ce..32f87dc722 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -46,32 +46,39 @@ using std::string; Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s), excluder(new ConnectionExcluder()) { - Url url(s.brokerUrl); - string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); +} +void Backup::initialize(const Url& url) { + QPID_LOG(notice, "Ha: Backup started: " << url); + string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; // Declare the link std::pair result = broker.getLinks().declare( url[0].host, url[0].port, protocol, false, // durable - s.mechanism, s.username, s.password); + settings.mechanism, settings.username, settings.password); assert(result.second); // FIXME aconway 2011-11-23: error handling link = result.first; - link->setUrl(Url(s.brokerUrl)); + link->setUrl(url); replicator.reset(new BrokerReplicator(link)); broker.getExchanges().registerExchange(replicator); - broker.getConnectionObservers().add(excluder); } void Backup::setUrl(const Url& url) { sys::Mutex::ScopedLock l(lock); - link->setUrl(url); + if (!replicator.get()) + initialize(url); + else { + QPID_LOG(info, "HA: Backup URL set to " << url); + link->setUrl(url); + } } Backup::~Backup() { - link->close(); - broker.getExchanges().destroy(replicator->getName()); + if (link) link->close(); + if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); broker.getConnectionObservers().remove(excluder); // Allows client connections. } diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 3a9e739c98..3ebb6d018d 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -52,6 +52,8 @@ class Backup void setUrl(const Url&); private: + void initialize(const Url&); + sys::Mutex lock; broker::Broker& broker; Settings settings; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index ad97f87a62..c7692ac9a2 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -41,7 +41,9 @@ using namespace std; namespace { Url url(const std::string& s, const std::string& id) { try { - return Url(s); + // Allow the URL to be empty, used in tests that set the URL + // after starting broker + return s.empty() ? Url() : Url(s); } catch (const std::exception& e) { throw Exception(Msg() << "Invalid URL for " << id << ": '" << s << "'"); } @@ -55,15 +57,9 @@ const std::string BACKUP="backup"; HaBroker::HaBroker(broker::Broker& b, const Settings& s) : broker(b), settings(s), - brokerUrl(url(s.brokerUrl, "ha-broker-url")), - clientUrl(s.clientUrl.empty() ? brokerUrl : url(s.clientUrl, "ha-client-url")), backup(new Backup(b, s)), mgmtObject(0) { - // Note all HA brokers start out in backup mode. - QPID_LOG(notice, "HA: Backup initialized: " - << " broker-url=" << brokerUrl - << " client-url=" << clientUrl); // Register a factory for replicating subscriptions. broker.getConsumerFactories().add( boost::shared_ptr( @@ -96,15 +92,17 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, break; } case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: { - clientUrl = dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).i_clientAddresses; - mgmtObject->set_clientAddresses(clientUrl.str()); + string url = dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args) + .i_clientAddresses; + mgmtObject->set_clientAddresses(url); // FIXME aconway 2012-01-30: upate status for new URL break; } case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: { - brokerUrl = dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args).i_brokerAddresses; - mgmtObject->set_brokerAddresses(brokerUrl.str()); - if (backup.get()) backup->setUrl(brokerUrl); + string url = dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args) + .i_brokerAddresses; + mgmtObject->set_brokerAddresses(url); + if (backup.get()) backup->setUrl(Url(url)); break; } default: diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index affaa7486f..e3dc46946b 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -57,7 +57,6 @@ class HaBroker : public management::Manageable sys::Mutex lock; broker::Broker& broker; Settings settings; - Url brokerUrl, clientUrl; std::auto_ptr backup; qmf::org::apache::qpid::ha::HaBroker* mgmtObject; }; diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index e782b57f7f..ddcc19ee1e 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -30,11 +30,12 @@ log = getLogger("qpid.ha-tests") class HaBroker(Broker): def __init__(self, test, args=[], broker_url=None, **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" - Broker.__init__(self, test, - args=["--load-module", BrokerTest.ha_lib, - "--ha-enable=yes", - "--ha-broker-url", broker_url ], - **kwargs) + args=["--load-module", BrokerTest.ha_lib, + "--log-enable=debug+:ha::", # FIXME aconway 2012-01-31: + "--log-enable=debug+:Link", + "--ha-enable=yes"] + if broker_url: args += [ "--ha-broker-url", broker_url ] + Broker.__init__(self, test, args, **kwargs) def promote(self): assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0 @@ -237,13 +238,13 @@ class ShortTests(BrokerTest): def test_failover(self): """Verify that backups rejects connections and that fail-over works in python client""" - getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover + getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) # Check that backup rejects normal connections try: - backup.connect() + backup.connect().session() self.fail("Expected connection to backup to fail") except ConnectionError: pass # Check that admin connections are allowed to backup. @@ -284,6 +285,24 @@ class ShortTests(BrokerTest): sender.stop() receiver.stop() + def test_backup_failover(self): + # FIXME aconway 2012-01-30: UNFINISHED + brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) + for name in ["a","b","c"] ] + url = ",".join([b.host_port() for b in brokers]) + for b in brokers: b.set_broker_url(url) + brokers[0].promote() + brokers[0].connect().session().sender( + "q;{create:always,%s}"%(self.qpid_replicate())).send("a") + for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"]) + # FIXME aconway 2012-01-30: failing - not using set URL? + brokers[0].kill() + brokers[2].promote() # c must fail over to b. + brokers[2].connect().session().sender("q").send("b") + self.assert_browse_backup(brokers[1], "q", ["a","b"]) + # FIXME aconway 2012-01-30: finish + for b in brokers[1:]: b.kill() + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) diff --git a/qpid/cpp/src/tests/reliable_replication_test b/qpid/cpp/src/tests/reliable_replication_test index 273e482da0..1f1dac5f2d 100755 --- a/qpid/cpp/src/tests/reliable_replication_test +++ b/qpid/cpp/src/tests/reliable_replication_test @@ -66,7 +66,6 @@ receive() { bounce_link() { $PYTHON_COMMANDS/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A" -# sleep 2 $PYTHON_COMMANDS/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication } -- cgit v1.2.1