summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp231
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h40
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.cpp101
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.h43
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/DumpClient.cpp17
-rw-r--r--qpid/cpp/src/qpid/cluster/DumpClient.h11
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/types.h7
-rw-r--r--qpid/cpp/src/tests/ClusterMapTest.cpp74
-rw-r--r--qpid/cpp/src/tests/DumpClientTest.cpp122
-rw-r--r--qpid/cpp/src/tests/cluster.mk2
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp90
-rw-r--r--qpid/cpp/xml/cluster.xml12
14 files changed, 476 insertions, 281 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 9db2a61a82..c441686def 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -25,7 +25,10 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterUrlNoticeBody.h"
+#include "qpid/framing/ClusterDumpRequestBody.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterDumpErrorBody.h"
+#include "qpid/framing/ClusterMapBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -50,19 +53,14 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
Cluster& cluster;
MemberId member;
ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
- void urlNotice(const std::string& u) { cluster.urlNotice(member, u); }
- void ready() { cluster.ready(member); }
-
- void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) {
- assert(0); // Not passed to cluster, used to start a brain dump over TCP.
- }
-
bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
- virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) {
- // FIXME aconway 2008-09-12: TODO
+ void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); }
+ void dumpError(uint64_t dumpee) { cluster.dumpError(member, MemberId(dumpee)); }
+ void ready(const std::string& u) { cluster.ready(member, u); }
+ virtual void map(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) {
+ cluster.mapInit(members, dumpees, dumps);
}
-
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -80,17 +78,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
state(DISCARD)
{
- QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str());
+ QPID_LOG(notice, self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
- cpg.join(name);
-
- connectionEventQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
+ cpg.join(name);
+
}
-Cluster::~Cluster() {
- QPID_LOG(debug, "~Cluster()");
-}
+Cluster::~Cluster() {}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Mutex::ScopedLock l(lock);
@@ -102,60 +97,47 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
-// FIXME aconway 2008-09-10: leave is currently not called,
-// It should be called if we are shut down by a cluster admin command.
+// FIXME aconway 2008-09-10: call leave from cluster admin command.
// Any other type of exit is caught in disconnect().
//
void Cluster::leave() {
- QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str());
+ QPID_LOG(notice, self << " leaving cluster " << name.str());
cpg.leave(name);
- // Cluster will shut down in configChange when the cluster knows we've left.
-}
-
-template <class T> void decodePtr(Buffer& buf, T*& ptr) {
- uint64_t value = buf.getLongLong();
- ptr = reinterpret_cast<T*>(value);
-}
-
-template <class T> void encodePtr(Buffer& buf, T* ptr) {
- uint64_t value = reinterpret_cast<uint64_t>(ptr);
- buf.putLongLong(value);
+ // Defer shut down to the final configChange when the group knows we've left.
}
-void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) {
- QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- Event e(CONTROL, connection, frame.size());
+void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
+ QPID_LOG(trace, "MCAST [" << self << "]: " << body);
+ AMQFrame f(body);
+ Event e(CONTROL, ConnectionId(self, cptr), f.size());
Buffer buf(e);
- frame.encode(buf);
+ f.encode(buf);
mcastEvent(e);
}
void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) {
- QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data");
Event e(DATA, connection, size);
memcpy(e.getData(), data, size);
mcastEvent(e);
}
void Cluster::mcastEvent(const Event& e) {
- QPID_LOG(trace, "Multicasting: " << e);
e.mcast(name, cpg);
}
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
- return urls.size();
+ return map.memberCount();
}
std::vector<Url> Cluster::getUrls() const {
Mutex::ScopedLock l(lock);
- std::vector<Url> result(urls.size());
- std::transform(urls.begin(), urls.end(), result.begin(),
- boost::bind(&UrlMap::value_type::second, _1));
- return result;
-}
+ return map.memberUrls();
+}
+// FIXME aconway 2008-09-15: volatile for locked/unlocked functions.
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
+ Mutex::ScopedLock l(lock);
if (id.getMember() == self)
return boost::intrusive_ptr<Connection>(id.getConnectionPtr());
ConnectionMap::iterator i = connections.find(id);
@@ -180,17 +162,19 @@ void Cluster::deliver(
try {
MemberId from(nodeid, pid);
Event e = Event::delivered(from, msg, msg_len);
- QPID_LOG(trace, "Cluster deliver: " << e);
-
// Process cluster controls immediately
if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control
Buffer buf(e);
AMQFrame frame;
- while (frame.decode(buf))
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody());
if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame))
- throw Exception("Invalid cluster control");
+ throw Exception(QPID_MSG("Invalid cluster control"));
+ }
}
- else { // Process connection controls & data via the connectionEventQueue.
+ else {
+ // Process connection controls & data via the connectionEventQueue
+ // unless we are in the DISCARD state, in which case ignore.
if (state != DISCARD) {
e.setConnection(getConnection(e.getConnectionId()));
connectionEventQueue.push(e);
@@ -227,15 +211,15 @@ ostream& operator<<(ostream& o, const AddrList& a) {
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = " joined "; break;
- case CPG_REASON_LEAVE: reasonString = " left ";break;
- case CPG_REASON_NODEDOWN: reasonString = " node-down ";break;
- case CPG_REASON_NODEUP: reasonString = " node-up ";break;
- case CPG_REASON_PROCDOWN: reasonString = " process-down ";break;
+ case CPG_REASON_JOIN: reasonString = " joined"; break;
+ case CPG_REASON_LEAVE: reasonString = " left";break;
+ case CPG_REASON_NODEDOWN: reasonString = " node-down";break;
+ case CPG_REASON_NODEUP: reasonString = " node-up";break;
+ case CPG_REASON_PROCDOWN: reasonString = " process-down";break;
default: reasonString = " ";
}
qpid::cluster::MemberId member(*p);
- o << member << reasonString;
+ o << member << reasonString << ((p+1 < a.addrs+a.count) ? ", " : "");
}
return o;
}
@@ -247,23 +231,28 @@ void Cluster::configChange(
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined)
{
- QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
- << AddrList(joined, nJoined) << AddrList(left, nLeft));
-
- if (nJoined) // Notfiy new members of my URL.
- mcastFrame(
- AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
- ConnectionId(self,0));
-
+ // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
+ QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent));
+ QPID_LOG_IF(notice, nLeft, "Left the cluster: " << AddrList(left, nLeft));
if (find(left, left+nLeft, self) != left+nLeft) {
// We have left the group, this is the final config change.
- QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str());
- broker.shutdown();
+ QPID_LOG(notice, self << " left cluster " << name.str());
+ broker.shutdown();
}
Mutex::ScopedLock l(lock);
- for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
- // Add new members when their URL notice arraives.
- lock.notifyAll(); // Threads waiting for membership changes.
+ if (state == DISCARD) {
+ if (nCurrent == 1 && *current == self) {
+ QPID_LOG(notice, self << " first in cluster.");
+ map.ready(self, url);
+ ready(); // First in cluster.
+ }
+ else if (find(joined, joined+nJoined, self) != joined+nJoined) {
+ QPID_LOG(notice, self << " requesting state dump."); // Just joined
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
+ }
+ }
+ for (int i = 0; i < nLeft; ++i)
+ map.leave(left[i]);
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -275,24 +264,59 @@ void Cluster::disconnect(sys::DispatchHandle& ) {
// FIXME aconway 2008-09-11: this should be logged as critical,
// when we provide admin option to shut down cluster and let
// members leave cleanly.
- QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str());
+ QPID_LOG(notice, self << " disconnected from cluster " << name.str());
broker.shutdown();
}
-void Cluster::urlNotice(const MemberId& m, const string& url) {
- //FIXME aconway 2008-09-12: Rdo join logic using ClusterMap. Implement xml map function also.
- //FIXME aconway 2008-09-11: Note multiple meanings of my own notice -
- //from DISCARD->STALL and from STALL->READY via map.
+// FIXME aconway 2008-09-15: can't serve multiple dump requests, stall in wrong place.
+// Only one at a time to simplify things?
+void Cluster::dumpRequest(const MemberId& m, const string& urlStr) {
+ Mutex::ScopedLock l(lock);
+ Url url(urlStr);
+ if (self == m) {
+ switch (state) {
+ case DISCARD: state = CATCHUP; stall(); break;
+ case HAVE_DUMP: ready(); break; // FIXME aconway 2008-09-15: apply dump to map.
+ default: assert(0);
+ };
+ }
+ else if (self == map.dumpRequest(m, url)) {
+ assert(state == READY);
+ QPID_LOG(info, self << " dumping to " << url);
+ // state = DUMPING;
+ // stall();
+ // FIXME aconway 2008-09-15: need to stall map?
+ // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
+ mcastControl(map.toControl(), 0); // FIXME aconway 2008-09-15: stand-in for dump.
+ }
+}
+
+void Cluster::ready(const MemberId& m, const string& urlStr) {
+ Mutex::ScopedLock l(lock);
+ Url url(urlStr);
+ map.ready(m, url);
+}
+
+broker::Broker& Cluster::getBroker(){ return broker; }
+
+void Cluster::stall() {
+ Mutex::ScopedLock l(lock);
+ // Stop processing connection events. We still process config changes
+ // and cluster controls in deliver()
+ connectionEventQueue.stop();
- QPID_LOG(info, "Cluster member " << m << " has URL " << url);
- // My brain dump is up to this point, stall till it is complete.
- if (m == self && state == DISCARD)
- state = STALL;
- urls.insert(UrlMap::value_type(m,Url(url)));
+ // FIXME aconway 2008-09-11: Flow control, we should slow down or
+ // stop reading from local connections while stalled to avoid an
+ // unbounded queue.
}
-void Cluster::ready(const MemberId& ) {
- // FIXME aconway 2008-09-08: TODO
+void Cluster::ready() {
+ // Called with lock held
+ QPID_LOG(info, self << " ready with URL " << url);
+ state = READY;
+ mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+ connectionEventQueue.start(poller);
+ // FIXME aconway 2008-09-15: stall/unstall map?
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -301,26 +325,51 @@ void Cluster::ready(const MemberId& ) {
// callbacks will be invoked.
//
void Cluster::shutdown() {
- QPID_LOG(notice, "Cluster member " << self << " shutting down.");
+ QPID_LOG(notice, self << " shutting down.");
try { cpg.shutdown(); }
catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
delete this;
}
-broker::Broker& Cluster::getBroker(){ return broker; }
+/** Received from cluster */
+void Cluster::dumpError(const MemberId& dumper, const MemberId& dumpee) {
+ QPID_LOG(error, "Error in dump from " << dumper << " to " << dumpee);
+ Mutex::ScopedLock l(lock);
+ map.dumpError(dumpee);
+ if (state == DUMPING && map.dumps(self) == 0)
+ ready();
+}
-void Cluster::stall() {
- // Stop processing connection events. We still process config changes
- // and cluster controls in deliver()
+/** Called in local dump thread */
+void Cluster::dumpError(const MemberId& dumpee, const Url& url, const char* msg) {
+ assert(state == DUMPING);
+ QPID_LOG(error, "Error in local dump to " << dumpee << " at " << url << ": " << msg);
+ mcastControl(ClusterDumpErrorBody(ProtocolVersion(), dumpee), 0);
+ Mutex::ScopedLock l(lock);
+ map.dumpError(dumpee);
+ if (map.dumps(self) == 0) // Unstall immediately.
+ ready();
+}
- // FIXME aconway 2008-09-11: Flow control, we should slow down or
- // stop reading from local connections while stalled to avoid an
- // unbounded queue.
- connectionEventQueue.stop();
+void Cluster::mapInit(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) {
+ Mutex::ScopedLock l(lock);
+ // FIXME aconway 2008-09-15: faking out dump here.
+ switch (state) {
+ case DISCARD:
+ map.init(members, dumpees, dumps);
+ state = HAVE_DUMP;
+ break;
+ case CATCHUP:
+ map.init(members, dumpees, dumps);
+ ready();
+ break;
+ default:
+ break;
+ }
}
-void Cluster::unStall() {
- connectionEventQueue.start(poller);
+void Cluster::dumpTo(const Url& ) {
+ // FIXME aconway 2008-09-12: DumpClient
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 5187cb08e7..24db07b32b 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -19,19 +19,19 @@
*
*/
-#include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/Event.h"
-#include "qpid/sys/PollableQueue.h"
-#include "qpid/cluster/NoOpConnectionOutputHandler.h"
+#include "Cpg.h"
+#include "Event.h"
+#include "NoOpConnectionOutputHandler.h"
+#include "ClusterMap.h"
#include "qpid/broker/Broker.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/Url.h"
#include <boost/intrusive_ptr.hpp>
-#include <map>
#include <vector>
namespace qpid {
@@ -68,33 +68,38 @@ class Cluster : private Cpg::Handler
bool empty() const { return size() == 0; }
/** Send to the cluster */
- void mcastFrame(const framing::AMQFrame&, const ConnectionId&);
+ void mcastControl(const framing::AMQBody& controlBody, Connection* cptr);
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcastEvent(const Event& e);
/** Leave the cluster */
void leave();
- void urlNotice(const MemberId&, const std::string& url);
- void ready(const MemberId&);
+ void dumpRequest(const MemberId&, const std::string& url);
+ void dumpError(const MemberId& dumper, const MemberId& dumpee);
+ void ready(const MemberId&, const std::string& url);
+ void mapInit(const framing::FieldTable& members,
+ const framing::FieldTable& dumpees,
+ const framing::FieldTable& dumps);
MemberId getSelf() const { return self; }
void stall();
- void unStall();
+ void ready();
void shutdown();
broker::Broker& getBroker();
private:
- typedef std::map<MemberId, Url> UrlMap;
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
typedef sys::PollableQueue<Event> EventQueue;
enum State {
- DISCARD, // Initially discard connection events up to my own join message.
- READY, // Normal processing.
- STALL // Stalled while a new member joins.
+ DISCARD, // Discard updates up to catchup point.
+ HAVE_DUMP, // Received state dump, waiting for catchup point.
+ CATCHUP, // Stalled at catchup point, waiting for dump.
+ DUMPING, // Stalled while sending a state dump.
+ READY // Normal processing.
};
void connectionEvent(const Event&);
@@ -126,23 +131,22 @@ class Cluster : private Cpg::Handler
boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
+ void dumpTo(const Url&);
+ void dumpError(const MemberId&, const Url&, const char* msg);
+
mutable sys::Monitor lock; // Protect access to members.
broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
Cpg::Name name;
Url url;
- UrlMap urls;
+ ClusterMap map;
MemberId self;
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
EventQueue connectionEventQueue;
State state;
-
- friend std::ostream& operator <<(std::ostream&, const Cluster&);
- friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
- friend std::ostream& operator <<(std::ostream&, const UrlMap&);
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index b0c45ad625..24c3ed5552 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -24,36 +24,38 @@
#include <boost/bind.hpp>
#include <algorithm>
#include <functional>
+#include <iterator>
+#include <ostream>
namespace qpid {
using namespace framing;
namespace cluster {
-ClusterMap::ClusterMap() {}
+ClusterMap::ClusterMap() : stalled(false) {}
-MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) {
- if (isMember(id)) return MemberId(); // Ignore notices from established members.
- if (isDumpee(id)) {
- // Dumpee caught up, graduate to member with new URL and remove dumper from list.
- dumpees.erase(id);
- members[id] = url;
- }
- else if (members.empty()) {
- // First in cluster, congratulations!
- members[id] = url;
+MemberId ClusterMap::dumpRequest(const MemberId& id, const Url& url) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::dumpRequest, this, id, url));
+ return MemberId();
}
- else {
- // New member needs brain dump.
- MemberId dumper = nextDumper();
- Dumpee& d = dumpees[id];
- d.url = url;
- d.dumper = dumper;
- return dumper;
+ MemberId dumper = nextDumper();
+ Dumpee& d = dumpees[id];
+ d.url = url;
+ d.dumper = dumper;
+ return dumper;
+}
+
+void ClusterMap::ready(const MemberId& id, const Url& url) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::ready, this, id, url));
+ return;
}
- return MemberId();
+ dumpees.erase(id);
+ members[id] = url;
}
+
MemberId ClusterMap::nextDumper() const {
// Choose the first member in member-id order of the group that
// has the least number of dumps-in-progress.
@@ -73,6 +75,11 @@ MemberId ClusterMap::nextDumper() const {
}
void ClusterMap::leave(const MemberId& id) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::leave, this, id));
+ return;
+ }
+
if (isDumpee(id))
dumpees.erase(id);
if (isMember(id)) {
@@ -95,7 +102,13 @@ int ClusterMap::dumps(const MemberId& id) const {
return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id));
}
-void ClusterMap::dumpFailed(const MemberId& dumpee) { dumpees.erase(dumpee); }
+void ClusterMap::dumpError(const MemberId& dumpee) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::dumpError, this, dumpee));
+ return;
+ }
+ dumpees.erase(dumpee);
+}
framing::ClusterMapBody ClusterMap::toControl() const {
framing::ClusterMapBody b;
@@ -108,15 +121,55 @@ framing::ClusterMapBody ClusterMap::toControl() const {
return b;
}
-void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
+void ClusterMap::init(const FieldTable& ftMembers,const FieldTable& ftDumpees, const FieldTable& ftDumps) {
*this = ClusterMap(); // Reset any current contents.
FieldTable::ValueMap::const_iterator i;
- for (i = b.getMembers().begin(); i != b.getMembers().end(); ++i)
+ for (i = ftMembers.begin(); i != ftMembers.end(); ++i)
members[i->first] = Url(i->second->get<std::string>());
- for (i = b.getDumpees().begin(); i != b.getDumpees().end(); ++i)
+ for (i = ftDumpees.begin(); i != ftDumpees.end(); ++i)
dumpees[i->first].url = Url(i->second->get<std::string>());
- for (i = b.getDumps().begin(); i != b.getDumps().end(); ++i)
+ for (i = ftDumps.begin(); i != ftDumps.end(); ++i)
dumpees[i->first].dumper = MemberId(i->second->get<std::string>());
}
+void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
+ init(b.getMembers(), b.getDumpees(), b.getDumps());
+}
+
+std::vector<Url> ClusterMap::memberUrls() const {
+ std::vector<Url> result(members.size());
+ std::transform(members.begin(), members.end(), result.begin(),
+ boost::bind(&MemberMap::value_type::second, _1));
+ return result;
+}
+
+void ClusterMap::stall() { stalled = true; }
+
+namespace {
+template <class F> void call(const F& f) { f(); }
+}
+
+void ClusterMap::unstall() {
+ stalled = false;
+ std::for_each(stallq.begin(), stallq.end(),
+ boost::bind(&boost::function<void()>::operator(), _1));
+ stallq.clear();
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv) {
+ return o << mv.first << "=" << mv.second;
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv) {
+ return o << "dump: " << dv.second.dumper << " to " << dv.first << "=" << dv.second.url;
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
+ std::ostream_iterator<ClusterMap::MemberMap::value_type> im(o, "\n ");
+ std::ostream_iterator<ClusterMap::DumpeeMap::value_type> id(o, "\n ");
+ std::copy(m.members.begin(), m.members.end(), im);
+ std::copy(m.dumpees.begin(), m.dumpees.end(), id);
+ return o;
+}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h
index 7695ebeabb..04323c5905 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h
@@ -25,9 +25,11 @@
#include "types.h"
#include "qpid/framing/ClusterMapBody.h"
#include "qpid/Url.h"
-#include <boost/optional.hpp>
+#include <boost/function.hpp>
#include <vector>
+#include <deque>
#include <map>
+#include <iosfwd>
namespace qpid {
namespace cluster {
@@ -41,21 +43,22 @@ class ClusterMap
{
public:
ClusterMap();
+
+ MemberId dumpRequest(const MemberId& from, const Url& url);
+
+ void dumpError(const MemberId&);
+
+ void ready(const MemberId& from, const Url& url);
- /** Update map for url-notice event.
- *@param from Member that sent the notice.
- *@param url URL for from.
- *@return MemberId of member that should dump to URL, or a null
- * MemberId() if no dump is needed.
- */
- MemberId urlNotice(const MemberId& from, const Url& url);
-
- /** Dump failed notice */
- void dumpFailed(const MemberId&);
-
- /** Update map for leave event */
+ /** Update map for cpg leave event */
void leave(const MemberId&);
+ /** Instead of updating the map, queue the updates for unstall */
+ void stall();
+
+ /** Apply queued updates */
+ void unstall();
+
/** Number of unfinished dumps for member. */
int dumps(const MemberId&) const;
@@ -63,13 +66,20 @@ class ClusterMap
framing::ClusterMapBody toControl() const;
/** Initialize map contents from a cluster control body. */
+ void init(const framing::FieldTable& members,
+ const framing::FieldTable& dumpees,
+ const framing::FieldTable& dumps);
+
void fromControl(const framing::ClusterMapBody&);
size_t memberCount() const { return members.size(); }
size_t dumpeeCount() const { return dumpees.size(); }
+
bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
bool isDumpee(const MemberId& id) const { return dumpees.find(id) != dumpees.end(); }
+ std::vector<Url> memberUrls() const;
+
private:
struct Dumpee { Url url; MemberId dumper; };
typedef std::map<MemberId, Url> MemberMap;
@@ -80,7 +90,14 @@ class ClusterMap
MemberMap members;
DumpeeMap dumpees;
+ bool stalled;
+ std::deque<boost::function<void()> > stallq;
+
+ friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
+ friend std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv);
+ friend std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv);
};
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_CLUSTERMAP_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 00d3901886..6cc21633d3 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -79,7 +79,7 @@ void Connection::closed() {
// handler will be deleted.
//
connection.setOutputHandler(&discardHandler);
- cluster.mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+ cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
++mcastSeq;
}
catch (const std::exception& e) {
@@ -93,7 +93,6 @@ void Connection::deliverClose () {
}
size_t Connection::decode(const char* buffer, size_t size) {
- QPID_LOG(trace, "mcastBuffer " << self << " " << mcastSeq << " " << size);
++mcastSeq;
cluster.mcastBuffer(buffer, size, self);
// FIXME aconway 2008-09-01: deserialize?
@@ -101,7 +100,6 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
void Connection::deliverBuffer(Buffer& buf) {
- QPID_LOG(trace, "deliverBuffer " << self << " " << deliverSeq << " " << buf.available());
++deliverSeq;
while (decoder.decode(buf))
deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread.
diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.cpp b/qpid/cpp/src/qpid/cluster/DumpClient.cpp
index 5b92552209..f20ceb2ab6 100644
--- a/qpid/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/DumpClient.cpp
@@ -28,6 +28,7 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/enum.h"
+#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
@@ -43,7 +44,9 @@ using namespace framing::message;
using namespace client;
-DumpClient::DumpClient(const Url& url) {
+DumpClient::DumpClient(const Url& url, Broker& b, const boost::function<void(const char*)>& f)
+ : donor(b), failed(f)
+{
connection.open(url);
session = connection.newSession();
}
@@ -57,8 +60,7 @@ DumpClient::~DumpClient() {
static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
-void DumpClient::dump(Broker& donor) {
- // TODO aconway 2008-09-08: Caller must handle exceptions
+void DumpClient::dump() {
// FIXME aconway 2008-09-08: send cluster map frame first.
donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
@@ -67,6 +69,15 @@ void DumpClient::dump(Broker& donor) {
session.sync();
}
+void DumpClient::run() {
+ try {
+ dump();
+ } catch (const Exception& e) {
+ failed(e.what());
+ }
+ delete this;
+}
+
void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
session.exchangeDeclare(
ex->getName(), ex->getType(),
diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.h b/qpid/cpp/src/qpid/cluster/DumpClient.h
index 447fd1abef..1c49b417d7 100644
--- a/qpid/cpp/src/qpid/cluster/DumpClient.h
+++ b/qpid/cpp/src/qpid/cluster/DumpClient.h
@@ -29,6 +29,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/sys/Runnable.h"
#include <boost/shared_ptr.hpp>
@@ -51,12 +52,12 @@ namespace cluster {
/**
* A client that dumps the contents of a local broker to a remote one using AMQP.
*/
-class DumpClient {
+class DumpClient : public sys::Runnable {
public:
- DumpClient(const Url& receiver);
+ DumpClient(const Url& url, broker::Broker& donor, const boost::function<void(const char*)>& onFail);
~DumpClient();
-
- void dump(broker::Broker& donor);
+ void dump();
+ void run(); // Will delete this when finished.
private:
void dumpQueue(const boost::shared_ptr<broker::Queue>&);
@@ -67,6 +68,8 @@ class DumpClient {
private:
client::Connection connection;
client::AsyncSession session;
+ broker::Broker& donor;
+ boost::function<void(const char*)> failed;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 82b0d3f077..3212d34775 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -98,8 +98,7 @@ void OutputInterceptor::sendDoOutput() {
// Send it anyway to keep the doOutput chain going until we are sure there's no more output
// (in deliverDoOutput)
//
- parent.getCluster().mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>(
- framing::ProtocolVersion(), request)), parent.getId());
+ parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), &parent);
QPID_LOG(trace, &parent << "Send doOutput request for " << request);
}
diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h
index d62ad62b49..f48ba2db30 100644
--- a/qpid/cpp/src/qpid/cluster/types.h
+++ b/qpid/cpp/src/qpid/cluster/types.h
@@ -62,13 +62,6 @@ struct ConnectionId : public std::pair<MemberId, Connection*> {
Connection* getConnectionPtr() const { return second; }
};
-/** State of a cluster member */
-enum State {
- DISCARD, // Initially discard connection events up to my own join message.
- STALL, // All members stall while a new member joins.
- READY // Normal processing.
-};
-
std::ostream& operator<<(std::ostream&, const ConnectionId&);
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/tests/ClusterMapTest.cpp b/qpid/cpp/src/tests/ClusterMapTest.cpp
index cce0efb69f..f8ac2e22e6 100644
--- a/qpid/cpp/src/tests/ClusterMapTest.cpp
+++ b/qpid/cpp/src/tests/ClusterMapTest.cpp
@@ -38,13 +38,13 @@ Url url(const char* host) { return Url(TcpAddress(host)); }
QPID_AUTO_TEST_CASE(testNotice) {
ClusterMap m;
- BOOST_CHECK(!m.urlNotice(id(0), url("0-ready"))); // id(0) member, no dump.
+ m.ready(id(0), url("0-ready")); // id(0) member, no dump.
BOOST_CHECK(m.isMember(id(0)));
BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)1);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
- BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump"))); // Newbie, needs dump
+ BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(1), url("1-dump"))); // Newbie, needs dump
BOOST_CHECK(m.isMember(id(0)));
BOOST_CHECK(m.isDumpee(id(1)));
BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
@@ -52,7 +52,7 @@ QPID_AUTO_TEST_CASE(testNotice) {
BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)1);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1);
- BOOST_CHECK(!m.urlNotice(id(1), url("1-ready"))); // id(1) is ready.
+ m.ready(id(1), url("1-ready")); // id(1) is ready.
BOOST_CHECK(m.isMember(id(0)));
BOOST_CHECK(m.isMember(id(1)));
BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
@@ -60,25 +60,25 @@ QPID_AUTO_TEST_CASE(testNotice) {
BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
- BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump"))); // id(2) needs dump
+ BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(2), url("2-dump"))); // id(2) needs dump
BOOST_CHECK(m.isDumpee(id(2)));
BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1);
- BOOST_CHECK_EQUAL(id(1), m.urlNotice(id(3), url("3-dump"))); // 0 busy, dump to id(1).
+ BOOST_CHECK_EQUAL(id(1), m.dumpRequest(id(3), url("3-dump"))); // 0 busy, dump to id(1).
BOOST_CHECK(m.isDumpee(id(3)));
BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)2);
- BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump"))); // Equally busy, 0 is first on list.
+ BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(4), url("4-dump"))); // Equally busy, 0 is first on list.
BOOST_CHECK_EQUAL(m.dumps(id(0)), 2);
BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)3);
// My dumpees both complete
- BOOST_CHECK(!m.urlNotice(id(2), url("2-ready")));
- BOOST_CHECK(!m.urlNotice(id(4), url("4-ready")));
+ m.ready(id(2), url("2-ready"));
+ m.ready(id(4), url("4-ready"));
BOOST_CHECK(m.isMember(id(2)));
BOOST_CHECK(m.isMember(id(4)));
BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
@@ -86,7 +86,7 @@ QPID_AUTO_TEST_CASE(testNotice) {
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1);
// Final dumpee completes.
- BOOST_CHECK(!m.urlNotice(id(3), url("3-ready")));
+ m.ready(id(3), url("3-ready"));
BOOST_CHECK(m.isMember(id(3)));
BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
@@ -96,11 +96,11 @@ QPID_AUTO_TEST_CASE(testNotice) {
QPID_AUTO_TEST_CASE(testLeave) {
ClusterMap m;
- BOOST_CHECK(!m.urlNotice(id(0), url("0-ready")));
- BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump")));
- BOOST_CHECK(!m.urlNotice(id(1), url("1-ready")));
- BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump")));
- BOOST_CHECK(!m.urlNotice(id(2), url("2-ready")));
+ m.ready(id(0), url("0-ready"));
+ BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(1), url("1-dump")));
+ m.ready(id(1), url("1-ready"));
+ BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(2), url("2-dump")));
+ m.ready(id(2), url("2-ready"));
BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)3);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
@@ -110,13 +110,13 @@ QPID_AUTO_TEST_CASE(testLeave) {
BOOST_CHECK(m.isMember(id(0)));
BOOST_CHECK(m.isMember(id(2)));
- BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump")));
+ BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(4), url("4-dump")));
BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
BOOST_CHECK(m.isDumpee(id(4)));
BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1);
- m.dumpFailed(id(4)); // Dumper detected a failure.
+ m.dumpError(id(4)); // Dumper detected a failure.
BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
BOOST_CHECK(!m.isDumpee(id(4)));
BOOST_CHECK(!m.isMember(id(4)));
@@ -127,7 +127,7 @@ QPID_AUTO_TEST_CASE(testLeave) {
BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
- BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(5), url("5-dump")));
+ BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(5), url("5-dump")));
BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
BOOST_CHECK(m.isDumpee(id(5)));
BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
@@ -140,19 +140,19 @@ QPID_AUTO_TEST_CASE(testLeave) {
BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
- m.dumpFailed(id(5)); // Dumper reports failure - no op, we already know.
+ m.dumpError(id(5)); // Dumper reports failure - no op, we already know.
BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
}
QPID_AUTO_TEST_CASE(testToControl) {
ClusterMap m;
- m.urlNotice(id(0), url("0"));
- m.urlNotice(id(1), url("1dump"));
- m.urlNotice(id(1), url("1"));
- m.urlNotice(id(2), url("2dump"));
- m.urlNotice(id(3), url("3dump"));
- m.urlNotice(id(4), url("4dump"));
+ m.ready(id(0), url("0"));
+ m.dumpRequest(id(1), url("1dump"));
+ m.ready(id(1), url("1"));
+ m.dumpRequest(id(2), url("2dump"));
+ m.dumpRequest(id(3), url("3dump"));
+ m.dumpRequest(id(4), url("4dump"));
BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)3);
@@ -188,4 +188,30 @@ QPID_AUTO_TEST_CASE(testToControl) {
BOOST_CHECK_EQUAL(s,s2);
}
+QPID_AUTO_TEST_CASE(testStall) {
+ ClusterMap m;
+ m.ready(id(0), url("0"));
+ BOOST_CHECK_EQUAL(m.memberCount(), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+ m.stall();
+
+ m.dumpRequest(id(1), url("1dump"));
+ m.ready(id(1), url("1"));
+ m.leave(id(0));
+ m.dumpRequest(id(2), url("2dump"));
+ m.ready(id(2), url("2"));
+ m.dumpRequest(id(3), url("3dump"));
+ BOOST_CHECK_EQUAL(m.memberCount(), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ m.unstall();
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+ BOOST_CHECK(!m.isMember(id(0)));
+ BOOST_CHECK(m.isMember(id(1)));
+ BOOST_CHECK(m.isMember(id(2)));
+ BOOST_CHECK(m.isDumpee(id(3)));
+}
+
+
QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/DumpClientTest.cpp b/qpid/cpp/src/tests/DumpClientTest.cpp
new file mode 100644
index 0000000000..27c4174ffe
--- /dev/null
+++ b/qpid/cpp/src/tests/DumpClientTest.cpp
@@ -0,0 +1,122 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "BrokerFixture.h"
+#include "qpid/cluster/DumpClient.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/Url.h"
+#include <boost/assign.hpp>
+
+QPID_AUTO_TEST_SUITE(DumpClientTest)
+
+using namespace std;
+using namespace qpid;
+using namespace framing;
+using namespace client;
+using namespace cluster;
+using namespace sys;
+
+// Verify we can copy shared state - wiring + messages - from one
+// broker to another via the DumpClient.
+//
+QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
+ BrokerFixture donor, receiver;
+ {
+ Client c(donor.getPort());
+ FieldTable args;
+ args.setString("x", "y");
+ c.session.queueDeclare("qa", arg::arguments=args);
+ c.session.queueDeclare("qb", arg::alternateExchange="amq.direct");
+
+ c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args);
+ c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo");
+ c.session.messageTransfer(arg::destination="exd", arg::content=Message("one", "foo"));
+
+ c.session.exchangeDeclare("ext", arg::type="topic");
+ c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar");
+ c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
+ c.session.messageTransfer(arg::destination="ext", arg::content=Message("one", "bar"));
+ c.session.messageTransfer(arg::destination="ext", arg::content=Message("two", "bar"));
+
+ c.session.close();
+ c.connection.close();
+ }
+ Url url(Url::getIpAddressesUrl(receiver.getPort()));
+ qpid::cluster::DumpClient dump(url, *donor.broker, 0);
+ dump.dump();
+ {
+ Client r(receiver.getPort());
+ // Verify exchanges
+ ExchangeQueryResult ex=r.session.exchangeQuery("exd");
+ BOOST_CHECK_EQUAL(ex.getType(), "direct");
+ BOOST_CHECK_EQUAL(ex.getDurable(), false);
+ BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+ BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y");
+
+ ex = r.session.exchangeQuery("ext");
+ BOOST_CHECK_EQUAL(ex.getType(), "topic");
+ BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+
+ // Verify queues
+ QueueQueryResult qq = r.session.queueQuery("qa");
+ BOOST_CHECK_EQUAL(qq.getQueue(), "qa");
+ BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "");
+ BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), (unsigned)1);
+
+ qq = r.session.queueQuery("qb");
+ BOOST_CHECK_EQUAL(qq.getQueue(), "qb");
+ BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), (unsigned)2);
+
+ // Verify messages
+ Message m;
+ BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "one");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo");
+
+ BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "one");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+ BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "two");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+ // Verify bindings
+ r.session.messageTransfer(arg::destination="exd", arg::content=Message("xxx", "foo"));
+ BOOST_CHECK(r.subs.get(m, "qa"));
+ BOOST_CHECK_EQUAL(m.getData(), "xxx");
+
+ r.session.messageTransfer(arg::destination="ext", arg::content=Message("yyy", "bar"));
+ BOOST_CHECK(r.subs.get(m, "qb"));
+ BOOST_CHECK_EQUAL(m.getData(), "yyy");
+
+ r.session.close();
+ r.connection.close();
+ }
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index 40d1c0df3e..0fc95b3c7d 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -18,7 +18,7 @@ check_PROGRAMS+=cluster_test
cluster_test_SOURCES=unit_test.cpp cluster_test.cpp
cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
-unit_test_SOURCES+=ClusterMapTest.cpp
unit_test_LDADD+=$(lib_cluster)
+unit_test_SOURCES+=ClusterMapTest.cpp DumpClientTest.cpp
endif
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 28ee439a5d..af380c629d 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -140,94 +140,6 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-// FIXME aconway 2008-09-11: This test has to be first otherwise
-// it picks up the cluster name from a previous test and runs the
-// brokers as cluster nodes. Something wrong with option parsing...
-//
-QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
- // In this test we don't want the cluster plugin to initialize, so set --cluster-name=""
- const char* argv[] = { "--cluster-name", "" };
- Broker::Options opts = parseOpts(sizeof(argv)/sizeof(*argv), argv);
-
- BrokerFixture donor(opts), receiver(opts);
- {
- Client c(donor.getPort());
- FieldTable args;
- args.setString("x", "y");
- c.session.queueDeclare("qa", arg::arguments=args);
- c.session.queueDeclare("qb", arg::alternateExchange="amq.direct");
-
- c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args);
- c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo");
- c.session.messageTransfer(arg::destination="exd", arg::content=Message("one", "foo"));
-
- c.session.exchangeDeclare("ext", arg::type="topic");
- c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar");
- c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
- c.session.messageTransfer(arg::destination="ext", arg::content=Message("one", "bar"));
- c.session.messageTransfer(arg::destination="ext", arg::content=Message("two", "bar"));
-
- c.session.close();
- c.connection.close();
- }
- qpid::cluster::DumpClient dump(Url::getIpAddressesUrl(receiver.getPort()));
- dump.dump(*donor.broker);
- {
- Client r(receiver.getPort());
- // Verify exchanges
- ExchangeQueryResult ex=r.session.exchangeQuery("exd");
- BOOST_CHECK_EQUAL(ex.getType(), "direct");
- BOOST_CHECK_EQUAL(ex.getDurable(), false);
- BOOST_CHECK_EQUAL(ex.getNotFound(), false);
- BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y");
-
- ex = r.session.exchangeQuery("ext");
- BOOST_CHECK_EQUAL(ex.getType(), "topic");
- BOOST_CHECK_EQUAL(ex.getNotFound(), false);
-
- // Verify queues
- QueueQueryResult qq = r.session.queueQuery("qa");
- BOOST_CHECK_EQUAL(qq.getQueue(), "qa");
- BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "");
- BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y");
- BOOST_CHECK_EQUAL(qq.getMessageCount(), (unsigned)1);
-
- qq = r.session.queueQuery("qb");
- BOOST_CHECK_EQUAL(qq.getQueue(), "qb");
- BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct");
- BOOST_CHECK_EQUAL(qq.getMessageCount(), (unsigned)2);
-
- // Verify messages
- Message m;
- BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "one");
- BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd");
- BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo");
-
- BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "one");
- BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
- BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
-
- BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "two");
- BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
- BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
-
- // Verify bindings
- r.session.messageTransfer(arg::destination="exd", arg::content=Message("xxx", "foo"));
- BOOST_CHECK(r.subs.get(m, "qa"));
- BOOST_CHECK_EQUAL(m.getData(), "xxx");
-
- r.session.messageTransfer(arg::destination="ext", arg::content=Message("yyy", "bar"));
- BOOST_CHECK(r.subs.get(m, "qb"));
- BOOST_CHECK_EQUAL(m.getData(), "yyy");
-
- r.session.close();
- r.connection.close();
- }
-}
-
// FIXME aconway 2008-09-12: finish the new join protocol.
QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) {
@@ -278,7 +190,7 @@ QPID_AUTO_TEST_CASE(testStall) {
BOOST_REQUIRE(q0);
BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0);
// Now unstall and we should get the message.
- getGlobalCluster().unStall();
+ getGlobalCluster().ready();
Message m;
BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "foo");
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 596a00158b..cefe92c657 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -29,11 +29,19 @@ o<?xml version="1.0"?>
<!-- Cluster membership -->
- <control name = "url-notice" code="0x1" label="Url to use for a cluster member">
+ <control name = "dump-request" code="0x1" label="Url to use for a cluster member">
<field name="url" type="str16" label="URL for brain dump to new member."/>
</control>
- <control name="map" code="0x3" label="Cluster map sent to new members.">
+ <control name = "dump-error" code="0x2" label="Error while dumping to new member">
+ <field name="dumpee" type="uint64"/>
+ </control>
+
+ <control name = "ready" code="0x3" label="Cluster member ready at URL">
+ <field name="url" type="str16" label="URL for client connections."/>
+ </control>
+
+ <control name="map" code="0x4" label="Cluster map sent to new members.">
<field name="members" type="map"/> <!-- member-id -> URL -->
<field name="dumpees" type="map"/> <!-- dumpee-id -> braindump URL -->
<field name="dumps" type="map"/> <!-- dumpee-id -> donor-id -->