diff options
author | Alan Conway <aconway@apache.org> | 2008-02-01 16:03:02 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-02-01 16:03:02 +0000 |
commit | df599b1716535909317e61f4b43516d48373ad1c (patch) | |
tree | 7971d8abe970f9711cf5ba7d817cf57fc0b85a87 /cpp/src | |
parent | e5450586ffe0d33c92eed1b4c961e9b150f4663c (diff) | |
download | qpid-python-df599b1716535909317e61f4b43516d48373ad1c.tar.gz |
Added cluster URL configuration, defaults to all interfaces.
src/qpid/Plugin.h - added doxygen
src/qpid/Url.cpp,.h - cache string rep, op==, istream/ostream ops.
src/qpid/broker/Broker.h,.cpp - removed getUrl()
src/qpid/cluster/Cluster.h,.cpp - use Url class
src/qpid/cluster/ClusterPlugin.cpp - added --url configuration.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@617533 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/Plugin.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/Url.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/Url.h | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 40 | ||||
-rw-r--r-- | cpp/src/tests/Url.cpp | 2 |
9 files changed, 82 insertions, 44 deletions
diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h index 5aed844b43..e040662866 100644 --- a/cpp/src/qpid/Plugin.h +++ b/cpp/src/qpid/Plugin.h @@ -70,15 +70,17 @@ class Plugin : boost::noncopyable /** * Initialize Plugin functionality on a Target. - * * Plugins should ignore targets they don't recognize. + * + * Called before the target itself is initialized. */ virtual void earlyInitialize(Target&) = 0; /** * Initialize Plugin functionality on a Target. - * * Plugins should ignore targets they don't recognize. + * + * Called after the target is fully initialized. */ virtual void initialize(Target&) = 0; diff --git a/cpp/src/qpid/Url.cpp b/cpp/src/qpid/Url.cpp index 1e0e0b3603..d056edc683 100644 --- a/cpp/src/qpid/Url.cpp +++ b/cpp/src/qpid/Url.cpp @@ -36,10 +36,16 @@ using namespace std; namespace qpid { -Url Url::getHostnameUrl(uint16_t port) { +std::ostream& operator<<(std::ostream& os, const TcpAddress& a) { + return os << "tcp:" << a.host << ":" << a.port; +} + +std::istream& operator>>(std::istream&, const TcpAddress&); + +Url Url::getHostNameUrl(uint16_t port) { char name[HOST_NAME_MAX]; if (::gethostname(name, sizeof(name)) != 0) - throw Exception(QPID_MSG("Cannot get host name: " << strError(errno))); + throw InvalidUrl(QPID_MSG("Cannot get host name: " << strError(errno))); return Url(TcpAddress(name, port)); } @@ -66,9 +72,12 @@ Url Url::getIpAddressesUrl(uint16_t port) { } string Url::str() const { - ostringstream os; - os << *this; - return os.str(); + if (cache.empty() && !this->empty()) { + ostringstream os; + os << *this; + cache = os.str(); + } + return cache; } ostream& operator<<(ostream& os, const Url& url) { @@ -140,13 +149,22 @@ struct UrlGrammar : public grammar<UrlGrammar> }; void Url::parse(const char* url) { + cache.clear(); if (!boost::spirit::parse(url, UrlGrammar(*this)).full) throw InvalidUrl(string("Invalid AMQP url: ")+url); } void Url::parseNoThrow(const char* url) { + cache.clear(); if (!boost::spirit::parse(url, UrlGrammar(*this)).full) clear(); } +std::istream& operator>>(std::istream& is, Url& url) { + std::string s; + is >> s; + url.parse(s); + return is; +} + } // namespace qpid diff --git a/cpp/src/qpid/Url.h b/cpp/src/qpid/Url.h index 5cdd30399b..2e24ba948d 100644 --- a/cpp/src/qpid/Url.h +++ b/cpp/src/qpid/Url.h @@ -42,19 +42,16 @@ inline bool operator==(const TcpAddress& x, const TcpAddress& y) { return y.host==x.host && y.port == x.port; } -inline std::ostream& operator<<(std::ostream& os, const TcpAddress& a) { - return os << "tcp:" << a.host << ":" << a.port; -} +std::ostream& operator<<(std::ostream& os, const TcpAddress& a); -/** Address is a variant of all address types. */ +/** Address is a variant of all address types, more coming in future. */ typedef boost::variant<TcpAddress> Address; - /** An AMQP URL contains a list of addresses */ struct Url : public std::vector<Address> { /** Url with the hostname as returned by gethostname(2) */ - static Url getHostnameUrl(uint16_t port); + static Url getHostNameUrl(uint16_t port); /** Url with local IP address(es), may be more than one address * on a multi-homed host. */ @@ -79,20 +76,30 @@ struct Url : public std::vector<Address> { /** Parse url, throw InvalidUrl if invalid. */ explicit Url(const char* url) { parse(url); } + template<class T> Url& operator=(T s) { parse(s); return *this; } + /** Replace contents with parsed URL as defined in * https://wiki.108.redhat.com/jira/browse/AMQP-95 *@exception InvalidUrl if the url is invalid. */ void parse(const char* url); + void parse(const std::string& url) { parse(url.c_str()); } /** Replace contesnts with parsed URL as defined in * https://wiki.108.redhat.com/jira/browse/AMQP-95 * url.empty() will be true if url is invalid. */ void parseNoThrow(const char* url); + + private: + mutable std::string cache; // cache string form for efficiency. }; +inline bool operator==(const Url& a, const Url& b) { return a.str()==b.str(); } +inline bool operator!=(const Url& a, const Url& b) { return a.str()!=b.str(); } + std::ostream& operator<<(std::ostream& os, const Url& url); +std::istream& operator>>(std::istream& is, Url& url); } // namespace qpid diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 06a0d33a85..3ba07a180a 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -33,7 +33,6 @@ #include "qpid/management/ArgsBrokerEcho.h" #include "qpid/log/Statement.h" -#include "qpid/Url.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Acceptor.h" @@ -61,7 +60,7 @@ namespace broker { Broker::Options::Options(const std::string& name) : qpid::Options(name), - port(TcpAddress::DEFAULT_PORT), + port(DEFAULT_PORT), workerThreads(5), maxConnections(500), connectionBacklog(10), @@ -221,10 +220,6 @@ Broker::~Broker() { uint16_t Broker::getPort() const { return getAcceptor().getPort(); } -std::string Broker::getUrl() const { - return Url(TcpAddress(getAcceptor().getHost(), getPort())).str(); -} - Acceptor& Broker::getAcceptor() const { if (!acceptor) { const_cast<Acceptor::shared_ptr&>(acceptor) = diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 1917da83cc..fb4b9916da 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -36,7 +36,6 @@ #include "qpid/management/Broker.h" #include "qpid/Options.h" #include "qpid/Plugin.h" -#include "qpid/Url.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" @@ -48,6 +47,8 @@ namespace qpid { namespace broker { +static const uint16_t DEFAULT_PORT=5672; + /** * A broker instance. */ @@ -72,7 +73,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M Broker(const Options& configuration); static shared_ptr<Broker> create(const Options& configuration); - static shared_ptr<Broker> create(int16_t port = TcpAddress::DEFAULT_PORT); + static shared_ptr<Broker> create(int16_t port = DEFAULT_PORT); /** * Return listening port. If called before bind this is @@ -82,9 +83,6 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M */ virtual uint16_t getPort() const; - /** Return the broker's URL. */ - virtual std::string getUrl() const; - /** * Run the broker. Implements Runnable::run() so the broker * can be run in a separate thread. diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index ce87d23c0d..49270bcfef 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -47,7 +47,7 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker&) : +Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) : FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer cpg(*this), name(name_), @@ -87,7 +87,7 @@ void Cluster::handle(AMQFrame& frame) { } void Cluster::notify() { - AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url)); + AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())); handle(frame); } @@ -143,7 +143,7 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { { Mutex::ScopedLock l(lock); members[from].url=notifyIn->getUrl(); - if (!self.id && notifyIn->getUrl() == url) + if (!self.id && notifyIn->getUrl() == url.str()) self=from; lock.notifyAll(); QPID_LOG(trace, *this << ": members joined: " << members); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 5aca3faf44..e9809f2264 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -27,6 +27,8 @@ #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/log/Logger.h" +#include "qpid/Url.h" + #include <boost/optional.hpp> #include <boost/function.hpp> @@ -50,8 +52,8 @@ class Cluster : public framing::FrameHandler, public: /** Details of a cluster member */ struct Member { - Member(const std::string& url_=std::string()) : url(url_) {} - std::string url; ///< Broker address. + Member(const Url& url_=Url()) : url(url_) {} + Url url; ///< Broker address. }; typedef std::vector<Member> MemberList; @@ -61,7 +63,7 @@ class Cluster : public framing::FrameHandler, * @param name of the cluster. * @param url of this broker, sent to the cluster. */ - Cluster(const std::string& name, const std::string& url, broker::Broker&); + Cluster(const std::string& name, const Url& url, broker::Broker&); virtual ~Cluster(); @@ -115,7 +117,7 @@ class Cluster : public framing::FrameHandler, mutable sys::Monitor lock; Cpg cpg; Cpg::Name name; - std::string url; + Url url; Id self; MemberMap members; sys::Thread dispatcher; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index e24c60dc2f..e6b5f1a0bd 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -15,6 +15,10 @@ * limitations under the License. * */ +#include <boost/program_options/value_semantic.hpp> + + + #include "qpid/broker/Broker.h" #include "qpid/cluster/Cluster.h" #include "qpid/Plugin.h" @@ -24,21 +28,31 @@ #include <boost/optional.hpp> #include <boost/utility/in_place_factory.hpp> + namespace qpid { namespace cluster { using namespace std; -struct ClusterPlugin : public Plugin { +struct ClusterOptions : public Options { + string name; + string url; - struct ClusterOptions : public Options { - string clusterName; - ClusterOptions() : Options("Cluster Options") { - addOptions() - ("cluster", optValue(clusterName, "NAME"), - "Joins the cluster named NAME"); - } - }; + ClusterOptions() : Options("Cluster Options") { + addOptions() + ("cluster-name", optValue(name, "NAME"), "Name of cluster to join") + ("cluster-url", optValue(url,"URL"), + "URL of this broker, advertized to the cluster.\n" + "Defaults to a URL listing all the local IP addresses\n"); + } + + Url getUrl(uint16_t port) const { + if (url.empty()) return Url::getIpAddressesUrl(port); + return Url(url); + } +}; + +struct ClusterPlugin : public Plugin { ClusterOptions options; boost::optional<Cluster> cluster; @@ -50,10 +64,12 @@ struct ClusterPlugin : public Plugin { void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker, and only if the --cluster config is set. - if (broker && !options.clusterName.empty()) { + if (broker && !options.name.empty()) { assert(!cluster); // A process can only belong to one cluster. - cluster = boost::in_place(options.clusterName, broker->getUrl(), boost::ref(*broker)); - // broker->add(make_shared_ptr(&cluster->getHandlerUpdater(), nullDeleter)); + cluster = boost::in_place(options.name, + options.getUrl(broker->getPort()), + boost::ref(*broker)); + // FIXME aconway 2008-02-01: Add observer. } } }; diff --git a/cpp/src/tests/Url.cpp b/cpp/src/tests/Url.cpp index 5d376e3439..bc07737520 100644 --- a/cpp/src/tests/Url.cpp +++ b/cpp/src/tests/Url.cpp @@ -33,7 +33,7 @@ BOOST_AUTO_TEST_CASE(testUrl_str) { url.push_back(TcpAddress("foo.com")); url.push_back(TcpAddress("bar.com", 6789)); BOOST_CHECK_EQUAL("amqp:tcp:foo.com:5672,tcp:bar.com:6789", url.str()); - BOOST_CHECK_EQUAL("amqp:", Url().str()); + BOOST_CHECK(Url().str().empty()); } |