summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-13 16:17:52 +0000
committerAlan Conway <aconway@apache.org>2012-02-13 16:17:52 +0000
commitd1e2f3cdba108b57a5305607eb4336fbfd1f2a06 (patch)
tree78ed15ac8fbcb509029266a9e2296ce2fc4c41a8
parent7ff74907cc026e96462da725de537629511a5011 (diff)
downloadqpid-python-d1e2f3cdba108b57a5305607eb4336fbfd1f2a06.tar.gz
QPID-3603: Simplified Link failover.
- Moved timer from LinkRegistry to Link. - Got rid of remapping code, simplified failover. - Faster interval for maintenance intervals. - Test for simple HA broker failover. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1243577 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp77
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h225
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp75
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h29
-rw-r--r--qpid/cpp/src/qpid/broker/RetryList.h3
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp23
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp22
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h1
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py33
-rwxr-xr-xqpid/cpp/src/tests/reliable_replication_test1
11 files changed, 239 insertions, 252 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 2c053220a4..cd5b89e1ad 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/sys/Timer.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
#include "boost/bind.hpp"
@@ -31,19 +32,35 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
-using namespace qpid::broker;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::framing::UnauthorizedAccessException;
-using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-using qpid::sys::Mutex;
+namespace qpid {
+namespace broker {
+
+using framing::Buffer;
+using framing::FieldTable;
+using framing::UnauthorizedAccessException;
+using framing::connection::CLOSE_CODE_CONNECTION_FORCED;
+using management::ManagementAgent;
+using management::ManagementObject;
+using management::Manageable;
+using management::Args;
+using sys::Mutex;
using std::stringstream;
using std::string;
-namespace _qmf = qmf::org::apache::qpid::broker;
+namespace _qmf = ::qmf::org::apache::qpid::broker;
+
+struct LinkTimerTask : public sys::TimerTask {
+ LinkTimerTask(Link& l, sys::Timer& t)
+ : TimerTask(/*FIXME*/100*sys::TIME_MSEC, "Link retry timer"), link(l), timer(t) {}
+
+ void fire() {
+ link.maintenanceVisit(); // FIXME aconway 2012-01-31:
+ setupNextFire();
+ timer.add(this);
+ }
+
+ Link& link;
+ sys::Timer& timer;
+};
Link::Link(LinkRegistry* _links,
MessageStore* _store,
@@ -67,7 +84,8 @@ Link::Link(LinkRegistry* _links,
reconnectNext(0), // Index of next address for reconnecting in url.
channelCounter(1),
connection(0),
- agent(0)
+ agent(0),
+ timerTask(new LinkTimerTask(*this, broker->getTimer()))
{
if (parent != 0 && broker != 0)
{
@@ -80,10 +98,12 @@ Link::Link(LinkRegistry* _links,
}
setStateLH(STATE_WAITING);
startConnectionLH();
+ broker->getTimer().add(timerTask);
}
Link::~Link ()
{
+ timerTask->cancel();
if (state == STATE_OPERATIONAL && connection != 0)
connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
@@ -121,7 +141,9 @@ void Link::startConnectionLH ()
broker->connect (host, boost::lexical_cast<std::string>(port), transport,
boost::bind (&Link::closed, this, _1, _2));
QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
- } catch(std::exception& e) {
+ } catch(const std::exception& e) {
+ QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: "
+ << e.what());
setStateLH(STATE_WAITING);
if (!hideManagement())
mgmtObject->set_lastError (e.what());
@@ -156,14 +178,16 @@ void Link::setUrl(const Url& u) {
void Link::opened() {
Mutex::ScopedLock mutex(lock);
assert(connection);
- // Get default URL from known-hosts.
- const std::vector<Url>& known = connection->getKnownHosts();
- // Flatten vector of URLs into a single URL listing all addresses.
- url.clear();
- for(size_t i = 0; i < known.size(); ++i)
- url.insert(url.end(), known[i].begin(), known[i].end());
- reconnectNext = 0;
- QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
+ // Get default URL from known-hosts if not already set
+ if (url.empty()) {
+ const std::vector<Url>& known = connection->getKnownHosts();
+ // Flatten vector of URLs into a single URL listing all addresses.
+ url.clear();
+ for(size_t i = 0; i < known.size(); ++i)
+ url.insert(url.end(), known[i].begin(), known[i].end());
+ reconnectNext = 0;
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
+ }
}
void Link::closed(int, std::string text)
@@ -176,7 +200,7 @@ void Link::closed(int, std::string text)
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
- QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
+ QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
@@ -333,7 +357,7 @@ void Link::maintenanceVisit ()
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
-void Link::reconnect(const qpid::Address& a)
+void Link::reconnect(const Address& a)
{
Mutex::ScopedLock mutex(lock);
host = a.host;
@@ -347,13 +371,14 @@ void Link::reconnect(const qpid::Address& a)
}
}
-bool Link::tryFailoverLH() { // FIXME aconway 2012-01-30: lock held?
+bool Link::tryFailoverLH() {
if (reconnectNext >= url.size()) reconnectNext = 0;
if (url.empty()) return false;
Address next = url[reconnectNext++];
if (next.host != host || next.port != port || next.protocol != transport) {
links->changeAddress(Address(transport, host, port), next);
- QPID_LOG(debug, "Link failing over to " << host << ":" << port);
+ QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
+ reconnect(next);
return true;
}
return false;
@@ -510,3 +535,5 @@ void Link::setPassive(bool passive)
}
}
}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 9c82eda000..9ee4837040 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -35,115 +35,122 @@
#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
- namespace broker {
-
- class LinkRegistry;
- class Broker;
- class Connection;
-
- class Link : public PersistableConfig, public management::Manageable {
- private:
- sys::Mutex lock;
- LinkRegistry* links;
- MessageStore* store;
- std::string host;
- uint16_t port;
- std::string transport;
- bool durable;
- std::string authMechanism;
- std::string username;
- std::string password;
- mutable uint64_t persistenceId;
- qmf::org::apache::qpid::broker::Link* mgmtObject;
- Broker* broker;
- int state;
- uint32_t visitCount;
- uint32_t currentInterval;
- bool closing;
- Url url; // URL can contain many addresses.
- size_t reconnectNext; // Index for next re-connect attempt
-
- typedef std::vector<Bridge::shared_ptr> Bridges;
- Bridges created; // Bridges pending creation
- Bridges active; // Bridges active
- Bridges cancellations; // Bridges pending cancellation
- uint channelCounter;
- Connection* connection;
- management::ManagementAgent* agent;
-
- static const int STATE_WAITING = 1;
- static const int STATE_CONNECTING = 2;
- static const int STATE_OPERATIONAL = 3;
- static const int STATE_FAILED = 4;
- static const int STATE_CLOSED = 5;
- static const int STATE_PASSIVE = 6;
-
- static const uint32_t MAX_INTERVAL = 32;
-
- void setStateLH (int newState);
- void startConnectionLH(); // Start the IO Connection
- void destroy(); // Called when mgmt deletes this link
- void ioThreadProcessing(); // Called on connection's IO thread by request
- bool tryFailoverLH(); // Called during maintenance visit
- bool hideManagement() const;
-
- public:
- typedef boost::shared_ptr<Link> shared_ptr;
-
- Link(LinkRegistry* links,
- MessageStore* store,
- const std::string& host,
- uint16_t port,
- const std::string& transport,
- bool durable,
- const std::string& authMechanism,
- const std::string& username,
- const std::string& password,
- Broker* broker,
- management::Manageable* parent = 0);
- virtual ~Link();
-
- std::string getHost() { return host; }
- uint16_t getPort() { return port; }
- std::string getTransport() { return transport; }
-
- bool isDurable() { return durable; }
- void maintenanceVisit ();
- uint nextChannel();
- void add(Bridge::shared_ptr);
- void cancel(Bridge::shared_ptr);
- void setUrl(const Url&); // Set URL for reconnection.
-
- void established(); // Called when connection is create
- void opened(); // Called when connection is open (after create)
- void closed(int, std::string); // Called when connection goes away
- void setConnection(Connection*); // Set pointer to the AMQP Connection
- void reconnect(const Address&); //called by LinkRegistry
- void close(); // Close the link from within the broker.
-
- std::string getAuthMechanism() { return authMechanism; }
- std::string getUsername() { return username; }
- std::string getPassword() { return password; }
- Broker* getBroker() { return broker; }
-
- void notifyConnectionForced(const std::string text);
- void setPassive(bool p);
-
- // PersistableConfig:
- void setPersistenceId(uint64_t id) const;
- uint64_t getPersistenceId() const { return persistenceId; }
- uint32_t encodedSize() const;
- void encode(framing::Buffer& buffer) const;
- const std::string& getName() const;
-
- static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
-
- // Manageable entry points
- management::ManagementObject* GetManagementObject(void) const;
- management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
-
- };
- }
+
+namespace sys {
+class TimerTask;
+}
+
+namespace broker {
+
+class LinkRegistry;
+class Broker;
+class Connection;
+
+class Link : public PersistableConfig, public management::Manageable {
+ private:
+ sys::Mutex lock;
+ LinkRegistry* links;
+ MessageStore* store;
+ std::string host;
+ uint16_t port;
+ std::string transport;
+ bool durable;
+ std::string authMechanism;
+ std::string username;
+ std::string password;
+ mutable uint64_t persistenceId;
+ qmf::org::apache::qpid::broker::Link* mgmtObject;
+ Broker* broker;
+ int state;
+ uint32_t visitCount;
+ uint32_t currentInterval;
+ bool closing;
+ Url url; // URL can contain many addresses.
+ size_t reconnectNext; // Index for next re-connect attempt
+
+ typedef std::vector<Bridge::shared_ptr> Bridges;
+ Bridges created; // Bridges pending creation
+ Bridges active; // Bridges active
+ Bridges cancellations; // Bridges pending cancellation
+ uint channelCounter;
+ Connection* connection;
+ management::ManagementAgent* agent;
+
+ boost::intrusive_ptr<sys::TimerTask> timerTask;
+
+ static const int STATE_WAITING = 1;
+ static const int STATE_CONNECTING = 2;
+ static const int STATE_OPERATIONAL = 3;
+ static const int STATE_FAILED = 4;
+ static const int STATE_CLOSED = 5;
+ static const int STATE_PASSIVE = 6;
+
+ static const uint32_t MAX_INTERVAL = 32;
+
+ void setStateLH (int newState);
+ void startConnectionLH(); // Start the IO Connection
+ void destroy(); // Called when mgmt deletes this link
+ void ioThreadProcessing(); // Called on connection's IO thread by request
+ bool tryFailoverLH(); // Called during maintenance visit
+ bool hideManagement() const;
+
+ public:
+ typedef boost::shared_ptr<Link> shared_ptr;
+
+ Link(LinkRegistry* links,
+ MessageStore* store,
+ const std::string& host,
+ uint16_t port,
+ const std::string& transport,
+ bool durable,
+ const std::string& authMechanism,
+ const std::string& username,
+ const std::string& password,
+ Broker* broker,
+ management::Manageable* parent = 0);
+ virtual ~Link();
+
+ std::string getHost() { return host; }
+ uint16_t getPort() { return port; }
+ std::string getTransport() { return transport; }
+
+ bool isDurable() { return durable; }
+ void maintenanceVisit ();
+ uint nextChannel();
+ void add(Bridge::shared_ptr);
+ void cancel(Bridge::shared_ptr);
+ void setUrl(const Url&); // Set URL for reconnection.
+
+ void established(); // Called when connection is create
+ void opened(); // Called when connection is open (after create)
+ void closed(int, std::string); // Called when connection goes away
+ void setConnection(Connection*); // Set pointer to the AMQP Connection
+ void reconnect(const Address&); //called by LinkRegistry
+ void close(); // Close the link from within the broker.
+
+ std::string getAuthMechanism() { return authMechanism; }
+ std::string getUsername() { return username; }
+ std::string getPassword() { return password; }
+ Broker* getBroker() { return broker; }
+
+ void notifyConnectionForced(const std::string text);
+ void setPassive(bool p);
+
+ // PersistableConfig:
+ void setPersistenceId(uint64_t id) const;
+ uint64_t getPersistenceId() const { return persistenceId; }
+ uint32_t encodedSize() const;
+ void encode(framing::Buffer& buffer) const;
+ const std::string& getName() const;
+
+ static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+
+ // Manageable entry points
+ management::ManagementObject* GetManagementObject(void) const;
+ management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
+
+};
+}
}
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index bb21ca01ab..ef6bf95b0b 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -35,15 +35,12 @@ using boost::format;
using boost::str;
namespace _qmf = qmf::org::apache::qpid::broker;
-#define LINK_MAINT_INTERVAL 2
-
// TODO: This constructor is only used by the store unit tests -
// That probably indicates that LinkRegistry isn't correctly
-// factored: The persistence element and maintenance element
-// should be factored separately
+// factored: The persistence element should be factored separately
LinkRegistry::LinkRegistry () :
- broker(0), timer(0),
- parent(0), store(0), passive(false), passiveChanged(false),
+ broker(0),
+ parent(0), store(0), passive(false),
realm("")
{
}
@@ -60,79 +57,32 @@ struct ConnectionObserverImpl : public ConnectionObserver {
}
LinkRegistry::LinkRegistry (Broker* _broker) :
- broker(_broker), timer(&broker->getTimer()),
- maintenanceTask(new Periodic(*this)),
- parent(0), store(0), passive(false), passiveChanged(false),
+ broker(_broker),
+ parent(0), store(0), passive(false),
realm(broker->getOptions().realm)
{
- timer->add(maintenanceTask);
broker->getConnectionObservers().add(
boost::shared_ptr<ConnectionObserver>(new ConnectionObserverImpl(*this)));
}
-LinkRegistry::~LinkRegistry()
-{
- // This test is only necessary if the default constructor above is present
- if (maintenanceTask)
- maintenanceTask->cancel();
-}
-
-LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
- TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC),"LinkRegistry"), links(_links) {}
-
-void LinkRegistry::Periodic::fire ()
-{
- links.periodicMaintenance ();
- setupNextFire();
- links.timer->add(this);
-}
-
-void LinkRegistry::periodicMaintenance ()
-{
- Mutex::ScopedLock locker(lock);
+LinkRegistry::~LinkRegistry() {}
- linksToDestroy.clear();
- bridgesToDestroy.clear();
- if (passiveChanged) {
- if (passive) { QPID_LOG(info, "Passivating links"); }
- else { QPID_LOG(info, "Activating links"); }
- for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
- i->second->setPassive(passive);
- }
- passiveChanged = false;
- }
- for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
- i->second->maintenanceVisit();
- //now process any requests for re-addressing
- for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++)
- updateAddress(i->first, i->second);
- reMappings.clear();
-}
void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
{
- //done on periodic maintenance thread; hold changes in separate
- //map to avoid modifying the link map that is iterated over
- reMappings[createKey(oldAddress)] = newAddress;
-}
-
-bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& newAddress)
-{
+ Mutex::ScopedLock locker(lock);
+ std::string oldKey = createKey(oldAddress);
std::string newKey = createKey(newAddress);
if (links.find(newKey) != links.end()) {
QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
- return false;
} else {
LinkMap::iterator i = links.find(oldKey);
if (i == links.end()) {
QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
- return false;
} else {
links[newKey] = i->second;
- i->second->reconnect(newAddress);
links.erase(oldKey);
QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
- return true;
}
}
}
@@ -230,7 +180,6 @@ void LinkRegistry::destroy(const string& host, const uint16_t port)
{
if (i->second->isDurable() && store)
store->destroy(*(i->second));
- linksToDestroy[key] = i->second;
links.erase(i);
}
}
@@ -258,7 +207,6 @@ void LinkRegistry::destroy(const std::string& host,
l->second->cancel(b->second);
if (b->second->isDurable())
store->destroy(*(b->second));
- bridgesToDestroy[bridgeKey] = b->second;
bridges.erase(b);
}
@@ -406,9 +354,12 @@ std::string LinkRegistry::createKey(const std::string& host, uint16_t port) {
void LinkRegistry::setPassive(bool p)
{
Mutex::ScopedLock locker(lock);
- passiveChanged = p != passive;
passive = p;
- //will activate or passivate links on maintenance visit
+ if (passive) { QPID_LOG(info, "Passivating links"); }
+ else { QPID_LOG(info, "Activating links"); }
+ for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
+ i->second->setPassive(passive);
+ }
}
void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index e66fca7d6b..ef4871192f 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,7 +27,6 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/Address.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -40,40 +39,19 @@ namespace broker {
class Broker;
class Connection;
class LinkRegistry {
-
- // Declare a timer task to manage the establishment of link connections and the
- // re-establishment of lost link connections.
- struct Periodic : public sys::TimerTask
- {
- LinkRegistry& links;
-
- Periodic(LinkRegistry& links);
- virtual ~Periodic() {};
- void fire();
- };
-
typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
- typedef std::map<std::string, Address> AddressMap;
LinkMap links;
- LinkMap linksToDestroy;
BridgeMap bridges;
- BridgeMap bridgesToDestroy;
- AddressMap reMappings;
qpid::sys::Mutex lock;
Broker* broker;
- sys::Timer* timer;
- boost::intrusive_ptr<qpid::sys::TimerTask> maintenanceTask;
management::Manageable* parent;
MessageStore* store;
bool passive;
- bool passiveChanged;
std::string realm;
- void periodicMaintenance ();
- bool updateAddress(const std::string& oldKey, const Address& newAddress);
boost::shared_ptr<Link> findLink(const std::string& key);
static std::string createKey(const Address& address);
static std::string createKey(const std::string& host, uint16_t port);
@@ -147,6 +125,7 @@ namespace broker {
* Called by links failing over to new address
*/
void changeAddress(const Address& oldAddress, const Address& newAddress);
+
/**
* Called to alter passive state. In passive state the links
* and bridges managed by a link registry will be recorded and
@@ -155,7 +134,7 @@ namespace broker {
*/
void setPassive(bool);
-
+
/** Iterate over each link in the registry. Used for cluster updates. */
void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
/** Iterate over each bridge in the registry. Used for cluster updates. */
diff --git a/qpid/cpp/src/qpid/broker/RetryList.h b/qpid/cpp/src/qpid/broker/RetryList.h
index 242a7d2122..9c4b779bcb 100644
--- a/qpid/cpp/src/qpid/broker/RetryList.h
+++ b/qpid/cpp/src/qpid/broker/RetryList.h
@@ -23,7 +23,6 @@
*/
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/Address.h"
#include "qpid/Url.h"
namespace qpid {
@@ -36,7 +35,7 @@ namespace broker {
class RetryList
{
public:
- QPID_BROKER_EXTERN RetryList();
+ QPID_BROKER_EXTERN RetryList();
QPID_BROKER_EXTERN void reset(const std::vector<Url>& urls);
QPID_BROKER_EXTERN bool next(Address& address);
private:
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 1ab24057ce..32f87dc722 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -46,32 +46,39 @@ using std::string;
Backup::Backup(broker::Broker& b, const Settings& s) :
broker(b), settings(s), excluder(new ConnectionExcluder())
{
- Url url(s.brokerUrl);
- string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
+}
+void Backup::initialize(const Url& url) {
+ QPID_LOG(notice, "Ha: Backup started: " << url);
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
url[0].host, url[0].port, protocol,
false, // durable
- s.mechanism, s.username, s.password);
+ settings.mechanism, settings.username, settings.password);
assert(result.second); // FIXME aconway 2011-11-23: error handling
link = result.first;
- link->setUrl(Url(s.brokerUrl));
+ link->setUrl(url);
replicator.reset(new BrokerReplicator(link));
broker.getExchanges().registerExchange(replicator);
-
broker.getConnectionObservers().add(excluder);
}
void Backup::setUrl(const Url& url) {
sys::Mutex::ScopedLock l(lock);
- link->setUrl(url);
+ if (!replicator.get())
+ initialize(url);
+ else {
+ QPID_LOG(info, "HA: Backup URL set to " << url);
+ link->setUrl(url);
+ }
}
Backup::~Backup() {
- link->close();
- broker.getExchanges().destroy(replicator->getName());
+ if (link) link->close();
+ if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
broker.getConnectionObservers().remove(excluder); // Allows client connections.
}
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 3a9e739c98..3ebb6d018d 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -52,6 +52,8 @@ class Backup
void setUrl(const Url&);
private:
+ void initialize(const Url&);
+
sys::Mutex lock;
broker::Broker& broker;
Settings settings;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index ad97f87a62..c7692ac9a2 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -41,7 +41,9 @@ using namespace std;
namespace {
Url url(const std::string& s, const std::string& id) {
try {
- return Url(s);
+ // Allow the URL to be empty, used in tests that set the URL
+ // after starting broker
+ return s.empty() ? Url() : Url(s);
} catch (const std::exception& e) {
throw Exception(Msg() << "Invalid URL for " << id << ": '" << s << "'");
}
@@ -55,15 +57,9 @@ const std::string BACKUP="backup";
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: broker(b),
settings(s),
- brokerUrl(url(s.brokerUrl, "ha-broker-url")),
- clientUrl(s.clientUrl.empty() ? brokerUrl : url(s.clientUrl, "ha-client-url")),
backup(new Backup(b, s)),
mgmtObject(0)
{
- // Note all HA brokers start out in backup mode.
- QPID_LOG(notice, "HA: Backup initialized: "
- << " broker-url=" << brokerUrl
- << " client-url=" << clientUrl);
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
boost::shared_ptr<ReplicatingSubscription::Factory>(
@@ -96,15 +92,17 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
break;
}
case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: {
- clientUrl = dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).i_clientAddresses;
- mgmtObject->set_clientAddresses(clientUrl.str());
+ string url = dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args)
+ .i_clientAddresses;
+ mgmtObject->set_clientAddresses(url);
// FIXME aconway 2012-01-30: upate status for new URL
break;
}
case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: {
- brokerUrl = dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args).i_brokerAddresses;
- mgmtObject->set_brokerAddresses(brokerUrl.str());
- if (backup.get()) backup->setUrl(brokerUrl);
+ string url = dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args)
+ .i_brokerAddresses;
+ mgmtObject->set_brokerAddresses(url);
+ if (backup.get()) backup->setUrl(Url(url));
break;
}
default:
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index affaa7486f..e3dc46946b 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -57,7 +57,6 @@ class HaBroker : public management::Manageable
sys::Mutex lock;
broker::Broker& broker;
Settings settings;
- Url brokerUrl, clientUrl;
std::auto_ptr<Backup> backup;
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
};
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e782b57f7f..ddcc19ee1e 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -30,11 +30,12 @@ log = getLogger("qpid.ha-tests")
class HaBroker(Broker):
def __init__(self, test, args=[], broker_url=None, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
- Broker.__init__(self, test,
- args=["--load-module", BrokerTest.ha_lib,
- "--ha-enable=yes",
- "--ha-broker-url", broker_url ],
- **kwargs)
+ args=["--load-module", BrokerTest.ha_lib,
+ "--log-enable=debug+:ha::", # FIXME aconway 2012-01-31:
+ "--log-enable=debug+:Link",
+ "--ha-enable=yes"]
+ if broker_url: args += [ "--ha-broker-url", broker_url ]
+ Broker.__init__(self, test, args, **kwargs)
def promote(self):
assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0
@@ -237,13 +238,13 @@ class ShortTests(BrokerTest):
def test_failover(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
- getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover
+ getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
# Check that backup rejects normal connections
try:
- backup.connect()
+ backup.connect().session()
self.fail("Expected connection to backup to fail")
except ConnectionError: pass
# Check that admin connections are allowed to backup.
@@ -284,6 +285,24 @@ class ShortTests(BrokerTest):
sender.stop()
receiver.stop()
+ def test_backup_failover(self):
+ # FIXME aconway 2012-01-30: UNFINISHED
+ brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
+ for name in ["a","b","c"] ]
+ url = ",".join([b.host_port() for b in brokers])
+ for b in brokers: b.set_broker_url(url)
+ brokers[0].promote()
+ brokers[0].connect().session().sender(
+ "q;{create:always,%s}"%(self.qpid_replicate())).send("a")
+ for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
+ # FIXME aconway 2012-01-30: failing - not using set URL?
+ brokers[0].kill()
+ brokers[2].promote() # c must fail over to b.
+ brokers[2].connect().session().sender("q").send("b")
+ self.assert_browse_backup(brokers[1], "q", ["a","b"])
+ # FIXME aconway 2012-01-30: finish
+ for b in brokers[1:]: b.kill()
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
diff --git a/qpid/cpp/src/tests/reliable_replication_test b/qpid/cpp/src/tests/reliable_replication_test
index 273e482da0..1f1dac5f2d 100755
--- a/qpid/cpp/src/tests/reliable_replication_test
+++ b/qpid/cpp/src/tests/reliable_replication_test
@@ -66,7 +66,6 @@ receive() {
bounce_link() {
$PYTHON_COMMANDS/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A"
-# sleep 2
$PYTHON_COMMANDS/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
}