summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-02-01 16:03:02 +0000
committerAlan Conway <aconway@apache.org>2008-02-01 16:03:02 +0000
commitdf599b1716535909317e61f4b43516d48373ad1c (patch)
tree7971d8abe970f9711cf5ba7d817cf57fc0b85a87 /cpp/src
parente5450586ffe0d33c92eed1b4c961e9b150f4663c (diff)
downloadqpid-python-df599b1716535909317e61f4b43516d48373ad1c.tar.gz
Added cluster URL configuration, defaults to all interfaces.
src/qpid/Plugin.h - added doxygen src/qpid/Url.cpp,.h - cache string rep, op==, istream/ostream ops. src/qpid/broker/Broker.h,.cpp - removed getUrl() src/qpid/cluster/Cluster.h,.cpp - use Url class src/qpid/cluster/ClusterPlugin.cpp - added --url configuration. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@617533 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/Plugin.h6
-rw-r--r--cpp/src/qpid/Url.cpp28
-rw-r--r--cpp/src/qpid/Url.h19
-rw-r--r--cpp/src/qpid/broker/Broker.cpp7
-rw-r--r--cpp/src/qpid/broker/Broker.h8
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp6
-rw-r--r--cpp/src/qpid/cluster/Cluster.h10
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp40
-rw-r--r--cpp/src/tests/Url.cpp2
9 files changed, 82 insertions, 44 deletions
diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h
index 5aed844b43..e040662866 100644
--- a/cpp/src/qpid/Plugin.h
+++ b/cpp/src/qpid/Plugin.h
@@ -70,15 +70,17 @@ class Plugin : boost::noncopyable
/**
* Initialize Plugin functionality on a Target.
- *
* Plugins should ignore targets they don't recognize.
+ *
+ * Called before the target itself is initialized.
*/
virtual void earlyInitialize(Target&) = 0;
/**
* Initialize Plugin functionality on a Target.
- *
* Plugins should ignore targets they don't recognize.
+ *
+ * Called after the target is fully initialized.
*/
virtual void initialize(Target&) = 0;
diff --git a/cpp/src/qpid/Url.cpp b/cpp/src/qpid/Url.cpp
index 1e0e0b3603..d056edc683 100644
--- a/cpp/src/qpid/Url.cpp
+++ b/cpp/src/qpid/Url.cpp
@@ -36,10 +36,16 @@ using namespace std;
namespace qpid {
-Url Url::getHostnameUrl(uint16_t port) {
+std::ostream& operator<<(std::ostream& os, const TcpAddress& a) {
+ return os << "tcp:" << a.host << ":" << a.port;
+}
+
+std::istream& operator>>(std::istream&, const TcpAddress&);
+
+Url Url::getHostNameUrl(uint16_t port) {
char name[HOST_NAME_MAX];
if (::gethostname(name, sizeof(name)) != 0)
- throw Exception(QPID_MSG("Cannot get host name: " << strError(errno)));
+ throw InvalidUrl(QPID_MSG("Cannot get host name: " << strError(errno)));
return Url(TcpAddress(name, port));
}
@@ -66,9 +72,12 @@ Url Url::getIpAddressesUrl(uint16_t port) {
}
string Url::str() const {
- ostringstream os;
- os << *this;
- return os.str();
+ if (cache.empty() && !this->empty()) {
+ ostringstream os;
+ os << *this;
+ cache = os.str();
+ }
+ return cache;
}
ostream& operator<<(ostream& os, const Url& url) {
@@ -140,13 +149,22 @@ struct UrlGrammar : public grammar<UrlGrammar>
};
void Url::parse(const char* url) {
+ cache.clear();
if (!boost::spirit::parse(url, UrlGrammar(*this)).full)
throw InvalidUrl(string("Invalid AMQP url: ")+url);
}
void Url::parseNoThrow(const char* url) {
+ cache.clear();
if (!boost::spirit::parse(url, UrlGrammar(*this)).full)
clear();
}
+std::istream& operator>>(std::istream& is, Url& url) {
+ std::string s;
+ is >> s;
+ url.parse(s);
+ return is;
+}
+
} // namespace qpid
diff --git a/cpp/src/qpid/Url.h b/cpp/src/qpid/Url.h
index 5cdd30399b..2e24ba948d 100644
--- a/cpp/src/qpid/Url.h
+++ b/cpp/src/qpid/Url.h
@@ -42,19 +42,16 @@ inline bool operator==(const TcpAddress& x, const TcpAddress& y) {
return y.host==x.host && y.port == x.port;
}
-inline std::ostream& operator<<(std::ostream& os, const TcpAddress& a) {
- return os << "tcp:" << a.host << ":" << a.port;
-}
+std::ostream& operator<<(std::ostream& os, const TcpAddress& a);
-/** Address is a variant of all address types. */
+/** Address is a variant of all address types, more coming in future. */
typedef boost::variant<TcpAddress> Address;
-
/** An AMQP URL contains a list of addresses */
struct Url : public std::vector<Address> {
/** Url with the hostname as returned by gethostname(2) */
- static Url getHostnameUrl(uint16_t port);
+ static Url getHostNameUrl(uint16_t port);
/** Url with local IP address(es), may be more than one address
* on a multi-homed host. */
@@ -79,20 +76,30 @@ struct Url : public std::vector<Address> {
/** Parse url, throw InvalidUrl if invalid. */
explicit Url(const char* url) { parse(url); }
+ template<class T> Url& operator=(T s) { parse(s); return *this; }
+
/** Replace contents with parsed URL as defined in
* https://wiki.108.redhat.com/jira/browse/AMQP-95
*@exception InvalidUrl if the url is invalid.
*/
void parse(const char* url);
+ void parse(const std::string& url) { parse(url.c_str()); }
/** Replace contesnts with parsed URL as defined in
* https://wiki.108.redhat.com/jira/browse/AMQP-95
* url.empty() will be true if url is invalid.
*/
void parseNoThrow(const char* url);
+
+ private:
+ mutable std::string cache; // cache string form for efficiency.
};
+inline bool operator==(const Url& a, const Url& b) { return a.str()==b.str(); }
+inline bool operator!=(const Url& a, const Url& b) { return a.str()!=b.str(); }
+
std::ostream& operator<<(std::ostream& os, const Url& url);
+std::istream& operator>>(std::istream& is, Url& url);
} // namespace qpid
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 06a0d33a85..3ba07a180a 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -33,7 +33,6 @@
#include "qpid/management/ArgsBrokerEcho.h"
#include "qpid/log/Statement.h"
-#include "qpid/Url.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Acceptor.h"
@@ -61,7 +60,7 @@ namespace broker {
Broker::Options::Options(const std::string& name) :
qpid::Options(name),
- port(TcpAddress::DEFAULT_PORT),
+ port(DEFAULT_PORT),
workerThreads(5),
maxConnections(500),
connectionBacklog(10),
@@ -221,10 +220,6 @@ Broker::~Broker() {
uint16_t Broker::getPort() const { return getAcceptor().getPort(); }
-std::string Broker::getUrl() const {
- return Url(TcpAddress(getAcceptor().getHost(), getPort())).str();
-}
-
Acceptor& Broker::getAcceptor() const {
if (!acceptor) {
const_cast<Acceptor::shared_ptr&>(acceptor) =
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 1917da83cc..fb4b9916da 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -36,7 +36,6 @@
#include "qpid/management/Broker.h"
#include "qpid/Options.h"
#include "qpid/Plugin.h"
-#include "qpid/Url.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
@@ -48,6 +47,8 @@
namespace qpid {
namespace broker {
+static const uint16_t DEFAULT_PORT=5672;
+
/**
* A broker instance.
*/
@@ -72,7 +73,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
Broker(const Options& configuration);
static shared_ptr<Broker> create(const Options& configuration);
- static shared_ptr<Broker> create(int16_t port = TcpAddress::DEFAULT_PORT);
+ static shared_ptr<Broker> create(int16_t port = DEFAULT_PORT);
/**
* Return listening port. If called before bind this is
@@ -82,9 +83,6 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
*/
virtual uint16_t getPort() const;
- /** Return the broker's URL. */
- virtual std::string getUrl() const;
-
/**
* Run the broker. Implements Runnable::run() so the broker
* can be run in a separate thread.
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index ce87d23c0d..49270bcfef 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -47,7 +47,7 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker&) :
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer
cpg(*this),
name(name_),
@@ -87,7 +87,7 @@ void Cluster::handle(AMQFrame& frame) {
}
void Cluster::notify() {
- AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url));
+ AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
handle(frame);
}
@@ -143,7 +143,7 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
{
Mutex::ScopedLock l(lock);
members[from].url=notifyIn->getUrl();
- if (!self.id && notifyIn->getUrl() == url)
+ if (!self.id && notifyIn->getUrl() == url.str())
self=from;
lock.notifyAll();
QPID_LOG(trace, *this << ": members joined: " << members);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 5aca3faf44..e9809f2264 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -27,6 +27,8 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "qpid/log/Logger.h"
+#include "qpid/Url.h"
+
#include <boost/optional.hpp>
#include <boost/function.hpp>
@@ -50,8 +52,8 @@ class Cluster : public framing::FrameHandler,
public:
/** Details of a cluster member */
struct Member {
- Member(const std::string& url_=std::string()) : url(url_) {}
- std::string url; ///< Broker address.
+ Member(const Url& url_=Url()) : url(url_) {}
+ Url url; ///< Broker address.
};
typedef std::vector<Member> MemberList;
@@ -61,7 +63,7 @@ class Cluster : public framing::FrameHandler,
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
*/
- Cluster(const std::string& name, const std::string& url, broker::Broker&);
+ Cluster(const std::string& name, const Url& url, broker::Broker&);
virtual ~Cluster();
@@ -115,7 +117,7 @@ class Cluster : public framing::FrameHandler,
mutable sys::Monitor lock;
Cpg cpg;
Cpg::Name name;
- std::string url;
+ Url url;
Id self;
MemberMap members;
sys::Thread dispatcher;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index e24c60dc2f..e6b5f1a0bd 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -15,6 +15,10 @@
* limitations under the License.
*
*/
+#include <boost/program_options/value_semantic.hpp>
+
+
+
#include "qpid/broker/Broker.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/Plugin.h"
@@ -24,21 +28,31 @@
#include <boost/optional.hpp>
#include <boost/utility/in_place_factory.hpp>
+
namespace qpid {
namespace cluster {
using namespace std;
-struct ClusterPlugin : public Plugin {
+struct ClusterOptions : public Options {
+ string name;
+ string url;
- struct ClusterOptions : public Options {
- string clusterName;
- ClusterOptions() : Options("Cluster Options") {
- addOptions()
- ("cluster", optValue(clusterName, "NAME"),
- "Joins the cluster named NAME");
- }
- };
+ ClusterOptions() : Options("Cluster Options") {
+ addOptions()
+ ("cluster-name", optValue(name, "NAME"), "Name of cluster to join")
+ ("cluster-url", optValue(url,"URL"),
+ "URL of this broker, advertized to the cluster.\n"
+ "Defaults to a URL listing all the local IP addresses\n");
+ }
+
+ Url getUrl(uint16_t port) const {
+ if (url.empty()) return Url::getIpAddressesUrl(port);
+ return Url(url);
+ }
+};
+
+struct ClusterPlugin : public Plugin {
ClusterOptions options;
boost::optional<Cluster> cluster;
@@ -50,10 +64,12 @@ struct ClusterPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker, and only if the --cluster config is set.
- if (broker && !options.clusterName.empty()) {
+ if (broker && !options.name.empty()) {
assert(!cluster); // A process can only belong to one cluster.
- cluster = boost::in_place(options.clusterName, broker->getUrl(), boost::ref(*broker));
- // broker->add(make_shared_ptr(&cluster->getHandlerUpdater(), nullDeleter));
+ cluster = boost::in_place(options.name,
+ options.getUrl(broker->getPort()),
+ boost::ref(*broker));
+ // FIXME aconway 2008-02-01: Add observer.
}
}
};
diff --git a/cpp/src/tests/Url.cpp b/cpp/src/tests/Url.cpp
index 5d376e3439..bc07737520 100644
--- a/cpp/src/tests/Url.cpp
+++ b/cpp/src/tests/Url.cpp
@@ -33,7 +33,7 @@ BOOST_AUTO_TEST_CASE(testUrl_str) {
url.push_back(TcpAddress("foo.com"));
url.push_back(TcpAddress("bar.com", 6789));
BOOST_CHECK_EQUAL("amqp:tcp:foo.com:5672,tcp:bar.com:6789", url.str());
- BOOST_CHECK_EQUAL("amqp:", Url().str());
+ BOOST_CHECK(Url().str().empty());
}