summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-21 17:53:32 +0000
committerGordon Sim <gsim@apache.org>2009-01-21 17:53:32 +0000
commit8286d4dca724d72f98e9529d433765a0cebf6222 (patch)
treef15a9d4e78f76d6af2fb4c6a20a55c06d01a8915 /cpp
parent35e88ad9c8304d4f72bc712a57413c753906bc74 (diff)
downloadqpid-python-8286d4dca724d72f98e9529d433765a0cebf6222.tar.gz
QPID-1567: Added ability for federation links to failover to other specified known-hosts
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736354 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/Broker.cpp14
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp6
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h3
-rw-r--r--cpp/src/qpid/broker/Link.cpp48
-rw-r--r--cpp/src/qpid/broker/Link.h5
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp39
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h7
-rw-r--r--cpp/src/qpid/broker/RetryList.cpp60
-rw-r--r--cpp/src/qpid/broker/RetryList.h53
-rw-r--r--cpp/src/tests/Makefile.am3
-rw-r--r--cpp/src/tests/RetryList.cpp106
13 files changed, 337 insertions, 10 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 1f1f1c4a9c..bec5373157 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -408,6 +408,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveryManagerImpl.cpp \
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
+ qpid/broker/RetryList.cpp \
qpid/broker/SaslAuthenticator.cpp \
qpid/broker/SecureConnection.cpp \
qpid/broker/SecureConnectionFactory.cpp \
@@ -561,6 +562,7 @@ nobase_include_HEADERS = \
qpid/broker/RecoveredEnqueue.h \
qpid/broker/RecoveryManager.h \
qpid/broker/RecoveryManagerImpl.h \
+ qpid/broker/RetryList.h \
qpid/broker/SaslAuthenticator.h \
qpid/broker/SecureConnection.h \
qpid/broker/SecureConnectionFactory.h \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 184dc06964..663a110a78 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -118,7 +118,8 @@ Broker::Options::Options(const std::string& name) :
("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")
("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)")
("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections")
- ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted");
+ ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
+ ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)");
}
const std::string empty;
@@ -127,6 +128,7 @@ const std::string amq_topic("amq.topic");
const std::string amq_fanout("amq.fanout");
const std::string amq_match("amq.match");
const std::string qpid_management("qpid.management");
+const std::string knownHostsNone("none");
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
@@ -248,9 +250,13 @@ Broker::Broker(const Broker::Options& conf) :
}
//initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)):
- boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT);
- if (factory) {
- knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) );
+ if (conf.knownHosts.empty()) {
+ boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT);
+ if (factory) {
+ knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) );
+ }
+ } else if (conf.knownHosts != knownHostsNone) {
+ knownBrokers.push_back(Url(conf.knownHosts));
}
}
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 247493d41c..d97737c707 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -104,6 +104,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
uint queueLimit;
bool tcpNoDelay;
bool requireEncrypted;
+ std::string knownHosts;
};
private:
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 38e667dcba..86123d346f 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -236,8 +236,12 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax,
server.open("/", Array(), true);
}
-void ConnectionHandler::Handler::openOk(const framing::Array& /*knownHosts*/)
+void ConnectionHandler::Handler::openOk(const framing::Array& knownHosts)
{
+ for (Array::ValueVector::const_iterator i = knownHosts.begin(); i != knownHosts.end(); ++i) {
+ Url url((*i)->get<std::string>());
+ connection.getKnownHosts().push_back(url);
+ }
}
void ConnectionHandler::Handler::redirect(const string& /*host*/, const framing::Array& /*knownHosts*/)
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index 53591dc40a..c4c75833bf 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -27,6 +27,7 @@
#include "qpid/sys/ConnectionOutputHandlerPtr.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/management/Manageable.h"
+#include "qpid/Url.h"
#include "Broker.h"
namespace qpid {
@@ -73,6 +74,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
bool isFederationLink() const { return federationLink; }
void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); }
const string& getFederationPeerTag() const { return federationPeerTag; }
+ std::vector<Url>& getKnownHosts() { return knownHosts; }
Broker& getBroker() { return broker; }
@@ -97,6 +99,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
string url;
bool federationLink;
string federationPeerTag;
+ std::vector<Url> knownHosts;
};
}}
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 92417608b7..2bd15759ef 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -63,6 +63,7 @@ Link::Link(LinkRegistry* _links,
visitCount(0),
currentInterval(1),
closing(false),
+ updateUrls(false),
channelCounter(1),
connection(0),
agent(0)
@@ -116,6 +117,7 @@ void Link::startConnectionLH ()
setStateLH(STATE_CONNECTING);
broker->connect (host, port, transport,
boost::bind (&Link::closed, this, _1, _2));
+ QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
} catch(std::exception& e) {
setStateLH(STATE_WAITING);
if (mgmtObject != 0)
@@ -143,6 +145,7 @@ void Link::established ()
void Link::closed (int, std::string text)
{
Mutex::ScopedLock mutex(lock);
+ QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
connection = 0;
@@ -251,28 +254,65 @@ void Link::setConnection(Connection* c)
{
Mutex::ScopedLock mutex(lock);
connection = c;
+ updateUrls = true;
}
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
+ if (connection && updateUrls) {
+ urls.reset(connection->getKnownHosts());
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
+ updateUrls = false;
+ }
+
if (state == STATE_WAITING)
{
visitCount++;
if (visitCount >= currentInterval)
{
visitCount = 0;
- currentInterval *= 2;
- if (currentInterval > MAX_INTERVAL)
- currentInterval = MAX_INTERVAL;
- startConnectionLH();
+ //switch host and port to next in url list if possible
+ if (!tryFailover()) {
+ currentInterval *= 2;
+ if (currentInterval > MAX_INTERVAL)
+ currentInterval = MAX_INTERVAL;
+ startConnectionLH();
+ }
}
}
else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
+void Link::reconnect(const TcpAddress& a)
+{
+ Mutex::ScopedLock mutex(lock);
+ host = a.host;
+ port = a.port;
+ startConnectionLH();
+ if (mgmtObject != 0) {
+ stringstream errorString;
+ errorString << "Failed over to " << a;
+ mgmtObject->set_lastError(errorString.str());
+ }
+}
+
+bool Link::tryFailover()
+{
+ //TODO: urls only work for TCP at present, update when that has changed
+ TcpAddress next;
+ if (transport == Broker::TCP_TRANSPORT && urls.next(next) &&
+ (next.host != host || next.port != port)) {
+ links->changeAddress(TcpAddress(host, port), next);
+ QPID_LOG(debug, "Link failing over to " << host << ":" << port);
+ return true;
+ } else {
+ return false;
+ }
+}
+
uint Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index a8dd528071..6fef694663 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -26,6 +26,7 @@
#include "MessageStore.h"
#include "PersistableConfig.h"
#include "Bridge.h"
+#include "RetryList.h"
#include "qpid/sys/Mutex.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
@@ -60,6 +61,8 @@ namespace qpid {
uint32_t visitCount;
uint32_t currentInterval;
bool closing;
+ RetryList urls;
+ bool updateUrls;
typedef std::vector<Bridge::shared_ptr> Bridges;
Bridges created; // Bridges pending creation
@@ -80,6 +83,7 @@ namespace qpid {
void startConnectionLH(); // Start the IO Connection
void destroy(); // Called when mgmt deletes this link
void ioThreadProcessing(); // Called on connection's IO thread by request
+ bool tryFailover(); // Called during maintenance visit
public:
typedef boost::shared_ptr<Link> shared_ptr;
@@ -108,6 +112,7 @@ namespace qpid {
void established(); // Called when connection is created
void closed(int, std::string); // Called when connection goes away
void setConnection(Connection*); // Set pointer to the AMQP Connection
+ void reconnect(const TcpAddress&); //called by LinkRegistry
string getAuthMechanism() { return authMechanism; }
string getUsername() { return username; }
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 274e673bd3..f400f2066a 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -19,6 +19,7 @@
*
*/
#include "LinkRegistry.h"
+#include "qpid/log/Statement.h"
#include <iostream>
using namespace qpid::broker;
@@ -52,6 +53,38 @@ void LinkRegistry::periodicMaintenance ()
bridgesToDestroy.clear();
for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
i->second->maintenanceVisit();
+ //now process any requests for re-addressing
+ for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++)
+ updateAddress(i->first, i->second);
+ reMappings.clear();
+}
+
+void LinkRegistry::changeAddress(const TcpAddress& oldAddress, const TcpAddress& newAddress)
+{
+ //done on periodic maintenance thread; hold changes in separate
+ //map to avoid modifying the link map that is iterated over
+ reMappings[createKey(oldAddress)] = newAddress;
+}
+
+bool LinkRegistry::updateAddress(const std::string& oldKey, const TcpAddress& newAddress)
+{
+ std::string newKey = createKey(newAddress);
+ if (links.find(newKey) != links.end()) {
+ QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
+ return false;
+ } else {
+ LinkMap::iterator i = links.find(oldKey);
+ if (i == links.end()) {
+ QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
+ return false;
+ } else {
+ links[newKey] = i->second;
+ i->second->reconnect(newAddress);
+ links.erase(oldKey);
+ QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
+ return true;
+ }
+ }
}
pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
@@ -252,3 +285,9 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key)
}
+std::string LinkRegistry::createKey(const TcpAddress& a)
+{
+ stringstream keystream;
+ keystream << a.host << ":" << a.port;
+ return string(keystream.str());
+}
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index d52b7a7394..6e228c0e2c 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -27,6 +27,7 @@
#include "Bridge.h"
#include "MessageStore.h"
#include "Timer.h"
+#include "qpid/Address.h"
#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
@@ -50,11 +51,13 @@ namespace broker {
typedef std::map<std::string, Link::shared_ptr> LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
+ typedef std::map<std::string, TcpAddress> AddressMap;
LinkMap links;
LinkMap linksToDestroy;
BridgeMap bridges;
BridgeMap bridgesToDestroy;
+ AddressMap reMappings;
qpid::sys::Mutex lock;
Broker* broker;
@@ -63,6 +66,8 @@ namespace broker {
MessageStore* store;
void periodicMaintenance ();
+ bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress);
+ static std::string createKey(const TcpAddress& address);
public:
LinkRegistry (Broker* _broker);
@@ -116,6 +121,8 @@ namespace broker {
std::string getAuthMechanism (const std::string& key);
std::string getAuthCredentials (const std::string& key);
std::string getAuthIdentity (const std::string& key);
+
+ void changeAddress(const TcpAddress& oldAddress, const TcpAddress& newAddress);
};
}
}
diff --git a/cpp/src/qpid/broker/RetryList.cpp b/cpp/src/qpid/broker/RetryList.cpp
new file mode 100644
index 0000000000..e3a077bd95
--- /dev/null
+++ b/cpp/src/qpid/broker/RetryList.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RetryList.h"
+
+namespace qpid {
+namespace broker {
+
+RetryList::RetryList() : urlIndex(0), addressIndex(0) {}
+
+void RetryList::reset(const std::vector<Url>& u)
+{
+ urls = u;
+ urlIndex = addressIndex = 0;//reset indices
+}
+
+bool RetryList::next(TcpAddress& address)
+{
+ while (urlIndex < urls.size()) {
+ while (addressIndex < urls[urlIndex].size()) {
+ const TcpAddress* tcp = urls[urlIndex][addressIndex++].get<TcpAddress>();
+ if (tcp) {
+ address = *tcp;
+ return true;
+ }
+ }
+ urlIndex++;
+ addressIndex = 0;
+ }
+
+ urlIndex = addressIndex = 0;//reset indices
+ return false;
+}
+
+std::ostream& operator<<(std::ostream& os, const RetryList& l)
+{
+ for (size_t i = 0; i < l.urls.size(); i++) {
+ os << l.urls[i] << " ";
+ }
+ return os;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/RetryList.h b/cpp/src/qpid/broker/RetryList.h
new file mode 100644
index 0000000000..013233ef00
--- /dev/null
+++ b/cpp/src/qpid/broker/RetryList.h
@@ -0,0 +1,53 @@
+#ifndef QPID_BROKER_RETRYLIST_H
+#define QPID_BROKER_RETRYLIST_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Address.h"
+#include "qpid/Url.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Simple utility for managing a list of urls to try on reconnecting a
+ * link. Currently only supports TCP urls.
+ */
+class RetryList
+{
+ public:
+ RetryList();
+ void reset(const std::vector<Url>& urls);
+ bool next(TcpAddress& address);
+ private:
+ std::vector<Url> urls;
+ size_t urlIndex;
+ size_t addressIndex;
+
+ friend std::ostream& operator<<(std::ostream& os, const RetryList& l);
+};
+
+std::ostream& operator<<(std::ostream& os, const RetryList& l);
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_RETRYLIST_H*/
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index ff2ee060c2..6392322f9a 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -90,7 +90,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
MessageReplayTracker.cpp \
ConsoleTest.cpp \
QueueEvents.cpp \
- ProxyTest.cpp
+ ProxyTest.cpp \
+ RetryList.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
diff --git a/cpp/src/tests/RetryList.cpp b/cpp/src/tests/RetryList.cpp
new file mode 100644
index 0000000000..80f59bf15f
--- /dev/null
+++ b/cpp/src/tests/RetryList.cpp
@@ -0,0 +1,106 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/broker/RetryList.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+
+QPID_AUTO_TEST_SUITE(RetryListTestSuite)
+
+struct RetryListFixture
+{
+ RetryList list;
+ std::vector<Url> urls;
+ std::vector<TcpAddress> expected;
+
+ void addUrl(const std::string& s)
+ {
+ urls.push_back(Url(s));
+ }
+
+ void addExpectation(const std::string& host, uint16_t port)
+ {
+ expected.push_back(TcpAddress(host, port));
+ }
+
+ void check()
+ {
+ list.reset(urls);
+ for (int t = 0; t < 2; t++) {
+ TcpAddress next;
+ for (std::vector<TcpAddress>::const_iterator i = expected.begin(); i != expected.end(); ++i) {
+ BOOST_CHECK(list.next(next));
+ BOOST_CHECK_EQUAL(i->host, next.host);
+ BOOST_CHECK_EQUAL(i->port, next.port);
+ }
+ BOOST_CHECK(!list.next(next));
+ }
+ }
+};
+
+QPID_AUTO_TEST_CASE(testWithSingleAddress)
+{
+ RetryListFixture test;
+ test.addUrl("amqp:host:5673");
+ test.addExpectation("host", 5673);
+ test.check();
+}
+
+QPID_AUTO_TEST_CASE(testWithSingleUrlOfMultipleAddresses)
+{
+ RetryListFixture test;
+ test.addUrl("amqp:host1,host2:2222,tcp:host3:5673,host4:1");
+
+ test.addExpectation("host1", 5672);
+ test.addExpectation("host2", 2222);
+ test.addExpectation("host3", 5673);
+ test.addExpectation("host4", 1);
+
+ test.check();
+}
+
+QPID_AUTO_TEST_CASE(testWithMultipleUrlsOfMultipleAddresses)
+{
+ RetryListFixture test;
+ test.addUrl("amqp:my-host");
+ test.addUrl("amqp:host1:6666,host2:2222,tcp:host3:5673,host4:1");
+ test.addUrl("amqp:host5,host6:2222,tcp:host7:5673");
+
+ test.addExpectation("my-host", 5672);
+ test.addExpectation("host1", 6666);
+ test.addExpectation("host2", 2222);
+ test.addExpectation("host3", 5673);
+ test.addExpectation("host4", 1);
+ test.addExpectation("host5", 5672);
+ test.addExpectation("host6", 2222);
+ test.addExpectation("host7", 5673);
+
+ test.check();
+}
+
+QPID_AUTO_TEST_CASE(testEmptyList)
+{
+ RetryListFixture test;
+ test.check();
+}
+
+QPID_AUTO_TEST_SUITE_END()