diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 187 |
1 files changed, 64 insertions, 123 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e64d80e214..53f0ccc08c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -23,8 +23,6 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AMQP_AllOperations.h" -#include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterDumpRequestBody.h" #include "qpid/framing/ClusterUpdateBody.h" #include "qpid/framing/ClusterReadyBody.h" @@ -55,17 +53,6 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::cluster; -struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { - Cluster& cluster; - MemberId member; - ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} - bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } - - void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); } - void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); } - void ready(const std::string& url) { cluster.ready(member, url); } -}; - Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker(b), poller(b.getPoller()), @@ -79,16 +66,18 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : boost::bind(&Cluster::disconnect, this, _1) // disconnect ), connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), - state(START) + handler(&joiningHandler), + joiningHandler(*this), + memberHandler(*this) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ _qmf::Package packageInit(agent); mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str()); agent->addObject (mgmtObject); - mgmtObject->set_status("JOINING"); + mgmtObject->set_status("JOINING"); - // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. + // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. } QPID_LOG(notice, self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); @@ -108,9 +97,6 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } -// FIXME aconway 2008-09-10: call leave from cluster admin command. -// Any other type of exit is caught in disconnect(). -// void Cluster::leave() { QPID_LOG(notice, self << " leaving cluster " << name.str()); cpg.leave(name); @@ -147,6 +133,7 @@ std::vector<Url> Cluster::getUrls() const { } // FIXME aconway 2008-09-15: volatile for locked/unlocked functions. +// Check locking from Handler functions. boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) { Mutex::ScopedLock l(lock); if (id.getMember() == self) @@ -179,24 +166,16 @@ void Cluster::deliver( AMQFrame frame; while (frame.decode(buf)) { QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody()); - if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame)) + if (!handler->invoke(e.getConnectionId().getMember(), frame)) throw Exception(QPID_MSG("Invalid cluster control")); } } - else { - // Process connection controls & data via the connectionEventQueue - // unless we are in the DISCARD state, in which case ignore. - if (state != DISCARD) { - e.setConnection(getConnection(e.getConnectionId())); - connectionEventQueue.push(e); - } - } + else + handler->deliver(e); } catch (const std::exception& e) { - // FIXME aconway 2008-01-30: exception handling. QPID_LOG(critical, "Error in cluster deliver: " << e.what()); - assert(0); - throw; + leave(); } } @@ -208,17 +187,19 @@ void Cluster::connectionEvent(const Event& e) { else { // control AMQFrame frame; while (frame.decode(buf)) - e.getConnection()->deliver(frame); + e.getConnection()->received(frame); } } struct AddrList { const cpg_address* addrs; int count; - AddrList(const cpg_address* a, int n) : addrs(a), count(n) {} + const char* prefix; + AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), count(n), prefix(p) {} }; ostream& operator<<(ostream& o, const AddrList& a) { + if (a.count && a.prefix) o << a.prefix; for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { @@ -252,82 +233,41 @@ void Cluster::configChange( cpg_name */*group*/, cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address */*joined*/, int nJoined) + cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node. - QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent)); - QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft)); + QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". " + << AddrList(left, nLeft, "Left: ")); - map.left(left, nLeft); if (find(left, left+nLeft, self) != left+nLeft) { // I have left the group, this is the final config change. QPID_LOG(notice, self << " left cluster " << name.str()); broker.shutdown(); return; } + + map.left(left, nLeft); + handler->configChange(current, nCurrent, left, nLeft, joined, nJoined); - if (state == START) { - if (nCurrent == 1 && *current == self) { // First in cluster. - // First in cluster - QPID_LOG(notice, self << " first in cluster."); - map.add(self, url); - ready(); - } - updateMemberStats(); - return; - } - - if (state == DISCARD && !map.dumper) // try another dump request. - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); - - if (nJoined && map.sendUpdate(self)) // New members need update - mcastControl(map.toControl(), 0); - + // FIXME aconway 2008-09-17: management update. //update mgnt stats - updateMemberStats(); + updateMemberStats(); } -void Cluster::update(const FieldTable& members, uint64_t dumper) { +void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) { Mutex::ScopedLock l(lock); - map.update(members, dumper); - QPID_LOG(debug, "Cluster update: " << map); - if (state == START) state = DISCARD; // Got first update. - if (state == DISCARD && !map.dumper) - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); + handler->update(id, members, dumper); } void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) { Mutex::ScopedLock l(lock); - if (map.dumper) return; // Dump already in progress, ignore. - map.dumper = map.first(); - if (dumpee == self && state == DISCARD) { // My turn to receive a dump. - QPID_LOG(info, self << " receiving state dump from " << map.dumper); - // FIXME aconway 2008-09-15: RECEIVE DUMP - // state = CATCHUP; - // stall(); - // When received - mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); - ready(); - } - else if (map.dumper == self && state == READY) { // My turn to send the dump - QPID_LOG(info, self << " sending state dump to " << dumpee); - // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. - // state = DUMPING; - // stall(); - (void)urlStr; - // When dump complete: - assert(map.dumper == self); - ClusterUpdateBody b = map.toControl(); - b.setDumper(0); - mcastControl(b, 0); - // NB: Don't modify my own map till self-delivery. - } + handler->dumpRequest(dumpee, urlStr); } void Cluster::ready(const MemberId& member, const std::string& url) { Mutex::ScopedLock l(lock); - map.add(member, Url(url)); + handler->ready(member, url); + // FIXME aconway 2008-09-17: management update. } broker::Broker& Cluster::getBroker(){ return broker; } @@ -341,18 +281,18 @@ void Cluster::stall() { // FIXME aconway 2008-09-11: Flow control, we should slow down or // stop reading from local connections while stalled to avoid an // unbounded queue. - if (mgmtObject!=0) - mgmtObject->set_status("STALLED"); + // if (mgmtObject!=0) + // mgmtObject->set_status("STALLED"); } void Cluster::ready() { // Called with lock held - QPID_LOG(info, self << " ready with URL " << url); - state = READY; + QPID_LOG(info, self << " ready at URL " << url); + mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); + handler = &memberHandler; // Member mode. connectionEventQueue.start(poller); - // FIXME aconway 2008-09-15: stall/unstall map? - if (mgmtObject!=0) - mgmtObject->set_status("ACTIVE"); + // if (mgmtObject!=0) + // mgmtObject->set_status("ACTIVE"); } // Called from Broker::~Broker when broker is shut down. At this @@ -367,52 +307,53 @@ void Cluster::shutdown() { delete this; } -ManagementObject* Cluster::GetManagementObject(void) const -{ - return (ManagementObject*) mgmtObject; +ManagementObject* Cluster::GetManagementObject(void) const { + return (ManagementObject*) mgmtObject; } -Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) -{ - Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); - - switch (methodId) - { - case _qmf::Cluster::METHOD_STOPCLUSTERNODE: - stopClusterNode(); - break; - case _qmf::Cluster::METHOD_STOPFULLCLUSTER: - stopFullCluster(); - break; - } - - return status; +Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) { + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case _qmf::Cluster::METHOD_STOPCLUSTERNODE: + stopClusterNode(); + break; + case _qmf::Cluster::METHOD_STOPFULLCLUSTER: + stopFullCluster(); + break; + } + + return status; } void Cluster::stopClusterNode(void) { + // FIXME aconway 2008-09-18: QPID_LOG(notice, self << " disconnected from cluster " << name.str()); broker.shutdown(); } void Cluster::stopFullCluster(void) { + // FIXME aconway 2008-09-17: TODO } void Cluster::updateMemberStats(void) { //update mgnt stats - if (mgmtObject!=0){ - mgmtObject->set_clusterSize(size()); - std::vector<Url> vectUrl = getUrls(); - string urlstr; - for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) { - if (iter != vectUrl.begin()) urlstr += ";"; - urlstr += iter->str(); - } - mgmtObject->set_members(urlstr); - } + // FIXME aconway 2008-09-18: +// if (mgmtObject!=0){ +// mgmtObject->set_clusterSize(size()); +// std::vector<Url> vectUrl = getUrls(); +// string urlstr; +// for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) { +// if (iter != vectUrl.begin()) urlstr += ";"; +// urlstr += iter->str(); +// } +// mgmtObject->set_members(urlstr); +// } } |