diff options
author | Alan Conway <aconway@apache.org> | 2008-09-12 18:07:47 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-12 18:07:47 +0000 |
commit | 0d3aecbd765846e7bdfd45850bbfe413c85eb6e0 (patch) | |
tree | 7e1a077994afdf746fb14ab406ee0caf16609b36 | |
parent | 99a2f7119b4aed3e2b2ad878a3f5905e52ccbe83 (diff) | |
download | qpid-python-0d3aecbd765846e7bdfd45850bbfe413c85eb6e0.tar.gz |
Added ClusterMap and test. Moved PollableCondition, PollableQueue to sys.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@694758 13f79535-47bb-0310-9956-ffa450edef68
22 files changed, 673 insertions, 107 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index f02e5e1644..443db3fb15 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -26,7 +26,9 @@ cluster_la_SOURCES = \ qpid/cluster/Event.h \ qpid/cluster/Event.cpp \ qpid/cluster/DumpClient.h \ - qpid/cluster/DumpClient.cpp + qpid/cluster/DumpClient.cpp \ + qpid/cluster/ClusterMap.h \ + qpid/cluster/ClusterMap.cpp cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 07ed4596e0..9db2a61a82 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -25,7 +25,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterJoiningBody.h" +#include "qpid/framing/ClusterUrlNoticeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -50,7 +50,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { Cluster& cluster; MemberId member; ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} - void joining(const std::string& u) { cluster.joining (member, u); } + void urlNotice(const std::string& u) { cluster.urlNotice(member, u); } void ready() { cluster.ready(member); } void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) { @@ -58,6 +58,11 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { } bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } + + virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) { + // FIXME aconway 2008-09-12: TODO + } + }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : @@ -72,13 +77,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1))) + connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), + state(DISCARD) { QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); cpg.join(name); - deliverQueue.start(poller); + connectionEventQueue.start(poller); cpgDispatchHandle.startWatch(poller); } @@ -103,6 +109,7 @@ void Cluster::erase(ConnectionId id) { void Cluster::leave() { QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str()); cpg.leave(name); + // Cluster will shut down in configChange when the cluster knows we've left. } template <class T> void decodePtr(Buffer& buf, T*& ptr) { @@ -172,8 +179,23 @@ void Cluster::deliver( { try { MemberId from(nodeid, pid); - QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10: - deliverQueue.push(Event::delivered(from, msg, msg_len)); + Event e = Event::delivered(from, msg, msg_len); + QPID_LOG(trace, "Cluster deliver: " << e); + + // Process cluster controls immediately + if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control + Buffer buf(e); + AMQFrame frame; + while (frame.decode(buf)) + if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame)) + throw Exception("Invalid cluster control"); + } + else { // Process connection controls & data via the connectionEventQueue. + if (state != DISCARD) { + e.setConnection(getConnection(e.getConnectionId())); + connectionEventQueue.push(e); + } + } } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -183,24 +205,15 @@ void Cluster::deliver( } } -void Cluster::deliverEvent(const Event& e) { - QPID_LOG(trace, "Delivered: " << e); +void Cluster::connectionEvent(const Event& e) { Buffer buf(e); - if (e.getConnection().getConnectionPtr() == 0) { // Cluster control + assert(e.getConnection()); + if (e.getType() == DATA) + e.getConnection()->deliverBuffer(buf); + else { // control AMQFrame frame; - while (frame.decode(buf)) - if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame)) - throw Exception("Invalid cluster control"); - } - else { // Connection data or control - boost::intrusive_ptr<Connection> c = getConnection(e.getConnection()); - if (e.getType() == DATA) - c->deliverBuffer(buf); - else { // control - AMQFrame frame; - while (frame.decode(buf)) - c->deliver(frame); - } + while (frame.decode(buf)) + e.getConnection()->deliver(frame); } } @@ -239,7 +252,7 @@ void Cluster::configChange( if (nJoined) // Notfiy new members of my URL. mcastFrame( - AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())), + AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); if (find(left, left+nLeft, self) != left+nLeft) { @@ -266,8 +279,15 @@ void Cluster::disconnect(sys::DispatchHandle& ) { broker.shutdown(); } -void Cluster::joining(const MemberId& m, const string& url) { +void Cluster::urlNotice(const MemberId& m, const string& url) { + //FIXME aconway 2008-09-12: Rdo join logic using ClusterMap. Implement xml map function also. + //FIXME aconway 2008-09-11: Note multiple meanings of my own notice - + //from DISCARD->STALL and from STALL->READY via map. + QPID_LOG(info, "Cluster member " << m << " has URL " << url); + // My brain dump is up to this point, stall till it is complete. + if (m == self && state == DISCARD) + state = STALL; urls.insert(UrlMap::value_type(m,Url(url))); } @@ -289,4 +309,18 @@ void Cluster::shutdown() { broker::Broker& Cluster::getBroker(){ return broker; } +void Cluster::stall() { + // Stop processing connection events. We still process config changes + // and cluster controls in deliver() + + // FIXME aconway 2008-09-11: Flow control, we should slow down or + // stop reading from local connections while stalled to avoid an + // unbounded queue. + connectionEventQueue.stop(); +} + +void Cluster::unStall() { + connectionEventQueue.start(poller); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 3a254684ad..5187cb08e7 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -75,11 +75,14 @@ class Cluster : private Cpg::Handler /** Leave the cluster */ void leave(); - void joining(const MemberId&, const std::string& url); + void urlNotice(const MemberId&, const std::string& url); void ready(const MemberId&); MemberId getSelf() const { return self; } + void stall(); + void unStall(); + void shutdown(); broker::Broker& getBroker(); @@ -88,15 +91,13 @@ class Cluster : private Cpg::Handler typedef std::map<MemberId, Url> UrlMap; typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; typedef sys::PollableQueue<Event> EventQueue; + enum State { + DISCARD, // Initially discard connection events up to my own join message. + READY, // Normal processing. + STALL // Stalled while a new member joins. + }; - boost::function<void()> shutdownNext; - - /** Handle a delivered frame */ - void deliverFrame(framing::AMQFrame&, const ConnectionId&); - - void deliverBuffer(const char*, size_t, const ConnectionId&); - - void deliverEvent(const Event&); + void connectionEvent(const Event&); /** CPG deliver callback. */ void deliver( @@ -136,7 +137,8 @@ class Cluster : private Cpg::Handler ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - EventQueue deliverQueue; + EventQueue connectionEventQueue; + State state; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp new file mode 100644 index 0000000000..b0c45ad625 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ClusterMap.h" +#include "qpid/Url.h" +#include "qpid/framing/FieldTable.h" +#include <boost/bind.hpp> +#include <algorithm> +#include <functional> + +namespace qpid { +using namespace framing; + +namespace cluster { + +ClusterMap::ClusterMap() {} + +MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) { + if (isMember(id)) return MemberId(); // Ignore notices from established members. + if (isDumpee(id)) { + // Dumpee caught up, graduate to member with new URL and remove dumper from list. + dumpees.erase(id); + members[id] = url; + } + else if (members.empty()) { + // First in cluster, congratulations! + members[id] = url; + } + else { + // New member needs brain dump. + MemberId dumper = nextDumper(); + Dumpee& d = dumpees[id]; + d.url = url; + d.dumper = dumper; + return dumper; + } + return MemberId(); +} + +MemberId ClusterMap::nextDumper() const { + // Choose the first member in member-id order of the group that + // has the least number of dumps-in-progress. + assert(!members.empty()); + MemberId dumper = members.begin()->first; + int minDumps = dumps(dumper); + MemberMap::const_iterator i = ++members.begin(); + while (i != members.end()) { + int d = dumps(i->first); + if (d < minDumps) { + minDumps = d; + dumper = i->first; + } + ++i; + } + return dumper; +} + +void ClusterMap::leave(const MemberId& id) { + if (isDumpee(id)) + dumpees.erase(id); + if (isMember(id)) { + members.erase(id); + DumpeeMap::iterator i = dumpees.begin(); + while (i != dumpees.end()) { + if (i->second.dumper == id) dumpees.erase(i++); + else ++i; + } + } +} + +struct ClusterMap::MatchDumper { + MemberId d; + MatchDumper(const MemberId& i) : d(i) {} + bool operator()(const DumpeeMap::value_type& v) const { return v.second.dumper == d; } +}; + +int ClusterMap::dumps(const MemberId& id) const { + return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id)); +} + +void ClusterMap::dumpFailed(const MemberId& dumpee) { dumpees.erase(dumpee); } + +framing::ClusterMapBody ClusterMap::toControl() const { + framing::ClusterMapBody b; + for (MemberMap::const_iterator i = members.begin(); i != members.end(); ++i) + b.getMembers().setString(i->first.str(), i->second.str()); + for (DumpeeMap::const_iterator i = dumpees.begin(); i != dumpees.end(); ++i) { + b.getDumpees().setString(i->first.str(), i->second.url.str()); + b.getDumps().setString(i->first.str(), i->second.dumper.str()); + } + return b; +} + +void ClusterMap::fromControl(const framing::ClusterMapBody& b) { + *this = ClusterMap(); // Reset any current contents. + FieldTable::ValueMap::const_iterator i; + for (i = b.getMembers().begin(); i != b.getMembers().end(); ++i) + members[i->first] = Url(i->second->get<std::string>()); + for (i = b.getDumpees().begin(); i != b.getDumpees().end(); ++i) + dumpees[i->first].url = Url(i->second->get<std::string>()); + for (i = b.getDumps().begin(); i != b.getDumps().end(); ++i) + dumpees[i->first].dumper = MemberId(i->second->get<std::string>()); +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h new file mode 100644 index 0000000000..7695ebeabb --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -0,0 +1,86 @@ +#ifndef QPID_CLUSTER_CLUSTERMAP_H +#define QPID_CLUSTER_CLUSTERMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/framing/ClusterMapBody.h" +#include "qpid/Url.h" +#include <boost/optional.hpp> +#include <vector> +#include <map> + +namespace qpid { +namespace cluster { + +/** + * Map of established cluster members and brain-dumps in progress. + * A dumper is an established member that is sending catch-up data. + * A dumpee is an aspiring member that is receiving catch-up data. + */ +class ClusterMap +{ + public: + ClusterMap(); + + /** Update map for url-notice event. + *@param from Member that sent the notice. + *@param url URL for from. + *@return MemberId of member that should dump to URL, or a null + * MemberId() if no dump is needed. + */ + MemberId urlNotice(const MemberId& from, const Url& url); + + /** Dump failed notice */ + void dumpFailed(const MemberId&); + + /** Update map for leave event */ + void leave(const MemberId&); + + /** Number of unfinished dumps for member. */ + int dumps(const MemberId&) const; + + /** Convert map contents to a cluster control body. */ + framing::ClusterMapBody toControl() const; + + /** Initialize map contents from a cluster control body. */ + void fromControl(const framing::ClusterMapBody&); + + size_t memberCount() const { return members.size(); } + size_t dumpeeCount() const { return dumpees.size(); } + bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } + bool isDumpee(const MemberId& id) const { return dumpees.find(id) != dumpees.end(); } + + private: + struct Dumpee { Url url; MemberId dumper; }; + typedef std::map<MemberId, Url> MemberMap; + typedef std::map<MemberId, Dumpee> DumpeeMap; + struct MatchDumper; + + MemberId nextDumper() const; + + MemberMap members; + DumpeeMap dumpees; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CLUSTERMAP_H*/ diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index f4128634a6..8457467196 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -76,7 +76,6 @@ struct ClusterPlugin : public Plugin { void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified. - QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin."); cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 68d1b16dfa..00d3901886 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "Connection.h" +#include "Cluster.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/Invoker.h" #include "qpid/framing/AllInvoker.h" diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index a30350585f..37ff2ac6b4 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -23,9 +23,9 @@ */ #include "types.h" -#include "Cluster.h" #include "WriteEstimate.h" #include "OutputInterceptor.h" +#include "NoOpConnectionOutputHandler.h" #include "qpid/broker/Connection.h" #include "qpid/amqp_0_10/Connection.h" @@ -39,6 +39,7 @@ namespace qpid { namespace framing { class AMQFrame; } namespace cluster { +class Cluster; /** Intercept broker::Connection calls for shadow and local cluster connections. */ class Connection : diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp index 6179eab724..ed046f2ede 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -20,6 +20,7 @@ */ #include "ConnectionCodec.h" #include "Connection.h" +#include "Cluster.h" #include "ProxyInputHandler.h" #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 96a5b3da43..2a77fa437a 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -183,4 +183,15 @@ ostream& operator<<(ostream& o, const ConnectionId& c) { return o << c.first << "-" << c.second; } +std::string MemberId::str() const { + char s[8]; + reinterpret_cast<uint32_t&>(s[0]) = htonl(first); + reinterpret_cast<uint32_t&>(s[4]) = htonl(second); + return std::string(s,8); +} + +MemberId::MemberId(const std::string& s) { + first = ntohl(reinterpret_cast<const uint32_t&>(s[0])); + second = ntohl(reinterpret_cast<const uint32_t&>(s[4])); +} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp index 4dfb7ab400..89c6268d7f 100644 --- a/qpid/cpp/src/qpid/cluster/Event.cpp +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -34,7 +34,7 @@ using framing::Buffer; const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t); Event::Event(EventType t, const ConnectionId c, const size_t s) - : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {} + : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)) {} Event Event::delivered(const MemberId& m, void* d, size_t s) { Buffer buf(static_cast<char*>(d), s); @@ -50,7 +50,7 @@ void Event::mcast (const Cpg::Name& name, Cpg& cpg) const { char header[OVERHEAD]; Buffer b(header, OVERHEAD); b.putOctet(type); - b.putLongLong(reinterpret_cast<uint64_t>(connection.getConnectionPtr())); + b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getConnectionPtr())); iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } }; cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); } @@ -61,7 +61,7 @@ Event::operator Buffer() const { static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; std::ostream& operator << (std::ostream& o, const Event& e) { - o << "[event: " << e.getConnection() + o << "[event: " << e.getConnectionId() << " " << EVENT_TYPE_NAMES[e.getType()] << " " << e.getSize() << " bytes: "; std::ostream_iterator<char> oi(o,""); diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h index d0e12831f4..a0f9bc0e49 100644 --- a/qpid/cpp/src/qpid/cluster/Event.h +++ b/qpid/cpp/src/qpid/cluster/Event.h @@ -24,6 +24,7 @@ #include "types.h" #include "Cpg.h" +#include "Connection.h" #include "qpid/RefCountedBuffer.h" #include "qpid/framing/Buffer.h" #include <iosfwd> @@ -39,7 +40,7 @@ namespace cluster { * Events are sent to/received from the cluster. * Refcounted so they can be stored on queues. */ -struct Event { +class Event { public: /** Create an event to mcast with a buffer of size bytes. */ Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t size=0); @@ -50,17 +51,21 @@ struct Event { void mcast(const Cpg::Name& name, Cpg& cpg) const; EventType getType() const { return type; } - ConnectionId getConnection() const { return connection; } + ConnectionId getConnectionId() const { return connectionId; } size_t getSize() const { return size; } char* getData() { return data; } const char* getData() const { return data; } + boost::intrusive_ptr<Connection> getConnection() const { return connection; } + void setConnection(const boost::intrusive_ptr<Connection>& c) { connection=c; } + operator framing::Buffer() const; private: static const size_t OVERHEAD; EventType type; - ConnectionId connection; + ConnectionId connectionId; + boost::intrusive_ptr<Connection> connection; size_t size; RefCountedBuffer::pointer data; }; diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index ae021a9c4a..82b0d3f077 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -20,6 +20,7 @@ */ #include "OutputInterceptor.h" #include "Connection.h" +#include "Cluster.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index 0cd6f1afbb..d62ad62b49 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -23,6 +23,8 @@ */ #include <utility> #include <iosfwd> +#include <string> + #include <stdint.h> extern "C" { @@ -39,10 +41,15 @@ enum EventType { DATA, CONTROL }; /** first=node-id, second=pid */ struct MemberId : std::pair<uint32_t, uint32_t> { - MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {} + explicit MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {} MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {} + MemberId(const std::string&); // Decode from string. uint32_t getNode() const { return first; } uint32_t getPid() const { return second; } + operator bool() const { return first || second; } + + // Encode as string, network byte order. + std::string str() const; }; inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id == MemberId(caddr); } @@ -55,6 +62,13 @@ struct ConnectionId : public std::pair<MemberId, Connection*> { Connection* getConnectionPtr() const { return second; } }; +/** State of a cluster member */ +enum State { + DISCARD, // Initially discard connection events up to my own join message. + STALL, // All members stall while a new member joins. + READY // Normal processing. +}; + std::ostream& operator<<(std::ostream&, const ConnectionId&); }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h index 2e5d3a0d3d..153ae31135 100644 --- a/qpid/cpp/src/qpid/sys/PollableQueue.h +++ b/qpid/cpp/src/qpid/sys/PollableQueue.h @@ -24,7 +24,7 @@ #include "qpid/sys/PollableCondition.h" #include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" #include <boost/function.hpp> #include <boost/bind.hpp> #include <algorithm> @@ -54,58 +54,84 @@ class PollableQueue { /** Callback to process a range of items. */ typedef boost::function<void (const iterator&, const iterator&)> Callback; - /** Functor tempalate to create a Callback from a functor that handles a single item. */ + /** @see forEach() */ template <class F> struct ForEach { F handleOne; ForEach(const F& f) : handleOne(f) {} void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } }; - /** Function to create ForEach instances */ + + /** Create a range callback from a functor that processes a single item. */ template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); } /** When the queue is selected by the poller, values are passed to callback cb. */ explicit PollableQueue(const Callback& cb); /** Push a value onto the queue. Thread safe */ - void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); } + void push(const T& t); /** Start polling. */ - void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); } + void start(const boost::shared_ptr<sys::Poller>& poller); - /** Stop polling. */ - void stop() { handle.stopWatch(); } + /** Stop polling and wait for the current callback, if any, to complete. */ + void stop(); private: - typedef sys::Mutex::ScopedLock ScopedLock; - typedef sys::Mutex::ScopedUnlock ScopedUnlock; + typedef sys::Monitor::ScopedLock ScopedLock; + typedef sys::Monitor::ScopedUnlock ScopedUnlock; void dispatch(sys::DispatchHandle&); - sys::Mutex lock; + sys::Monitor lock; Callback callback; PollableCondition condition; sys::DispatchHandle handle; Queue queue; Queue batch; + bool dispatching, stopped; }; template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: : callback(cb), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0) + handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), + dispatching(false), stopped(true) {} +template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) { + ScopedLock l(lock); + stopped = false; + handle.startWatch(poller); +} + +template <class T> void PollableQueue<T>::push(const T& t) { + ScopedLock l(lock); + queue.push_back(t); + condition.set(); +} + template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); // Lock for concurrent push() - batch.clear(); - batch.swap(queue); + ScopedLock l(lock); + if (stopped) return; + dispatching = true; condition.clear(); + batch.clear(); + batch.swap(queue); // Snapshot of current queue contents. { // Process outside the lock to allow concurrent push. ScopedUnlock u(lock); callback(batch.begin(), batch.end()); - h.rewatch(); } batch.clear(); + dispatching = false; + if (stopped) lock.notifyAll(); + else h.rewatch(); +} + +template <class T> void PollableQueue<T>::stop() { + ScopedLock l(lock); + handle.stopWatch(); + stopped = true; + while (dispatching) lock.wait(); } }} // namespace qpid::sys diff --git a/qpid/cpp/src/tests/ClusterMapTest.cpp b/qpid/cpp/src/tests/ClusterMapTest.cpp new file mode 100644 index 0000000000..344dd71eb5 --- /dev/null +++ b/qpid/cpp/src/tests/ClusterMapTest.cpp @@ -0,0 +1,191 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#include "unit_test.h" +#include "test_tools.h" +#include "qpid/cluster/ClusterMap.h" +#include "qpid/framing/ClusterMapBody.h" +#include "qpid/framing/Buffer.h" +#include "qpid/Url.h" +#include <boost/assign.hpp> + +QPID_AUTO_TEST_SUITE(CluterMapTest) + +using namespace std; +using namespace qpid; +using namespace cluster; +using namespace framing; + +MemberId id(int i) { return MemberId(i,i); } + +Url url(const char* host) { return Url(TcpAddress(host)); } + +QPID_AUTO_TEST_CASE(testNotice) { + ClusterMap m; + BOOST_CHECK(!m.urlNotice(id(0), url("0-ready"))); // id(0) member, no dump. + BOOST_CHECK(m.isMember(id(0))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK_EQUAL(m.memberCount(), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump"))); // Newbie, needs dump + BOOST_CHECK(m.isMember(id(0))); + BOOST_CHECK(m.isDumpee(id(1))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 0); + BOOST_CHECK_EQUAL(m.memberCount(), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + + BOOST_CHECK(!m.urlNotice(id(1), url("1-ready"))); // id(1) is ready. + BOOST_CHECK(m.isMember(id(0))); + BOOST_CHECK(m.isMember(id(1))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 0); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump"))); // id(2) needs dump + BOOST_CHECK(m.isDumpee(id(2))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + + BOOST_CHECK_EQUAL(id(1), m.urlNotice(id(3), url("3-dump"))); // 0 busy, dump to id(1). + BOOST_CHECK(m.isDumpee(id(3))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 2); + + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump"))); // Equally busy, 0 is first on list. + BOOST_CHECK_EQUAL(m.dumps(id(0)), 2); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 3); + + // My dumpees both complete + BOOST_CHECK(!m.urlNotice(id(2), url("2-ready"))); + BOOST_CHECK(!m.urlNotice(id(4), url("4-ready"))); + BOOST_CHECK(m.isMember(id(2))); + BOOST_CHECK(m.isMember(id(4))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + + // Final dumpee completes. + BOOST_CHECK(!m.urlNotice(id(3), url("3-ready"))); + BOOST_CHECK(m.isMember(id(3))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK_EQUAL(m.dumps(id(1)), 0); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + +} + +QPID_AUTO_TEST_CASE(testLeave) { + ClusterMap m; + BOOST_CHECK(!m.urlNotice(id(0), url("0-ready"))); + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump"))); + BOOST_CHECK(!m.urlNotice(id(1), url("1-ready"))); + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump"))); + BOOST_CHECK(!m.urlNotice(id(2), url("2-ready"))); + BOOST_CHECK_EQUAL(m.memberCount(), 3); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + m.leave(id(1)); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + BOOST_CHECK(m.isMember(id(0))); + BOOST_CHECK(m.isMember(id(2))); + + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump"))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); + BOOST_CHECK(m.isDumpee(id(4))); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + + m.dumpFailed(id(4)); // Dumper detected a failure. + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK(!m.isDumpee(id(4))); + BOOST_CHECK(!m.isMember(id(4))); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + m.leave(id(4)); // Dumpee leaves, no-op since we already know it failed. + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(5), url("5-dump"))); + BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); + BOOST_CHECK(m.isDumpee(id(5))); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + + m.leave(id(5)); // Dumpee detects failure and leaves cluster. + BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); + BOOST_CHECK(!m.isDumpee(id(5))); + BOOST_CHECK(!m.isMember(id(5))); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + m.dumpFailed(id(5)); // Dumper reports failure - no op, we already know. + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); +} + +QPID_AUTO_TEST_CASE(testToControl) { + ClusterMap m; + m.urlNotice(id(0), url("0")); + m.urlNotice(id(1), url("1dump")); + m.urlNotice(id(1), url("1")); + m.urlNotice(id(2), url("2dump")); + m.urlNotice(id(3), url("3dump")); + m.urlNotice(id(4), url("4dump")); + + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 3); + + ClusterMapBody b = m.toControl(); + + BOOST_CHECK_EQUAL(b.getMembers().count(), 2); + BOOST_CHECK_EQUAL(b.getMembers().getString(id(0).str()), url("0").str()); + BOOST_CHECK_EQUAL(b.getMembers().getString(id(1).str()), url("1").str()); + + BOOST_CHECK_EQUAL(b.getDumpees().count(), 3); + BOOST_CHECK_EQUAL(b.getDumpees().getString(id(2).str()), url("2dump").str()); + BOOST_CHECK_EQUAL(b.getDumpees().getString(id(3).str()), url("3dump").str()); + BOOST_CHECK_EQUAL(b.getDumpees().getString(id(4).str()), url("4dump").str()); + + BOOST_CHECK_EQUAL(b.getDumps().count(), 3); + BOOST_CHECK_EQUAL(b.getDumps().getString(id(2).str()), id(0).str()); + BOOST_CHECK_EQUAL(b.getDumps().getString(id(3).str()), id(1).str()); + BOOST_CHECK_EQUAL(b.getDumps().getString(id(4).str()), id(0).str()); + + std::string s(b.size(), '\0'); + Buffer buf(&s[0], s.size()); + b.encode(buf); + + ClusterMap m2; + m2.fromControl(b); + ClusterMapBody b2 = m2.toControl(); + std::string s2(b2.size(), '\0'); + Buffer buf2(&s2[0], s2.size()); + b2.encode(buf2); + + // Verify a round-trip encoding produces identical results. + BOOST_CHECK_EQUAL(s,s2); +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 3b82aff8a8..8b0c12e47c 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -28,7 +28,7 @@ CLEANFILES= TESTS+=unit_test check_PROGRAMS+=unit_test unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \ - $(lib_client) $(lib_broker) # $(lib_amqp_0_10) + $(lib_client) $(lib_broker) unit_test_SOURCES= unit_test.cpp unit_test.h \ BrokerFixture.h SocketProxy.h \ diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index 3454beb9bc..40d1c0df3e 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -18,4 +18,7 @@ check_PROGRAMS+=cluster_test cluster_test_SOURCES=unit_test.cpp cluster_test.cpp cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework +unit_test_SOURCES+=ClusterMapTest.cpp +unit_test_LDADD+=$(lib_cluster) + endif diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 1f9aac8fc5..da77d7405e 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -59,12 +59,19 @@ using boost::ptr_vector; using qpid::cluster::Cluster; using qpid::cluster::getGlobalCluster; +/** Parse broker & cluster options */ +Broker::Options parseOpts(size_t argc, const char* argv[]) { + Broker::Options opts; + Plugin::addOptions(opts); // Pick up cluster options. + opts.parse(argc, argv, "", true); // Allow-unknown for --load-module + return opts; +} + /** Cluster fixture is a vector of ports for the replicas. * Replica 0 is in the current process, all others are forked as children. */ struct ClusterFixture : public vector<uint16_t> { string name; - Broker::Options opts; std::auto_ptr<BrokerFixture> broker0; boost::ptr_vector<ForkedBroker> forkedBrokers; @@ -96,7 +103,7 @@ void ClusterFixture::add() { const char* argv[] = { "qpidd " __FILE__ , - "--load-module=../.libs/libqpidcluster.so", + "--load-module=../.libs/cluster.so", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir", "--log-prefix", prefix.c_str(), @@ -108,11 +115,8 @@ void ClusterFixture::add() { push_back(forkedBrokers.back().getPort()); } else { // First broker, run in this process. - Broker::Options opts; qpid::log::Logger::instance().setPrefix("main"); - Plugin::addOptions(opts); // Pick up cluster options. - opts.parse(argc, argv, "", true); // Allow-unknown for --load-module - broker0.reset(new BrokerFixture(opts)); + broker0.reset(new BrokerFixture(parseOpts(argc, argv))); push_back(broker0->getPort()); } } @@ -136,38 +140,16 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -#if 0 // FIXME aconway 2008-09-10: finish & enable -QPID_AUTO_TEST_CASE(testDumpConsumers) { - ClusterFixture cluster(1); - Client a(cluster[0]); - a.session.queueDeclare("q"); - a.subs.subscribe(a.lq, "q"); - - cluster.add(); - Client b(cluster[1]); - try { - b.connection.newSession(a.session.getId().getName()); - BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName()); - } catch (const SessionBusyException&) {} - - // Transfer some messages to the subscription by client a. - Message m; - a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q")); - BOOST_CHECK(a.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "aaa"); - - b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q")); - BOOST_CHECK(a.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "bbb"); - - // Verify that the queue has been drained on both brokers. - // This proves that the consumer was replicated when the second broker joined. - BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0); -} -#endif - +// FIXME aconway 2008-09-11: This test has to be first otherwise +// it picks up the cluster name from a previous test and runs the +// brokers as cluster nodes. Something wrong with option parsing... +// QPID_AUTO_TEST_CASE(testDumpClientSharedState) { - BrokerFixture donor, receiver; + // In this test we don't want the cluster plugin to initialize, so set --cluster-name="" + const char* argv[] = { "--cluster-name", "" }; + Broker::Options opts = parseOpts(sizeof(argv)/sizeof(*argv), argv); + + BrokerFixture donor(opts), receiver(opts); { Client c(donor.getPort()); FieldTable args; @@ -246,6 +228,94 @@ QPID_AUTO_TEST_CASE(testDumpClientSharedState) { } } + +// FIXME aconway 2008-09-12: finish the new join protocol. +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) { + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); + // Create some shared state. + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=Message("foo","q")); + while (c0.session.queueQuery("q").getMessageCount() != 1) + ::usleep(1000); // Wait for message to show up on broker 0. + + // Now join new broker, should catch up. + cluster.add(); + c0.session.messageTransfer(arg::content=Message("bar","q")); + c0.session.queueDeclare("p"); + c0.session.messageTransfer(arg::content=Message("poo","p")); + + // Verify new broker has all state. + Message m; + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bar"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0); + BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "poo"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0); +} + +QPID_AUTO_TEST_CASE(testStall) { + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + + // Declare on all to avoid race condition. + c0.session.queueDeclare("q"); + c1.session.queueDeclare("q"); + + // Stall 0, verify it does not process deliverys while stalled. + getGlobalCluster().stall(); + c1.session.messageTransfer(arg::content=Message("foo","q")); + while (c1.session.queueQuery("q").getMessageCount() != 1) + ::usleep(1000); // Wait for message to show up on broker 1. + sleep(2); // FIXME aconway 2008-09-11: remove. + // But it should not be on broker 0. + boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); + BOOST_REQUIRE(q0); + BOOST_CHECK_EQUAL(q0->getMessageCount(), 0); + // Now unstall and we should get the message. + getGlobalCluster().unStall(); + Message m; + BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); +} + +#if 0 // FIXME aconway 2008-09-10: finish & enable +QPID_AUTO_TEST_CASE(testDumpConsumers) { + ClusterFixture cluster(1); + Client a(cluster[0]); + a.session.queueDeclare("q"); + a.subs.subscribe(a.lq, "q"); + + cluster.add(); + Client b(cluster[1]); + try { + b.connection.newSession(a.session.getId().getName()); + BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName()); + } catch (const SessionBusyException&) {} + + // Transfer some messages to the subscription by client a. + Message m; + a.session.messageTransfer(arg::content=Message("aaa", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); + + b.session.messageTransfer(arg::content=Message("bbb", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bbb"); + + // Verify that the queue has been drained on both brokers. + // This proves that the consumer was replicated when the second broker joined. + BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0); +} + + +#endif + QPID_AUTO_TEST_CASE(testForkedBroker) { // Verify the ForkedBroker works as expected. const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster index 6d254190df..0c1722e566 100755 --- a/qpid/cpp/src/tests/start_cluster +++ b/qpid/cpp/src/tests/start_cluster @@ -12,7 +12,7 @@ test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; } rm -f cluster*.log SIZE=$1; shift CLUSTER=`pwd` # Cluster name=pwd, avoid clashes. -OPTS="-d --load-module ../.libs/libqpidcluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*" +OPTS="-d --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*" if test "$SIZE" = "one"; then # Special case of singleton cluster, use default port. ../qpidd -q diff --git a/qpid/cpp/src/tests/start_cluster_hosts b/qpid/cpp/src/tests/start_cluster_hosts index 683798453b..f1dce9f0de 100755 --- a/qpid/cpp/src/tests/start_cluster_hosts +++ b/qpid/cpp/src/tests/start_cluster_hosts @@ -15,7 +15,7 @@ # QPIDD=${QPIDD:-$PWD/../qpidd} -LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/libqpidcluster.so} +LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/cluster.so} NAME=$USER # User name is default cluster name. RESTART=NO diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 13c9054ba3..596a00158b 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -29,17 +29,14 @@ o<?xml version="1.0"?> <!-- Cluster membership --> - <control name = "joining" code="0x1"> - <field name="joining" type="str16" label="URL of new member joining cluster."/> + <control name = "url-notice" code="0x1" label="Url to use for a cluster member"> + <field name="url" type="str16" label="URL for brain dump to new member."/> </control> - - <control name="ready" code="0x2" label="New member is ready."/> - - <control name="members" code="0x3" label="Cluster map sent to new members."> + <control name="map" code="0x3" label="Cluster map sent to new members."> <field name="members" type="map"/> <!-- member-id -> URL --> - <field name="donors" type="map"/> <!-- member-id -> uint32 (donor-count) --> - <field name="newbies" type="map"/> <!-- member-id -> URL --> + <field name="dumpees" type="map"/> <!-- dumpee-id -> braindump URL --> + <field name="dumps" type="map"/> <!-- dumpee-id -> donor-id --> </control> <!-- Transferring broker state --> |