diff options
author | Alan Conway <aconway@apache.org> | 2008-09-10 18:15:25 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-10 18:15:25 +0000 |
commit | 0b778c328001d25b3118450c0bfabb3e0b918971 (patch) | |
tree | f9f385408887017cf0499a837a0a46a82b0ce965 | |
parent | 71652d22061dd8de9c504c5d670bb15e858e5297 (diff) | |
download | qpid-python-0b778c328001d25b3118450c0bfabb3e0b918971.tar.gz |
Cluster support for copying shared broker state to new members.
cluster/DumpClient: Copies broker shared state to a new broker via AMQP.
broker/*Registry, Queue, QueueBindings: Added iteration functions for DumpClient
broker/SemanticState.cpp: Allow DumpClient to sidestep setting of delivery-properties.exchange.
client/Connection.h: Added Connection::open(Url) overload.
client/SessionImpl: Added send(AMQBody, FrameSet) overload for forwarding broker messages.
tests/cluster_test.cpp: Added test for DumpClient copying shared state between brokers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@693918 13f79535-47bb-0310-9956-ffa450edef68
26 files changed, 576 insertions, 101 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index cb07804550..334cc907e3 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -27,9 +27,11 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/OutputInterceptor.cpp \ qpid/cluster/ProxyInputHandler.h \ qpid/cluster/Event.h \ - qpid/cluster/Event.cpp + qpid/cluster/Event.cpp \ + qpid/cluster/DumpClient.h \ + qpid/cluster/DumpClient.cpp -libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la +libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la else # Empty stub library to satisfy rpm spec file. diff --git a/cpp/src/qpid/Url.h b/cpp/src/qpid/Url.h index 97b72ea993..67c8a861aa 100644 --- a/cpp/src/qpid/Url.h +++ b/cpp/src/qpid/Url.h @@ -45,7 +45,11 @@ inline bool operator==(const TcpAddress& x, const TcpAddress& y) { std::ostream& operator<<(std::ostream& os, const TcpAddress& a); /** Address is a variant of all address types, more coming in future. */ -typedef boost::variant<TcpAddress> Address; +struct Address : public boost::variant<TcpAddress> { + template <class T> Address(const T& t) : boost::variant<TcpAddress>(t) {} + template <class T> T* get() { return boost::get<T>(this); } + template <class T> const T* get() const { return boost::get<T>(this); } +}; /** An AMQP URL contains a list of addresses */ struct Url : public std::vector<Address> { diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index 7573e3e415..959e073ddc 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -22,51 +22,64 @@ * */ -#include <map> -#include <boost/function.hpp> #include "Exchange.h" #include "MessageStore.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" #include "qpid/management/Manageable.h" +#include <boost/function.hpp> +#include <boost/bind.hpp> + +#include <algorithm> +#include <map> + namespace qpid { namespace broker { - struct UnknownExchangeTypeException{}; - - class ExchangeRegistry{ - public: - typedef boost::function4<Exchange::shared_ptr, const std::string&, - bool, const qpid::framing::FieldTable&, qpid::management::Manageable*> FactoryFunction; - - ExchangeRegistry () : parent(0) {} - std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) - throw(UnknownExchangeTypeException); - std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type, - bool durable, const qpid::framing::FieldTable& args = framing::FieldTable()) - throw(UnknownExchangeTypeException); - void destroy(const std::string& name); - Exchange::shared_ptr get(const std::string& name); - Exchange::shared_ptr getDefault(); - - /** - * Register the manageable parent for declared exchanges - */ - void setParent (management::Manageable* _parent) { parent = _parent; } - - void registerType(const std::string& type, FactoryFunction); - private: - typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap; - typedef std::map<std::string, FactoryFunction > FunctionMap; - - ExchangeMap exchanges; - FunctionMap factory; - qpid::sys::RWlock lock; - management::Manageable* parent; - - }; -} -} + +struct UnknownExchangeTypeException{}; + +class ExchangeRegistry{ + public: + typedef boost::function4<Exchange::shared_ptr, const std::string&, + bool, const qpid::framing::FieldTable&, qpid::management::Manageable*> FactoryFunction; + + ExchangeRegistry () : parent(0) {} + std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) + throw(UnknownExchangeTypeException); + std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type, + bool durable, const qpid::framing::FieldTable& args = framing::FieldTable()) + throw(UnknownExchangeTypeException); + void destroy(const std::string& name); + Exchange::shared_ptr get(const std::string& name); + Exchange::shared_ptr getDefault(); + + /** + * Register the manageable parent for declared exchanges + */ + void setParent (management::Manageable* _parent) { parent = _parent; } + + void registerType(const std::string& type, FactoryFunction); + + /** Call f for each exchange in the registry. */ + template <class F> void eachExchange(const F& f) const { + qpid::sys::RWlock::ScopedWlock l(lock); + std::for_each(exchanges.begin(), exchanges.end(), + boost::bind(f, boost::bind(&ExchangeMap::value_type::second, _1))); + } + + private: + typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap; + typedef std::map<std::string, FactoryFunction > FunctionMap; + + ExchangeMap exchanges; + FunctionMap factory; + mutable qpid::sys::RWlock lock; + management::Manageable* parent; + +}; + +}} // namespace qpid::broker #endif /*!_broker_ExchangeRegistry_h*/ diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 0a95fedea6..a5ea0a7a37 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -84,6 +84,11 @@ public: return p->get<T>(true); } + template <class T> const T* hasProperties() const { + const qpid::framing::AMQHeaderBody* p = frames.getHeaders(); + return p->get<T>(); + } + template <class T> const T* getMethod() const { return frames.as<T>(); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 65d01a6888..274a29e9eb 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -34,14 +34,15 @@ #include "qpid/management/Queue.h" #include "qpid/framing/amqp_types.h" +#include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <boost/intrusive_ptr.hpp> + #include <list> #include <vector> #include <memory> #include <deque> - -#include <boost/shared_ptr.hpp> -#include <boost/enable_shared_from_this.hpp> -#include <boost/intrusive_ptr.hpp> +#include <algorithm> namespace qpid { namespace broker { @@ -172,6 +173,8 @@ namespace qpid { inline const framing::FieldTable& getSettings() const { return settings; } inline bool isAutoDelete() const { return autodelete; } bool canAutoDelete() const; + const QueueBindings& getBindings() const { return bindings; } + bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg); /** @@ -205,6 +208,17 @@ namespace qpid { management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); + + /** Apply f to each Message on the queue. */ + template <class F> void eachMessage(const F& f) const { + sys::Mutex::ScopedLock l(messageLock); + std::for_each(messages.begin(), messages.end(), f); + } + + /** Apply f to each QueueBinding on the queue */ + template <class F> void eachBinding(const F& f) { + bindings.eachBinding(f); + } }; } } diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp index 95e529f47e..6a1fa6aca3 100644 --- a/cpp/src/qpid/broker/QueueBindings.cpp +++ b/cpp/src/qpid/broker/QueueBindings.cpp @@ -29,7 +29,7 @@ using namespace qpid::broker; void QueueBindings::add(const string& exchange, const string& key, const FieldTable& args) { - bindings.push_back(new Binding(exchange, key, args)); + bindings.push_back(QueueBinding(exchange, key, args)); } void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue) @@ -37,11 +37,10 @@ void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue) for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { try { exchanges.get(i->exchange)->unbind(queue, i->key, &(i->args)); - } catch (const NotFoundException&) { - } + } catch (const NotFoundException&) {} } } -QueueBindings::Binding::Binding(const string& _exchange, const string& _key, const FieldTable& _args) +QueueBinding::QueueBinding(const string& _exchange, const string& _key, const FieldTable& _args) : exchange(_exchange), key(_key), args(_args) {} diff --git a/cpp/src/qpid/broker/QueueBindings.h b/cpp/src/qpid/broker/QueueBindings.h index b9b0f7c15c..bcebb5bf0a 100644 --- a/cpp/src/qpid/broker/QueueBindings.h +++ b/cpp/src/qpid/broker/QueueBindings.h @@ -24,32 +24,38 @@ #include "qpid/framing/FieldTable.h" #include <boost/ptr_container/ptr_list.hpp> #include <boost/shared_ptr.hpp> +#include <algorithm> namespace qpid { namespace broker { class ExchangeRegistry; class Queue; + +struct QueueBinding{ + std::string exchange; + std::string key; + qpid::framing::FieldTable args; + QueueBinding(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args); +}; + class QueueBindings { - struct Binding{ - const std::string exchange; - const std::string key; - const qpid::framing::FieldTable args; - Binding(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args); - }; - - typedef boost::ptr_list<Binding> Bindings; - Bindings bindings; + public: -public: + /** Apply f to each QueueBinding. */ + template <class F> void eachBinding(const F& f) const { std::for_each(bindings.begin(), bindings.end(), f); } + void add(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args); void unbind(ExchangeRegistry& exchanges, boost::shared_ptr<Queue> queue); + + private: + typedef std::vector<QueueBinding> Bindings; + Bindings bindings; }; -} -} +}} // namespace qpid::broker #endif diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index f7be1c551a..2dc5d9c534 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -21,10 +21,12 @@ #ifndef _QueueRegistry_ #define _QueueRegistry_ -#include <map> -#include "qpid/sys/Mutex.h" #include "Queue.h" +#include "qpid/sys/Mutex.h" #include "qpid/management/Manageable.h" +#include <boost/bind.hpp> +#include <algorithm> +#include <map> namespace qpid { namespace broker { @@ -98,11 +100,18 @@ class QueueRegistry{ * Register the manageable parent for declared queues */ void setParent (management::Manageable* _parent) { parent = _parent; } + + /** Call f for each queue in the registry. */ + template <class F> void eachQueue(const F& f) const { + qpid::sys::RWlock::ScopedWlock l(lock); + std::for_each(queues.begin(), queues.end(), + boost::bind(f, boost::bind(&QueueMap::value_type::second, _1))); + } private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; - qpid::sys::RWlock lock; + mutable qpid::sys::RWlock lock; int counter; MessageStore* store; management::Manageable* parent; @@ -112,8 +121,7 @@ private: }; -} -} +}} // namespace qpid::broker #endif diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 3185080f94..675a6a304c 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -353,8 +353,14 @@ void SemanticState::handle(intrusive_ptr<Message> msg) { void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); //TODO: the following should be hidden behind message (using MessageAdapter or similar) + + // Do not replace the delivery-properties.exchange if it is is already set. + // This is used internally (by the cluster) to force the exchange name on a message. + // The client library ensures this is always empty for messages from normal clients. if (msg->isA<MessageTransferBody>()) { - msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); + if (!msg->hasProperties<DeliveryProperties>() || + msg->getProperties<DeliveryProperties>()->getExchange().empty()) + msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); } if (!cacheExchange || cacheExchange->getName() != exchangeName){ cacheExchange = session.getBroker().getExchanges().get(exchangeName); @@ -392,11 +398,6 @@ void SemanticState::requestDispatch(ConsumerImpl& c) { if(c.isBlocked()) outputTasks.activateOutput(); - // TODO aconway 2008-07-16: we could directly call - // c.doOutput(); - // since we are in the connections thread but for consistency - // activateOutput() will set it up to be called in the next write idle. - // Current cluster code depends on this, review cluster code to change. } void SemanticState::complete(DeliveryRecord& delivery) diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index c71fe068ce..d72d1974c2 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -117,7 +117,6 @@ class SessionState : public qpid::SessionState, SessionHandler* handler; sys::AbsTime expiry; // Used by SessionManager. bool ignoring; - std::string name; SemanticState semanticState; SessionAdapter adapter; MessageBuilder msgBuilder; diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 6572794516..e19cfea059 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -23,6 +23,7 @@ #include "Message.h" #include "SessionImpl.h" #include "SessionBase_0_10Access.h" +#include "qpid/Url.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" @@ -48,6 +49,37 @@ Connection::Connection() : channelIdCounter(0), version(framing::highestProtocol Connection::~Connection(){ } void Connection::open( + const Url& url, + const std::string& uid, const std::string& pwd, + const std::string& vhost, + uint16_t maxFrameSize) +{ + if (url.empty()) + throw Exception(QPID_MSG("Attempt to open URL with no addresses.")); + Url::const_iterator i = url.begin(); + do { + const TcpAddress* tcp = i->get<TcpAddress>(); + i++; + if (tcp) { + try { + ConnectionSettings settings; + settings.host = tcp->host; + settings.port = tcp->port; + settings.username = uid; + settings.password = pwd; + settings.virtualhost = vhost; + settings.maxFrameSize = maxFrameSize; + open(settings); + break; + } + catch (const Exception& e) { + if (i == url.end()) throw; + } + } + } while (i != url.end()); +} + +void Connection::open( const std::string& host, int port, const std::string& uid, const std::string& pwd, const std::string& vhost, diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index ee543e20d2..e8ff2d8660 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -1,5 +1,5 @@ -#ifndef _client_Connection_ -#define _client_Connection_ +#ifndef QPID_CLIENT_CONNECTION_H +#define QPID_CLIENT_CONNECTION_H /* * @@ -26,6 +26,9 @@ #include "qpid/client/Session.h" namespace qpid { + +class Url; + namespace client { class ConnectionSettings; @@ -77,6 +80,28 @@ class Connection const std::string& virtualhost = "/", uint16_t maxFrameSize=65535); /** + * Opens a connection to a broker using a URL. + * If the URL contains multiple addresses, try each in turn + * till connection is successful. + * + * @url address of the broker to connect to. + * + * @param uid the userid to connect with. + * + * @param pwd the password to connect with (currently SASL + * PLAIN is the only authentication method supported so this + * is sent in clear text). + * + * @param virtualhost the AMQP virtual host to use (virtual + * hosts, where implemented(!), provide namespace partitioning + * within a single broker). + */ + void open(const Url& url, + const std::string& uid = "guest", + const std::string& pwd = "guest", + const std::string& virtualhost = "/", uint16_t maxFrameSize=65535); + + /** * Opens a connection to a broker. * * @param the settings to use (host, port etc) @see ConnectionSettings @@ -146,4 +171,4 @@ class Connection }} // namespace qpid::client -#endif +#endif /*!QPID_CLIENT_CONNECTION_H*/ diff --git a/cpp/src/qpid/client/Message.cpp b/cpp/src/qpid/client/Message.cpp index d5464594ee..13caaecefd 100644 --- a/cpp/src/qpid/client/Message.cpp +++ b/cpp/src/qpid/client/Message.cpp @@ -24,9 +24,7 @@ namespace qpid { namespace client { -Message::Message(const std::string& data_, - const std::string& routingKey, - const std::string& exchange) : TransferContent(data_, routingKey, exchange) {} +Message::Message(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {} std::string Message::getDestination() const { diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h index 4e6ed49bb4..18efdd20f7 100644 --- a/cpp/src/qpid/client/Message.h +++ b/cpp/src/qpid/client/Message.h @@ -40,11 +40,9 @@ public: /** Create a Message. *@param data Data for the message body. *@param routingKey Passed to the exchange that routes the message. - *@param exchange Name of the exchange that should route the message. */ Message(const std::string& data=std::string(), - const std::string& routingKey=std::string(), - const std::string& exchange=std::string()); + const std::string& routingKey=std::string()); /** The destination of messages sent to the broker is the exchange * name. The destination of messages received from the broker is diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index aef546c427..68955050b4 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -28,9 +28,11 @@ #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/enum.h" #include "qpid/framing/FrameSet.h" +#include "qpid/framing/AMQFrame.h" #include "qpid/framing/MethodContent.h" #include "qpid/framing/SequenceSet.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/DeliveryProperties.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -272,6 +274,41 @@ Future SessionImpl::send(const AMQBody& command, const MethodContent& content) return sendCommand(command, &content); } +namespace { +// Functor for FrameSet::map to send header + content frames but, not method frames. +struct SendContentFn { + FrameHandler& handler; + void operator()(const AMQFrame& f) { + if (!f.getMethod()) + handler(const_cast<AMQFrame&>(f)); + } + SendContentFn(FrameHandler& h) : handler(h) {} +}; +} + +Future SessionImpl::send(const AMQBody& command, const FrameSet& content) { + Acquire a(sendLock); + SequenceNumber id = nextOut++; + { + Lock l(state); + checkOpen(); + incompleteOut.add(id); + } + Future f(id); + if (command.getMethod()->resultExpected()) { + Lock l(state); + //result listener must be set before the command is sent + f.setFutureResult(results.listenForResult(id)); + } + AMQFrame frame(command); + frame.setEof(false); + handleOut(frame); + + SendContentFn send(out); + content.map(send); + return f; +} + Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) { Acquire a(sendLock); @@ -297,9 +334,16 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con } return f; } + void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); + + // Client is not allowed to set the delivery-properties.exchange. + AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); + if (headerp && headerp->get<DeliveryProperties>()) + headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); + header.setFirstSegment(false); uint64_t data_length = content.getData().length(); if(data_length > 0){ diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 55031a94ae..c63774a23a 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -81,6 +81,7 @@ public: Future send(const framing::AMQBody& command); Future send(const framing::AMQBody& command, const framing::MethodContent& content); + Future send(const framing::AMQBody& command, const framing::FrameSet& content); Demux& getDemux(); void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 4c0a768c4f..ce156e85e4 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -25,7 +25,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterUrlNoticeBody.h" +#include "qpid/framing/ClusterJoiningBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -50,7 +50,13 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { Cluster& cluster; MemberId member; ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} - void urlNotice(const std::string& u) { cluster.urlNotice (member, u); } + void joining(const std::string& u) { cluster.joining (member, u); } + void ready() { cluster.ready(member); } + + void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) { + assert(0); // Not passed to cluster, used to start a brain dump over TCP. + } + bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } }; @@ -237,7 +243,7 @@ void Cluster::configChange( if (nJoined) // Notfiy new members of my URL. mcastFrame( - AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), + AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); @@ -261,11 +267,15 @@ void Cluster::disconnect(sys::DispatchHandle& h) { broker->shutdown(); } -void Cluster::urlNotice(const MemberId& m, const string& url) { +void Cluster::joining(const MemberId& m, const string& url) { QPID_LOG(notice, "Cluster member " << m << " has URL " << url); urls.insert(UrlMap::value_type(m,Url(url))); } +void Cluster::ready(const MemberId& ) { + // FIXME aconway 2008-09-08: TODO +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 0cd916d5fb..a25b62ea12 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -75,7 +75,8 @@ class Cluster : public RefCounted, private Cpg::Handler /** Leave the cluster */ void leave(); - void urlNotice(const MemberId&, const std::string& url); + void joining(const MemberId&, const std::string& url); + void ready(const MemberId&); broker::Broker& getBroker() { assert(broker); return *broker; } diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index d829683000..31447f2fd0 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -26,6 +26,7 @@ #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/shared_ptr.h" +#include "qpid/log/Statement.h" #include <boost/utility/in_place_factory.hpp> @@ -75,7 +76,7 @@ struct ClusterPlugin : public Plugin { void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified. - if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process."); + QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin."); cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker); broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); broker->setConnectionFactory( diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index e6372e80ea..b3e151ce51 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -89,6 +89,18 @@ class Connection : // ConnectionInputHandlerFactory sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient); + // State dump methods. + virtual void sessionState(const framing::SequenceNumber& /*replayId*/, + const framing::SequenceNumber& /*sendId*/, + const framing::SequenceSet& /*sentIncomplete*/, + const framing::SequenceNumber& /*expectedId*/, + const framing::SequenceNumber& /*receivedId*/, + const framing::SequenceSet& /*unknownCompleted*/, + const framing::SequenceSet& /*receivedIncomplete*/) {} + + virtual void shadowReady(uint64_t /*clusterId*/, + const std::string& /*userId*/) {} + private: void sendDoOutput(); diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp new file mode 100644 index 0000000000..5b92552209 --- /dev/null +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DumpClient.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/enum.h" +#include "qpid/Url.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace cluster { + +using broker::Broker; +using broker::Exchange; +using broker::Queue; +using broker::QueueBinding; +using broker::Message; +using namespace framing::message; + +using namespace client; + +DumpClient::DumpClient(const Url& url) { + connection.open(url); + session = connection.newSession(); +} + +DumpClient::~DumpClient() { + session.close(); + connection.close(); +} + +// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes. +static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; +static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); + +void DumpClient::dump(Broker& donor) { + // TODO aconway 2008-09-08: Caller must handle exceptions + // FIXME aconway 2008-09-08: send cluster map frame first. + donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); + // Catch-up exchange is used to route messages to the proper queue without modifying routing key. + session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true); + donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); + session.sync(); +} + +void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) { + session.exchangeDeclare( + ex->getName(), ex->getType(), + ex->getAlternate() ? ex->getAlternate()->getName() : std::string(), + arg::passive=false, + arg::durable=ex->isDurable(), + arg::autoDelete=false, + arg::arguments=ex->getArgs()); +} + +void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { + session.queueDeclare( + q->getName(), + q->getAlternateExchange() ? q->getAlternateExchange()->getName() : std::string(), + arg::passive=false, + arg::durable=q->isDurable(), + arg::exclusive=q->hasExclusiveConsumer(), + arg::autoDelete=q->isAutoDelete(), + arg::arguments=q->getSettings()); + + session.exchangeBind(q->getName(), CATCH_UP, std::string()); + q->eachMessage(boost::bind(&DumpClient::dumpMessage, this, _1)); + session.exchangeUnbind(q->getName(), CATCH_UP, std::string()); + q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1)); +} + +void DumpClient::dumpMessage(const broker::QueuedMessage& message) { + SessionBase_0_10Access sb(session); + framing::MessageTransferBody transfer(framing::ProtocolVersion(), CATCH_UP, ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED); + sb.get()->send(transfer, message.payload->getFrames()); +} + +void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) { + session.exchangeBind(queue, binding.exchange, binding.key, binding.args); +} + + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h new file mode 100644 index 0000000000..447fd1abef --- /dev/null +++ b/cpp/src/qpid/cluster/DumpClient.h @@ -0,0 +1,74 @@ +#ifndef QPID_CLUSTER_DUMPCLIENT_H +#define QPID_CLUSTER_DUMPCLIENT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/Connection.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/ExchangeRegistry.h" +#include <boost/shared_ptr.hpp> + + +namespace qpid { + +class Url; + +namespace broker { + +class Broker; +class Queue; +class Exchange; +class QueueBindings; +class QueueBinding; +class QueuedMessage; +} // namespace broker + +namespace cluster { + +/** + * A client that dumps the contents of a local broker to a remote one using AMQP. + */ +class DumpClient { + public: + DumpClient(const Url& receiver); + ~DumpClient(); + + void dump(broker::Broker& donor); + + private: + void dumpQueue(const boost::shared_ptr<broker::Queue>&); + void dumpExchange(const boost::shared_ptr<broker::Exchange>&); + void dumpMessage(const broker::QueuedMessage&); + void dumpBinding(const std::string& queue, const broker::QueueBinding& binding); + + private: + client::Connection connection; + client::AsyncSession session; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_DUMPCLIENT_H*/ diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp index 42af2027eb..3fc54296fa 100644 --- a/cpp/src/qpid/framing/TransferContent.cpp +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -24,15 +24,12 @@ namespace qpid { namespace framing { -TransferContent::TransferContent(const std::string& data, - const std::string& routingKey, - const std::string& exchange) -{ +TransferContent::TransferContent(const std::string& data, const std::string& key) { setData(data); - if (routingKey.size()) getDeliveryProperties().setRoutingKey(routingKey); - if (exchange.size()) getDeliveryProperties().setExchange(exchange); + if (!key.empty()) getDeliveryProperties().setRoutingKey(key); } + AMQHeaderBody TransferContent::getHeader() const { return header; diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h index 7630421bd4..e3f6666fa4 100644 --- a/cpp/src/qpid/framing/TransferContent.h +++ b/cpp/src/qpid/framing/TransferContent.h @@ -36,9 +36,7 @@ class TransferContent : public MethodContent AMQHeaderBody header; std::string data; public: - TransferContent(const std::string& data = std::string(), - const std::string& routingKey = std::string(), - const std::string& exchange = std::string()); + TransferContent(const std::string& data = std::string(), const std::string& key=std::string()); ///@internal AMQHeaderBody getHeader() const; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 9abc1b189e..d082d74367 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -23,6 +23,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/Cluster.h" +#include "qpid/cluster/DumpClient.h" #include "qpid/framing/AMQBody.h" #include "qpid/client/Connection.h" #include "qpid/client/Session.h" @@ -51,6 +52,7 @@ using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::client; +using qpid::sys::TIME_SEC; using qpid::broker::Broker; using boost::ptr_vector; using qpid::cluster::Cluster; @@ -133,6 +135,86 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } +QPID_AUTO_TEST_CASE(testDumpClient) { + BrokerFixture donor, receiver; + { + Client c(donor.getPort()); + FieldTable args; + args.setString("x", "y"); + c.session.queueDeclare("qa", arg::arguments=args); + c.session.queueDeclare("qb", arg::alternateExchange="amq.direct"); + + c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args); + c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo"); + c.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("one", "foo")); + + c.session.exchangeDeclare("ext", arg::type="topic"); + c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar"); + c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0)); + c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("one", "bar")); + c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("two", "bar")); + + c.session.close(); + c.connection.close(); + } + qpid::cluster::DumpClient dump(Url::getIpAddressesUrl(receiver.getPort())); + dump.dump(*donor.broker); + { + Client r(receiver.getPort()); + // Verify exchanges + ExchangeQueryResult ex=r.session.exchangeQuery("exd"); + BOOST_CHECK_EQUAL(ex.getType(), "direct"); + BOOST_CHECK_EQUAL(ex.getDurable(), false); + BOOST_CHECK_EQUAL(ex.getNotFound(), false); + BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y"); + + ex = r.session.exchangeQuery("ext"); + BOOST_CHECK_EQUAL(ex.getType(), "topic"); + BOOST_CHECK_EQUAL(ex.getNotFound(), false); + + // Verify queues + QueueQueryResult qq = r.session.queueQuery("qa"); + BOOST_CHECK_EQUAL(qq.getQueue(), "qa"); + BOOST_CHECK_EQUAL(qq.getAlternateExchange(), ""); + BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y"); + BOOST_CHECK_EQUAL(qq.getMessageCount(), 1); + + qq = r.session.queueQuery("qb"); + BOOST_CHECK_EQUAL(qq.getQueue(), "qb"); + BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct"); + BOOST_CHECK_EQUAL(qq.getMessageCount(), 2); + + // Verify messages + Message m; + BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "one"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo"); + + BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "one"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); + + BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "two"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); + + // Verify bindings + r.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("xxx", "foo")); + BOOST_CHECK(r.subs.get(m, "qa")); + BOOST_CHECK_EQUAL(m.getData(), "xxx"); + + r.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("yyy", "bar")); + BOOST_CHECK(r.subs.get(m, "qb")); + BOOST_CHECK_EQUAL(m.getData(), "yyy"); + + r.session.close(); + r.connection.close(); + } +} + QPID_AUTO_TEST_CASE(testForkedBroker) { // Verify the ForkedBroker works as expected. const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; @@ -146,8 +228,7 @@ QPID_AUTO_TEST_CASE(testSingletonCluster) { ClusterFixture cluster(1); Client c(cluster[0]); BOOST_CHECK(c.session.queueQuery("q").getQueue().empty()); - BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty()); - // FIXME aconway 2008-09-01: leaks if aisexec not running, investigate. + BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound()); } QPID_AUTO_TEST_CASE(testWiringReplication) { @@ -234,5 +315,4 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } - QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index a5f9d51a13..d239bb11f3 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -22,20 +22,66 @@ o<?xml version="1.0"?> <amqp major="0" minor="10" port="5672"> + <!-- Controls sent between cluster nodes. --> + <class name = "cluster" code = "0x80" label="Qpid clustering extensions."> <doc>Qpid extension class to allow clustered brokers to communicate.</doc> - <control name = "url-notice" code="0x1"> - <field name="url-notice" type="str16" /> + <!-- Cluster membership --> + + <control name = "joining" code="0x1"> + <field name="joining" type="str16" label="URL of new member joining cluster."/> + </control> + + + <control name="ready" code="0x2" label="New member is ready."/> + + <control name="members" code="0x3" label="Cluster map sent to new members."> + <field name="members" type="map"/> <!-- member-id -> URL --> + <field name="donors" type="map"/> <!-- member-id -> uint32 (donor-count) --> + <field name="newbies" type="map"/> <!-- member-id -> URL --> </control> + + <!-- Transferring broker state --> + </class> + <!-- TODO aconway 2008-09-10: support for un-attached connections. --> + + <!-- Controls associated with a specific connection. --> + <class name="cluster-connection" code="0x81" label="Qpid clustering extensions."> + <control name="deliver-close" code="0x2"> </control> <control name="deliver-do-output" code="0x3"> <field name="bytes" type="uint32"/> </control> + + <!-- Brain-dump controls. Sent to a new broker in joining mode. + A connection is dumped as followed: + - open as a normal connection. + - attach sessions, create consumers, set flow with normal AMQP cokmmands. + - reset session state by sending session-state for each session. + - frames following session-state are replay frames. + - send shadow-ready to mark end of dump. + --> + <control name="session-state" code="0x4" label="Set session state during a brain dump."> + <!-- Target session deduced from channel number. --> + <field name="replay-id" type="sequence-no"/> + <field name="send-id" type="sequence-no"/> + <field name="sent-incomplete" type="sequence-set"/> + + <field name="expected-id" type="sequence-no"/> + <field name="received-id" type="sequence-no"/> + <field name="unknown-completed" type="sequence-set"/> + <field name="received-incomplete" type="sequence-set"/> + </control> + + <control name="shadow-ready" code="0x5" label="End of shadow connection dump."> + <field name="cluster-id" type="uint64"/> + <field name="user-id" type="vbin16"/> + </control> </class> </amqp> |