summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-10 18:15:25 +0000
committerAlan Conway <aconway@apache.org>2008-09-10 18:15:25 +0000
commit0b778c328001d25b3118450c0bfabb3e0b918971 (patch)
treef9f385408887017cf0499a837a0a46a82b0ce965 /cpp/src
parent71652d22061dd8de9c504c5d670bb15e858e5297 (diff)
downloadqpid-python-0b778c328001d25b3118450c0bfabb3e0b918971.tar.gz
Cluster support for copying shared broker state to new members.
cluster/DumpClient: Copies broker shared state to a new broker via AMQP. broker/*Registry, Queue, QueueBindings: Added iteration functions for DumpClient broker/SemanticState.cpp: Allow DumpClient to sidestep setting of delivery-properties.exchange. client/Connection.h: Added Connection::open(Url) overload. client/SessionImpl: Added send(AMQBody, FrameSet) overload for forwarding broker messages. tests/cluster_test.cpp: Added test for DumpClient copying shared state between brokers. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@693918 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/cluster.mk6
-rw-r--r--cpp/src/qpid/Url.h6
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h87
-rw-r--r--cpp/src/qpid/broker/Message.h5
-rw-r--r--cpp/src/qpid/broker/Queue.h22
-rw-r--r--cpp/src/qpid/broker/QueueBindings.cpp7
-rw-r--r--cpp/src/qpid/broker/QueueBindings.h30
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h18
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp13
-rw-r--r--cpp/src/qpid/broker/SessionState.h1
-rw-r--r--cpp/src/qpid/client/Connection.cpp32
-rw-r--r--cpp/src/qpid/client/Connection.h31
-rw-r--r--cpp/src/qpid/client/Message.cpp4
-rw-r--r--cpp/src/qpid/client/Message.h4
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp44
-rw-r--r--cpp/src/qpid/client/SessionImpl.h1
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp18
-rw-r--r--cpp/src/qpid/cluster/Cluster.h3
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp3
-rw-r--r--cpp/src/qpid/cluster/Connection.h12
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp107
-rw-r--r--cpp/src/qpid/cluster/DumpClient.h74
-rw-r--r--cpp/src/qpid/framing/TransferContent.cpp9
-rw-r--r--cpp/src/qpid/framing/TransferContent.h4
-rw-r--r--cpp/src/tests/cluster_test.cpp86
25 files changed, 528 insertions, 99 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index cb07804550..334cc907e3 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -27,9 +27,11 @@ libqpidcluster_la_SOURCES = \
qpid/cluster/OutputInterceptor.cpp \
qpid/cluster/ProxyInputHandler.h \
qpid/cluster/Event.h \
- qpid/cluster/Event.cpp
+ qpid/cluster/Event.cpp \
+ qpid/cluster/DumpClient.h \
+ qpid/cluster/DumpClient.cpp
-libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
+libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
else
# Empty stub library to satisfy rpm spec file.
diff --git a/cpp/src/qpid/Url.h b/cpp/src/qpid/Url.h
index 97b72ea993..67c8a861aa 100644
--- a/cpp/src/qpid/Url.h
+++ b/cpp/src/qpid/Url.h
@@ -45,7 +45,11 @@ inline bool operator==(const TcpAddress& x, const TcpAddress& y) {
std::ostream& operator<<(std::ostream& os, const TcpAddress& a);
/** Address is a variant of all address types, more coming in future. */
-typedef boost::variant<TcpAddress> Address;
+struct Address : public boost::variant<TcpAddress> {
+ template <class T> Address(const T& t) : boost::variant<TcpAddress>(t) {}
+ template <class T> T* get() { return boost::get<T>(this); }
+ template <class T> const T* get() const { return boost::get<T>(this); }
+};
/** An AMQP URL contains a list of addresses */
struct Url : public std::vector<Address> {
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index 7573e3e415..959e073ddc 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -22,51 +22,64 @@
*
*/
-#include <map>
-#include <boost/function.hpp>
#include "Exchange.h"
#include "MessageStore.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
#include "qpid/management/Manageable.h"
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+
+#include <algorithm>
+#include <map>
+
namespace qpid {
namespace broker {
- struct UnknownExchangeTypeException{};
-
- class ExchangeRegistry{
- public:
- typedef boost::function4<Exchange::shared_ptr, const std::string&,
- bool, const qpid::framing::FieldTable&, qpid::management::Manageable*> FactoryFunction;
-
- ExchangeRegistry () : parent(0) {}
- std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
- throw(UnknownExchangeTypeException);
- std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type,
- bool durable, const qpid::framing::FieldTable& args = framing::FieldTable())
- throw(UnknownExchangeTypeException);
- void destroy(const std::string& name);
- Exchange::shared_ptr get(const std::string& name);
- Exchange::shared_ptr getDefault();
-
- /**
- * Register the manageable parent for declared exchanges
- */
- void setParent (management::Manageable* _parent) { parent = _parent; }
-
- void registerType(const std::string& type, FactoryFunction);
- private:
- typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
- typedef std::map<std::string, FactoryFunction > FunctionMap;
-
- ExchangeMap exchanges;
- FunctionMap factory;
- qpid::sys::RWlock lock;
- management::Manageable* parent;
-
- };
-}
-}
+
+struct UnknownExchangeTypeException{};
+
+class ExchangeRegistry{
+ public:
+ typedef boost::function4<Exchange::shared_ptr, const std::string&,
+ bool, const qpid::framing::FieldTable&, qpid::management::Manageable*> FactoryFunction;
+
+ ExchangeRegistry () : parent(0) {}
+ std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
+ throw(UnknownExchangeTypeException);
+ std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type,
+ bool durable, const qpid::framing::FieldTable& args = framing::FieldTable())
+ throw(UnknownExchangeTypeException);
+ void destroy(const std::string& name);
+ Exchange::shared_ptr get(const std::string& name);
+ Exchange::shared_ptr getDefault();
+
+ /**
+ * Register the manageable parent for declared exchanges
+ */
+ void setParent (management::Manageable* _parent) { parent = _parent; }
+
+ void registerType(const std::string& type, FactoryFunction);
+
+ /** Call f for each exchange in the registry. */
+ template <class F> void eachExchange(const F& f) const {
+ qpid::sys::RWlock::ScopedWlock l(lock);
+ std::for_each(exchanges.begin(), exchanges.end(),
+ boost::bind(f, boost::bind(&ExchangeMap::value_type::second, _1)));
+ }
+
+ private:
+ typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
+ typedef std::map<std::string, FactoryFunction > FunctionMap;
+
+ ExchangeMap exchanges;
+ FunctionMap factory;
+ mutable qpid::sys::RWlock lock;
+ management::Manageable* parent;
+
+};
+
+}} // namespace qpid::broker
#endif /*!_broker_ExchangeRegistry_h*/
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 0a95fedea6..a5ea0a7a37 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -84,6 +84,11 @@ public:
return p->get<T>(true);
}
+ template <class T> const T* hasProperties() const {
+ const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+ return p->get<T>();
+ }
+
template <class T> const T* getMethod() const {
return frames.as<T>();
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 65d01a6888..274a29e9eb 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -34,14 +34,15 @@
#include "qpid/management/Queue.h"
#include "qpid/framing/amqp_types.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/intrusive_ptr.hpp>
+
#include <list>
#include <vector>
#include <memory>
#include <deque>
-
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/intrusive_ptr.hpp>
+#include <algorithm>
namespace qpid {
namespace broker {
@@ -172,6 +173,8 @@ namespace qpid {
inline const framing::FieldTable& getSettings() const { return settings; }
inline bool isAutoDelete() const { return autodelete; }
bool canAutoDelete() const;
+ const QueueBindings& getBindings() const { return bindings; }
+
bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
/**
@@ -205,6 +208,17 @@ namespace qpid {
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
+
+ /** Apply f to each Message on the queue. */
+ template <class F> void eachMessage(const F& f) const {
+ sys::Mutex::ScopedLock l(messageLock);
+ std::for_each(messages.begin(), messages.end(), f);
+ }
+
+ /** Apply f to each QueueBinding on the queue */
+ template <class F> void eachBinding(const F& f) {
+ bindings.eachBinding(f);
+ }
};
}
}
diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp
index 95e529f47e..6a1fa6aca3 100644
--- a/cpp/src/qpid/broker/QueueBindings.cpp
+++ b/cpp/src/qpid/broker/QueueBindings.cpp
@@ -29,7 +29,7 @@ using namespace qpid::broker;
void QueueBindings::add(const string& exchange, const string& key, const FieldTable& args)
{
- bindings.push_back(new Binding(exchange, key, args));
+ bindings.push_back(QueueBinding(exchange, key, args));
}
void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue)
@@ -37,11 +37,10 @@ void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue)
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
try {
exchanges.get(i->exchange)->unbind(queue, i->key, &(i->args));
- } catch (const NotFoundException&) {
- }
+ } catch (const NotFoundException&) {}
}
}
-QueueBindings::Binding::Binding(const string& _exchange, const string& _key, const FieldTable& _args)
+QueueBinding::QueueBinding(const string& _exchange, const string& _key, const FieldTable& _args)
: exchange(_exchange), key(_key), args(_args)
{}
diff --git a/cpp/src/qpid/broker/QueueBindings.h b/cpp/src/qpid/broker/QueueBindings.h
index b9b0f7c15c..bcebb5bf0a 100644
--- a/cpp/src/qpid/broker/QueueBindings.h
+++ b/cpp/src/qpid/broker/QueueBindings.h
@@ -24,32 +24,38 @@
#include "qpid/framing/FieldTable.h"
#include <boost/ptr_container/ptr_list.hpp>
#include <boost/shared_ptr.hpp>
+#include <algorithm>
namespace qpid {
namespace broker {
class ExchangeRegistry;
class Queue;
+
+struct QueueBinding{
+ std::string exchange;
+ std::string key;
+ qpid::framing::FieldTable args;
+ QueueBinding(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args);
+};
+
class QueueBindings
{
- struct Binding{
- const std::string exchange;
- const std::string key;
- const qpid::framing::FieldTable args;
- Binding(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args);
- };
-
- typedef boost::ptr_list<Binding> Bindings;
- Bindings bindings;
+ public:
-public:
+ /** Apply f to each QueueBinding. */
+ template <class F> void eachBinding(const F& f) const { std::for_each(bindings.begin(), bindings.end(), f); }
+
void add(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args);
void unbind(ExchangeRegistry& exchanges, boost::shared_ptr<Queue> queue);
+
+ private:
+ typedef std::vector<QueueBinding> Bindings;
+ Bindings bindings;
};
-}
-}
+}} // namespace qpid::broker
#endif
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index f7be1c551a..2dc5d9c534 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -21,10 +21,12 @@
#ifndef _QueueRegistry_
#define _QueueRegistry_
-#include <map>
-#include "qpid/sys/Mutex.h"
#include "Queue.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+#include <map>
namespace qpid {
namespace broker {
@@ -98,11 +100,18 @@ class QueueRegistry{
* Register the manageable parent for declared queues
*/
void setParent (management::Manageable* _parent) { parent = _parent; }
+
+ /** Call f for each queue in the registry. */
+ template <class F> void eachQueue(const F& f) const {
+ qpid::sys::RWlock::ScopedWlock l(lock);
+ std::for_each(queues.begin(), queues.end(),
+ boost::bind(f, boost::bind(&QueueMap::value_type::second, _1)));
+ }
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
- qpid::sys::RWlock lock;
+ mutable qpid::sys::RWlock lock;
int counter;
MessageStore* store;
management::Manageable* parent;
@@ -112,8 +121,7 @@ private:
};
-}
-}
+}} // namespace qpid::broker
#endif
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 3185080f94..675a6a304c 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -353,8 +353,14 @@ void SemanticState::handle(intrusive_ptr<Message> msg) {
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
//TODO: the following should be hidden behind message (using MessageAdapter or similar)
+
+ // Do not replace the delivery-properties.exchange if it is is already set.
+ // This is used internally (by the cluster) to force the exchange name on a message.
+ // The client library ensures this is always empty for messages from normal clients.
if (msg->isA<MessageTransferBody>()) {
- msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+ if (!msg->hasProperties<DeliveryProperties>() ||
+ msg->getProperties<DeliveryProperties>()->getExchange().empty())
+ msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
@@ -392,11 +398,6 @@ void SemanticState::requestDispatch(ConsumerImpl& c)
{
if(c.isBlocked())
outputTasks.activateOutput();
- // TODO aconway 2008-07-16: we could directly call
- // c.doOutput();
- // since we are in the connections thread but for consistency
- // activateOutput() will set it up to be called in the next write idle.
- // Current cluster code depends on this, review cluster code to change.
}
void SemanticState::complete(DeliveryRecord& delivery)
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index c71fe068ce..d72d1974c2 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -117,7 +117,6 @@ class SessionState : public qpid::SessionState,
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
bool ignoring;
- std::string name;
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 6572794516..e19cfea059 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -23,6 +23,7 @@
#include "Message.h"
#include "SessionImpl.h"
#include "SessionBase_0_10Access.h"
+#include "qpid/Url.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
@@ -48,6 +49,37 @@ Connection::Connection() : channelIdCounter(0), version(framing::highestProtocol
Connection::~Connection(){ }
void Connection::open(
+ const Url& url,
+ const std::string& uid, const std::string& pwd,
+ const std::string& vhost,
+ uint16_t maxFrameSize)
+{
+ if (url.empty())
+ throw Exception(QPID_MSG("Attempt to open URL with no addresses."));
+ Url::const_iterator i = url.begin();
+ do {
+ const TcpAddress* tcp = i->get<TcpAddress>();
+ i++;
+ if (tcp) {
+ try {
+ ConnectionSettings settings;
+ settings.host = tcp->host;
+ settings.port = tcp->port;
+ settings.username = uid;
+ settings.password = pwd;
+ settings.virtualhost = vhost;
+ settings.maxFrameSize = maxFrameSize;
+ open(settings);
+ break;
+ }
+ catch (const Exception& e) {
+ if (i == url.end()) throw;
+ }
+ }
+ } while (i != url.end());
+}
+
+void Connection::open(
const std::string& host, int port,
const std::string& uid, const std::string& pwd,
const std::string& vhost,
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index ee543e20d2..e8ff2d8660 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -1,5 +1,5 @@
-#ifndef _client_Connection_
-#define _client_Connection_
+#ifndef QPID_CLIENT_CONNECTION_H
+#define QPID_CLIENT_CONNECTION_H
/*
*
@@ -26,6 +26,9 @@
#include "qpid/client/Session.h"
namespace qpid {
+
+class Url;
+
namespace client {
class ConnectionSettings;
@@ -77,6 +80,28 @@ class Connection
const std::string& virtualhost = "/", uint16_t maxFrameSize=65535);
/**
+ * Opens a connection to a broker using a URL.
+ * If the URL contains multiple addresses, try each in turn
+ * till connection is successful.
+ *
+ * @url address of the broker to connect to.
+ *
+ * @param uid the userid to connect with.
+ *
+ * @param pwd the password to connect with (currently SASL
+ * PLAIN is the only authentication method supported so this
+ * is sent in clear text).
+ *
+ * @param virtualhost the AMQP virtual host to use (virtual
+ * hosts, where implemented(!), provide namespace partitioning
+ * within a single broker).
+ */
+ void open(const Url& url,
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
+ const std::string& virtualhost = "/", uint16_t maxFrameSize=65535);
+
+ /**
* Opens a connection to a broker.
*
* @param the settings to use (host, port etc) @see ConnectionSettings
@@ -146,4 +171,4 @@ class Connection
}} // namespace qpid::client
-#endif
+#endif /*!QPID_CLIENT_CONNECTION_H*/
diff --git a/cpp/src/qpid/client/Message.cpp b/cpp/src/qpid/client/Message.cpp
index d5464594ee..13caaecefd 100644
--- a/cpp/src/qpid/client/Message.cpp
+++ b/cpp/src/qpid/client/Message.cpp
@@ -24,9 +24,7 @@
namespace qpid {
namespace client {
-Message::Message(const std::string& data_,
- const std::string& routingKey,
- const std::string& exchange) : TransferContent(data_, routingKey, exchange) {}
+Message::Message(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {}
std::string Message::getDestination() const
{
diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h
index 4e6ed49bb4..18efdd20f7 100644
--- a/cpp/src/qpid/client/Message.h
+++ b/cpp/src/qpid/client/Message.h
@@ -40,11 +40,9 @@ public:
/** Create a Message.
*@param data Data for the message body.
*@param routingKey Passed to the exchange that routes the message.
- *@param exchange Name of the exchange that should route the message.
*/
Message(const std::string& data=std::string(),
- const std::string& routingKey=std::string(),
- const std::string& exchange=std::string());
+ const std::string& routingKey=std::string());
/** The destination of messages sent to the broker is the exchange
* name. The destination of messages received from the broker is
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index aef546c427..68955050b4 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -28,9 +28,11 @@
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/DeliveryProperties.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -272,6 +274,41 @@ Future SessionImpl::send(const AMQBody& command, const MethodContent& content)
return sendCommand(command, &content);
}
+namespace {
+// Functor for FrameSet::map to send header + content frames but, not method frames.
+struct SendContentFn {
+ FrameHandler& handler;
+ void operator()(const AMQFrame& f) {
+ if (!f.getMethod())
+ handler(const_cast<AMQFrame&>(f));
+ }
+ SendContentFn(FrameHandler& h) : handler(h) {}
+};
+}
+
+Future SessionImpl::send(const AMQBody& command, const FrameSet& content) {
+ Acquire a(sendLock);
+ SequenceNumber id = nextOut++;
+ {
+ Lock l(state);
+ checkOpen();
+ incompleteOut.add(id);
+ }
+ Future f(id);
+ if (command.getMethod()->resultExpected()) {
+ Lock l(state);
+ //result listener must be set before the command is sent
+ f.setFutureResult(results.listenForResult(id));
+ }
+ AMQFrame frame(command);
+ frame.setEof(false);
+ handleOut(frame);
+
+ SendContentFn send(out);
+ content.map(send);
+ return f;
+}
+
Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content)
{
Acquire a(sendLock);
@@ -297,9 +334,16 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con
}
return f;
}
+
void SessionImpl::sendContent(const MethodContent& content)
{
AMQFrame header(content.getHeader());
+
+ // Client is not allowed to set the delivery-properties.exchange.
+ AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
+ if (headerp && headerp->get<DeliveryProperties>())
+ headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
+
header.setFirstSegment(false);
uint64_t data_length = content.getData().length();
if(data_length > 0){
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 55031a94ae..c63774a23a 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -81,6 +81,7 @@ public:
Future send(const framing::AMQBody& command);
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
+ Future send(const framing::AMQBody& command, const framing::FrameSet& content);
Demux& getDemux();
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 4c0a768c4f..ce156e85e4 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -25,7 +25,7 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterUrlNoticeBody.h"
+#include "qpid/framing/ClusterJoiningBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -50,7 +50,13 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
Cluster& cluster;
MemberId member;
ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
- void urlNotice(const std::string& u) { cluster.urlNotice (member, u); }
+ void joining(const std::string& u) { cluster.joining (member, u); }
+ void ready() { cluster.ready(member); }
+
+ void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) {
+ assert(0); // Not passed to cluster, used to start a brain dump over TCP.
+ }
+
bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
};
@@ -237,7 +243,7 @@ void Cluster::configChange(
if (nJoined) // Notfiy new members of my URL.
mcastFrame(
- AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
+ AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())),
ConnectionId(self,0));
@@ -261,11 +267,15 @@ void Cluster::disconnect(sys::DispatchHandle& h) {
broker->shutdown();
}
-void Cluster::urlNotice(const MemberId& m, const string& url) {
+void Cluster::joining(const MemberId& m, const string& url) {
QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
urls.insert(UrlMap::value_type(m,Url(url)));
}
+void Cluster::ready(const MemberId& ) {
+ // FIXME aconway 2008-09-08: TODO
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 0cd916d5fb..a25b62ea12 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -75,7 +75,8 @@ class Cluster : public RefCounted, private Cpg::Handler
/** Leave the cluster */
void leave();
- void urlNotice(const MemberId&, const std::string& url);
+ void joining(const MemberId&, const std::string& url);
+ void ready(const MemberId&);
broker::Broker& getBroker() { assert(broker); return *broker; }
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index d829683000..31447f2fd0 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -26,6 +26,7 @@
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
+#include "qpid/log/Statement.h"
#include <boost/utility/in_place_factory.hpp>
@@ -75,7 +76,7 @@ struct ClusterPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified.
- if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process.");
+ QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin.");
cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
broker->setConnectionFactory(
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index e6372e80ea..b3e151ce51 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -89,6 +89,18 @@ class Connection :
// ConnectionInputHandlerFactory
sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient);
+ // State dump methods.
+ virtual void sessionState(const framing::SequenceNumber& /*replayId*/,
+ const framing::SequenceNumber& /*sendId*/,
+ const framing::SequenceSet& /*sentIncomplete*/,
+ const framing::SequenceNumber& /*expectedId*/,
+ const framing::SequenceNumber& /*receivedId*/,
+ const framing::SequenceSet& /*unknownCompleted*/,
+ const framing::SequenceSet& /*receivedIncomplete*/) {}
+
+ virtual void shadowReady(uint64_t /*clusterId*/,
+ const std::string& /*userId*/) {}
+
private:
void sendDoOutput();
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
new file mode 100644
index 0000000000..5b92552209
--- /dev/null
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DumpClient.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/enum.h"
+#include "qpid/Url.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace cluster {
+
+using broker::Broker;
+using broker::Exchange;
+using broker::Queue;
+using broker::QueueBinding;
+using broker::Message;
+using namespace framing::message;
+
+using namespace client;
+
+DumpClient::DumpClient(const Url& url) {
+ connection.open(url);
+ session = connection.newSession();
+}
+
+DumpClient::~DumpClient() {
+ session.close();
+ connection.close();
+}
+
+// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes.
+static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
+static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
+
+void DumpClient::dump(Broker& donor) {
+ // TODO aconway 2008-09-08: Caller must handle exceptions
+ // FIXME aconway 2008-09-08: send cluster map frame first.
+ donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
+ // Catch-up exchange is used to route messages to the proper queue without modifying routing key.
+ session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
+ donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
+ session.sync();
+}
+
+void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
+ session.exchangeDeclare(
+ ex->getName(), ex->getType(),
+ ex->getAlternate() ? ex->getAlternate()->getName() : std::string(),
+ arg::passive=false,
+ arg::durable=ex->isDurable(),
+ arg::autoDelete=false,
+ arg::arguments=ex->getArgs());
+}
+
+void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
+ session.queueDeclare(
+ q->getName(),
+ q->getAlternateExchange() ? q->getAlternateExchange()->getName() : std::string(),
+ arg::passive=false,
+ arg::durable=q->isDurable(),
+ arg::exclusive=q->hasExclusiveConsumer(),
+ arg::autoDelete=q->isAutoDelete(),
+ arg::arguments=q->getSettings());
+
+ session.exchangeBind(q->getName(), CATCH_UP, std::string());
+ q->eachMessage(boost::bind(&DumpClient::dumpMessage, this, _1));
+ session.exchangeUnbind(q->getName(), CATCH_UP, std::string());
+ q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
+}
+
+void DumpClient::dumpMessage(const broker::QueuedMessage& message) {
+ SessionBase_0_10Access sb(session);
+ framing::MessageTransferBody transfer(framing::ProtocolVersion(), CATCH_UP, ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED);
+ sb.get()->send(transfer, message.payload->getFrames());
+}
+
+void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) {
+ session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
+}
+
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h
new file mode 100644
index 0000000000..447fd1abef
--- /dev/null
+++ b/cpp/src/qpid/cluster/DumpClient.h
@@ -0,0 +1,74 @@
+#ifndef QPID_CLUSTER_DUMPCLIENT_H
+#define QPID_CLUSTER_DUMPCLIENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include <boost/shared_ptr.hpp>
+
+
+namespace qpid {
+
+class Url;
+
+namespace broker {
+
+class Broker;
+class Queue;
+class Exchange;
+class QueueBindings;
+class QueueBinding;
+class QueuedMessage;
+} // namespace broker
+
+namespace cluster {
+
+/**
+ * A client that dumps the contents of a local broker to a remote one using AMQP.
+ */
+class DumpClient {
+ public:
+ DumpClient(const Url& receiver);
+ ~DumpClient();
+
+ void dump(broker::Broker& donor);
+
+ private:
+ void dumpQueue(const boost::shared_ptr<broker::Queue>&);
+ void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
+ void dumpMessage(const broker::QueuedMessage&);
+ void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
+
+ private:
+ client::Connection connection;
+ client::AsyncSession session;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_DUMPCLIENT_H*/
diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp
index 42af2027eb..3fc54296fa 100644
--- a/cpp/src/qpid/framing/TransferContent.cpp
+++ b/cpp/src/qpid/framing/TransferContent.cpp
@@ -24,15 +24,12 @@
namespace qpid {
namespace framing {
-TransferContent::TransferContent(const std::string& data,
- const std::string& routingKey,
- const std::string& exchange)
-{
+TransferContent::TransferContent(const std::string& data, const std::string& key) {
setData(data);
- if (routingKey.size()) getDeliveryProperties().setRoutingKey(routingKey);
- if (exchange.size()) getDeliveryProperties().setExchange(exchange);
+ if (!key.empty()) getDeliveryProperties().setRoutingKey(key);
}
+
AMQHeaderBody TransferContent::getHeader() const
{
return header;
diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h
index 7630421bd4..e3f6666fa4 100644
--- a/cpp/src/qpid/framing/TransferContent.h
+++ b/cpp/src/qpid/framing/TransferContent.h
@@ -36,9 +36,7 @@ class TransferContent : public MethodContent
AMQHeaderBody header;
std::string data;
public:
- TransferContent(const std::string& data = std::string(),
- const std::string& routingKey = std::string(),
- const std::string& exchange = std::string());
+ TransferContent(const std::string& data = std::string(), const std::string& key=std::string());
///@internal
AMQHeaderBody getHeader() const;
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 9abc1b189e..d082d74367 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -23,6 +23,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/DumpClient.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
@@ -51,6 +52,7 @@ using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::client;
+using qpid::sys::TIME_SEC;
using qpid::broker::Broker;
using boost::ptr_vector;
using qpid::cluster::Cluster;
@@ -133,6 +135,86 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
+QPID_AUTO_TEST_CASE(testDumpClient) {
+ BrokerFixture donor, receiver;
+ {
+ Client c(donor.getPort());
+ FieldTable args;
+ args.setString("x", "y");
+ c.session.queueDeclare("qa", arg::arguments=args);
+ c.session.queueDeclare("qb", arg::alternateExchange="amq.direct");
+
+ c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args);
+ c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo");
+ c.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("one", "foo"));
+
+ c.session.exchangeDeclare("ext", arg::type="topic");
+ c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar");
+ c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
+ c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("one", "bar"));
+ c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("two", "bar"));
+
+ c.session.close();
+ c.connection.close();
+ }
+ qpid::cluster::DumpClient dump(Url::getIpAddressesUrl(receiver.getPort()));
+ dump.dump(*donor.broker);
+ {
+ Client r(receiver.getPort());
+ // Verify exchanges
+ ExchangeQueryResult ex=r.session.exchangeQuery("exd");
+ BOOST_CHECK_EQUAL(ex.getType(), "direct");
+ BOOST_CHECK_EQUAL(ex.getDurable(), false);
+ BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+ BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y");
+
+ ex = r.session.exchangeQuery("ext");
+ BOOST_CHECK_EQUAL(ex.getType(), "topic");
+ BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+
+ // Verify queues
+ QueueQueryResult qq = r.session.queueQuery("qa");
+ BOOST_CHECK_EQUAL(qq.getQueue(), "qa");
+ BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "");
+ BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), 1);
+
+ qq = r.session.queueQuery("qb");
+ BOOST_CHECK_EQUAL(qq.getQueue(), "qb");
+ BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), 2);
+
+ // Verify messages
+ Message m;
+ BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "one");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo");
+
+ BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "one");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+ BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "two");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+ // Verify bindings
+ r.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("xxx", "foo"));
+ BOOST_CHECK(r.subs.get(m, "qa"));
+ BOOST_CHECK_EQUAL(m.getData(), "xxx");
+
+ r.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("yyy", "bar"));
+ BOOST_CHECK(r.subs.get(m, "qb"));
+ BOOST_CHECK_EQUAL(m.getData(), "yyy");
+
+ r.session.close();
+ r.connection.close();
+ }
+}
+
QPID_AUTO_TEST_CASE(testForkedBroker) {
// Verify the ForkedBroker works as expected.
const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
@@ -146,8 +228,7 @@ QPID_AUTO_TEST_CASE(testSingletonCluster) {
ClusterFixture cluster(1);
Client c(cluster[0]);
BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
- BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty());
- // FIXME aconway 2008-09-01: leaks if aisexec not running, investigate.
+ BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound());
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -234,5 +315,4 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
-
QPID_AUTO_TEST_SUITE_END()