diff options
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionState.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 48 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Link.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 39 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RetryList.cpp | 60 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RetryList.h | 53 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/tests/RetryList.cpp | 106 |
13 files changed, 337 insertions, 10 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 1f1f1c4a9c..bec5373157 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -408,6 +408,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveryManagerImpl.cpp \ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ + qpid/broker/RetryList.cpp \ qpid/broker/SaslAuthenticator.cpp \ qpid/broker/SecureConnection.cpp \ qpid/broker/SecureConnectionFactory.cpp \ @@ -561,6 +562,7 @@ nobase_include_HEADERS = \ qpid/broker/RecoveredEnqueue.h \ qpid/broker/RecoveryManager.h \ qpid/broker/RecoveryManagerImpl.h \ + qpid/broker/RetryList.h \ qpid/broker/SaslAuthenticator.h \ qpid/broker/SecureConnection.h \ qpid/broker/SecureConnectionFactory.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 184dc06964..663a110a78 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -118,7 +118,8 @@ Broker::Options::Options(const std::string& name) : ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication") ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)") ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") - ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted"); + ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") + ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)"); } const std::string empty; @@ -127,6 +128,7 @@ const std::string amq_topic("amq.topic"); const std::string amq_fanout("amq.fanout"); const std::string amq_match("amq.match"); const std::string qpid_management("qpid.management"); +const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), @@ -248,9 +250,13 @@ Broker::Broker(const Broker::Options& conf) : } //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)): - boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT); - if (factory) { - knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) ); + if (conf.knownHosts.empty()) { + boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT); + if (factory) { + knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) ); + } + } else if (conf.knownHosts != knownHostsNone) { + knownBrokers.push_back(Url(conf.knownHosts)); } } diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 247493d41c..d97737c707 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -104,6 +104,7 @@ class Broker : public sys::Runnable, public Plugin::Target, uint queueLimit; bool tcpNoDelay; bool requireEncrypted; + std::string knownHosts; }; private: diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 38e667dcba..86123d346f 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -236,8 +236,12 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax, server.open("/", Array(), true); } -void ConnectionHandler::Handler::openOk(const framing::Array& /*knownHosts*/) +void ConnectionHandler::Handler::openOk(const framing::Array& knownHosts) { + for (Array::ValueVector::const_iterator i = knownHosts.begin(); i != knownHosts.end(); ++i) { + Url url((*i)->get<std::string>()); + connection.getKnownHosts().push_back(url); + } } void ConnectionHandler::Handler::redirect(const string& /*host*/, const framing::Array& /*knownHosts*/) diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 53591dc40a..c4c75833bf 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -27,6 +27,7 @@ #include "qpid/sys/ConnectionOutputHandlerPtr.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/management/Manageable.h" +#include "qpid/Url.h" #include "Broker.h" namespace qpid { @@ -73,6 +74,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable bool isFederationLink() const { return federationLink; } void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); } const string& getFederationPeerTag() const { return federationPeerTag; } + std::vector<Url>& getKnownHosts() { return knownHosts; } Broker& getBroker() { return broker; } @@ -97,6 +99,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable string url; bool federationLink; string federationPeerTag; + std::vector<Url> knownHosts; }; }} diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 92417608b7..2bd15759ef 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -63,6 +63,7 @@ Link::Link(LinkRegistry* _links, visitCount(0), currentInterval(1), closing(false), + updateUrls(false), channelCounter(1), connection(0), agent(0) @@ -116,6 +117,7 @@ void Link::startConnectionLH () setStateLH(STATE_CONNECTING); broker->connect (host, port, transport, boost::bind (&Link::closed, this, _1, _2)); + QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); } catch(std::exception& e) { setStateLH(STATE_WAITING); if (mgmtObject != 0) @@ -143,6 +145,7 @@ void Link::established () void Link::closed (int, std::string text) { Mutex::ScopedLock mutex(lock); + QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); connection = 0; @@ -251,28 +254,65 @@ void Link::setConnection(Connection* c) { Mutex::ScopedLock mutex(lock); connection = c; + updateUrls = true; } void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); + if (connection && updateUrls) { + urls.reset(connection->getKnownHosts()); + QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls); + updateUrls = false; + } + if (state == STATE_WAITING) { visitCount++; if (visitCount >= currentInterval) { visitCount = 0; - currentInterval *= 2; - if (currentInterval > MAX_INTERVAL) - currentInterval = MAX_INTERVAL; - startConnectionLH(); + //switch host and port to next in url list if possible + if (!tryFailover()) { + currentInterval *= 2; + if (currentInterval > MAX_INTERVAL) + currentInterval = MAX_INTERVAL; + startConnectionLH(); + } } } else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } +void Link::reconnect(const TcpAddress& a) +{ + Mutex::ScopedLock mutex(lock); + host = a.host; + port = a.port; + startConnectionLH(); + if (mgmtObject != 0) { + stringstream errorString; + errorString << "Failed over to " << a; + mgmtObject->set_lastError(errorString.str()); + } +} + +bool Link::tryFailover() +{ + //TODO: urls only work for TCP at present, update when that has changed + TcpAddress next; + if (transport == Broker::TCP_TRANSPORT && urls.next(next) && + (next.host != host || next.port != port)) { + links->changeAddress(TcpAddress(host, port), next); + QPID_LOG(debug, "Link failing over to " << host << ":" << port); + return true; + } else { + return false; + } +} + uint Link::nextChannel() { Mutex::ScopedLock mutex(lock); diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index a8dd528071..6fef694663 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -26,6 +26,7 @@ #include "MessageStore.h" #include "PersistableConfig.h" #include "Bridge.h" +#include "RetryList.h" #include "qpid/sys/Mutex.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" @@ -60,6 +61,8 @@ namespace qpid { uint32_t visitCount; uint32_t currentInterval; bool closing; + RetryList urls; + bool updateUrls; typedef std::vector<Bridge::shared_ptr> Bridges; Bridges created; // Bridges pending creation @@ -80,6 +83,7 @@ namespace qpid { void startConnectionLH(); // Start the IO Connection void destroy(); // Called when mgmt deletes this link void ioThreadProcessing(); // Called on connection's IO thread by request + bool tryFailover(); // Called during maintenance visit public: typedef boost::shared_ptr<Link> shared_ptr; @@ -108,6 +112,7 @@ namespace qpid { void established(); // Called when connection is created void closed(int, std::string); // Called when connection goes away void setConnection(Connection*); // Set pointer to the AMQP Connection + void reconnect(const TcpAddress&); //called by LinkRegistry string getAuthMechanism() { return authMechanism; } string getUsername() { return username; } diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 274e673bd3..f400f2066a 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -19,6 +19,7 @@ * */ #include "LinkRegistry.h" +#include "qpid/log/Statement.h" #include <iostream> using namespace qpid::broker; @@ -52,6 +53,38 @@ void LinkRegistry::periodicMaintenance () bridgesToDestroy.clear(); for (LinkMap::iterator i = links.begin(); i != links.end(); i++) i->second->maintenanceVisit(); + //now process any requests for re-addressing + for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++) + updateAddress(i->first, i->second); + reMappings.clear(); +} + +void LinkRegistry::changeAddress(const TcpAddress& oldAddress, const TcpAddress& newAddress) +{ + //done on periodic maintenance thread; hold changes in separate + //map to avoid modifying the link map that is iterated over + reMappings[createKey(oldAddress)] = newAddress; +} + +bool LinkRegistry::updateAddress(const std::string& oldKey, const TcpAddress& newAddress) +{ + std::string newKey = createKey(newAddress); + if (links.find(newKey) != links.end()) { + QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use"); + return false; + } else { + LinkMap::iterator i = links.find(oldKey); + if (i == links.end()) { + QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey); + return false; + } else { + links[newKey] = i->second; + i->second->reconnect(newAddress); + links.erase(oldKey); + QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey); + return true; + } + } } pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, @@ -252,3 +285,9 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key) } +std::string LinkRegistry::createKey(const TcpAddress& a) +{ + stringstream keystream; + keystream << a.host << ":" << a.port; + return string(keystream.str()); +} diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index d52b7a7394..6e228c0e2c 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -27,6 +27,7 @@ #include "Bridge.h" #include "MessageStore.h" #include "Timer.h" +#include "qpid/Address.h" #include "qpid/sys/Mutex.h" #include "qpid/management/Manageable.h" @@ -50,11 +51,13 @@ namespace broker { typedef std::map<std::string, Link::shared_ptr> LinkMap; typedef std::map<std::string, Bridge::shared_ptr> BridgeMap; + typedef std::map<std::string, TcpAddress> AddressMap; LinkMap links; LinkMap linksToDestroy; BridgeMap bridges; BridgeMap bridgesToDestroy; + AddressMap reMappings; qpid::sys::Mutex lock; Broker* broker; @@ -63,6 +66,8 @@ namespace broker { MessageStore* store; void periodicMaintenance (); + bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress); + static std::string createKey(const TcpAddress& address); public: LinkRegistry (Broker* _broker); @@ -116,6 +121,8 @@ namespace broker { std::string getAuthMechanism (const std::string& key); std::string getAuthCredentials (const std::string& key); std::string getAuthIdentity (const std::string& key); + + void changeAddress(const TcpAddress& oldAddress, const TcpAddress& newAddress); }; } } diff --git a/cpp/src/qpid/broker/RetryList.cpp b/cpp/src/qpid/broker/RetryList.cpp new file mode 100644 index 0000000000..e3a077bd95 --- /dev/null +++ b/cpp/src/qpid/broker/RetryList.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "RetryList.h" + +namespace qpid { +namespace broker { + +RetryList::RetryList() : urlIndex(0), addressIndex(0) {} + +void RetryList::reset(const std::vector<Url>& u) +{ + urls = u; + urlIndex = addressIndex = 0;//reset indices +} + +bool RetryList::next(TcpAddress& address) +{ + while (urlIndex < urls.size()) { + while (addressIndex < urls[urlIndex].size()) { + const TcpAddress* tcp = urls[urlIndex][addressIndex++].get<TcpAddress>(); + if (tcp) { + address = *tcp; + return true; + } + } + urlIndex++; + addressIndex = 0; + } + + urlIndex = addressIndex = 0;//reset indices + return false; +} + +std::ostream& operator<<(std::ostream& os, const RetryList& l) +{ + for (size_t i = 0; i < l.urls.size(); i++) { + os << l.urls[i] << " "; + } + return os; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/RetryList.h b/cpp/src/qpid/broker/RetryList.h new file mode 100644 index 0000000000..013233ef00 --- /dev/null +++ b/cpp/src/qpid/broker/RetryList.h @@ -0,0 +1,53 @@ +#ifndef QPID_BROKER_RETRYLIST_H +#define QPID_BROKER_RETRYLIST_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Address.h" +#include "qpid/Url.h" + +namespace qpid { +namespace broker { + +/** + * Simple utility for managing a list of urls to try on reconnecting a + * link. Currently only supports TCP urls. + */ +class RetryList +{ + public: + RetryList(); + void reset(const std::vector<Url>& urls); + bool next(TcpAddress& address); + private: + std::vector<Url> urls; + size_t urlIndex; + size_t addressIndex; + + friend std::ostream& operator<<(std::ostream& os, const RetryList& l); +}; + +std::ostream& operator<<(std::ostream& os, const RetryList& l); + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_RETRYLIST_H*/ diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index ff2ee060c2..6392322f9a 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -90,7 +90,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ MessageReplayTracker.cpp \ ConsoleTest.cpp \ QueueEvents.cpp \ - ProxyTest.cpp + ProxyTest.cpp \ + RetryList.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp diff --git a/cpp/src/tests/RetryList.cpp b/cpp/src/tests/RetryList.cpp new file mode 100644 index 0000000000..80f59bf15f --- /dev/null +++ b/cpp/src/tests/RetryList.cpp @@ -0,0 +1,106 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#include "unit_test.h" +#include "test_tools.h" +#include "qpid/broker/RetryList.h" + +using namespace qpid; +using namespace qpid::broker; + +QPID_AUTO_TEST_SUITE(RetryListTestSuite) + +struct RetryListFixture +{ + RetryList list; + std::vector<Url> urls; + std::vector<TcpAddress> expected; + + void addUrl(const std::string& s) + { + urls.push_back(Url(s)); + } + + void addExpectation(const std::string& host, uint16_t port) + { + expected.push_back(TcpAddress(host, port)); + } + + void check() + { + list.reset(urls); + for (int t = 0; t < 2; t++) { + TcpAddress next; + for (std::vector<TcpAddress>::const_iterator i = expected.begin(); i != expected.end(); ++i) { + BOOST_CHECK(list.next(next)); + BOOST_CHECK_EQUAL(i->host, next.host); + BOOST_CHECK_EQUAL(i->port, next.port); + } + BOOST_CHECK(!list.next(next)); + } + } +}; + +QPID_AUTO_TEST_CASE(testWithSingleAddress) +{ + RetryListFixture test; + test.addUrl("amqp:host:5673"); + test.addExpectation("host", 5673); + test.check(); +} + +QPID_AUTO_TEST_CASE(testWithSingleUrlOfMultipleAddresses) +{ + RetryListFixture test; + test.addUrl("amqp:host1,host2:2222,tcp:host3:5673,host4:1"); + + test.addExpectation("host1", 5672); + test.addExpectation("host2", 2222); + test.addExpectation("host3", 5673); + test.addExpectation("host4", 1); + + test.check(); +} + +QPID_AUTO_TEST_CASE(testWithMultipleUrlsOfMultipleAddresses) +{ + RetryListFixture test; + test.addUrl("amqp:my-host"); + test.addUrl("amqp:host1:6666,host2:2222,tcp:host3:5673,host4:1"); + test.addUrl("amqp:host5,host6:2222,tcp:host7:5673"); + + test.addExpectation("my-host", 5672); + test.addExpectation("host1", 6666); + test.addExpectation("host2", 2222); + test.addExpectation("host3", 5673); + test.addExpectation("host4", 1); + test.addExpectation("host5", 5672); + test.addExpectation("host6", 2222); + test.addExpectation("host7", 5673); + + test.check(); +} + +QPID_AUTO_TEST_CASE(testEmptyList) +{ + RetryListFixture test; + test.check(); +} + +QPID_AUTO_TEST_SUITE_END() |