diff options
author | Alan Conway <aconway@apache.org> | 2008-09-18 13:55:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-18 13:55:30 +0000 |
commit | b208766dcaf114eac162d6f230fb05370b01e04b (patch) | |
tree | 3cc58c975cab4072bc7bbf97ac1057c1219c17da /qpid/cpp | |
parent | f5714202a325b2c17c348b336846c2832e54f8ee (diff) | |
download | qpid-python-b208766dcaf114eac162d6f230fb05370b01e04b.tar.gz |
Refactor Cluster logic into separate handlers for Joining & Member modes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@696657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
20 files changed, 546 insertions, 168 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 443db3fb15..8060e49b97 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -28,7 +28,13 @@ cluster_la_SOURCES = \ qpid/cluster/DumpClient.h \ qpid/cluster/DumpClient.cpp \ qpid/cluster/ClusterMap.h \ - qpid/cluster/ClusterMap.cpp + qpid/cluster/ClusterMap.cpp \ + qpid/cluster/ClusterHandler.h \ + qpid/cluster/ClusterHandler.cpp \ + qpid/cluster/JoiningHandler.h \ + qpid/cluster/JoiningHandler.cpp \ + qpid/cluster/MemberHandler.h \ + qpid/cluster/MemberHandler.cpp cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index e64d80e214..53f0ccc08c 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -23,8 +23,6 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AMQP_AllOperations.h" -#include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterDumpRequestBody.h" #include "qpid/framing/ClusterUpdateBody.h" #include "qpid/framing/ClusterReadyBody.h" @@ -55,17 +53,6 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::cluster; -struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { - Cluster& cluster; - MemberId member; - ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} - bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } - - void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); } - void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); } - void ready(const std::string& url) { cluster.ready(member, url); } -}; - Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker(b), poller(b.getPoller()), @@ -79,16 +66,18 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : boost::bind(&Cluster::disconnect, this, _1) // disconnect ), connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), - state(START) + handler(&joiningHandler), + joiningHandler(*this), + memberHandler(*this) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ _qmf::Package packageInit(agent); mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str()); agent->addObject (mgmtObject); - mgmtObject->set_status("JOINING"); + mgmtObject->set_status("JOINING"); - // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. + // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. } QPID_LOG(notice, self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); @@ -108,9 +97,6 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } -// FIXME aconway 2008-09-10: call leave from cluster admin command. -// Any other type of exit is caught in disconnect(). -// void Cluster::leave() { QPID_LOG(notice, self << " leaving cluster " << name.str()); cpg.leave(name); @@ -147,6 +133,7 @@ std::vector<Url> Cluster::getUrls() const { } // FIXME aconway 2008-09-15: volatile for locked/unlocked functions. +// Check locking from Handler functions. boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) { Mutex::ScopedLock l(lock); if (id.getMember() == self) @@ -179,24 +166,16 @@ void Cluster::deliver( AMQFrame frame; while (frame.decode(buf)) { QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody()); - if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame)) + if (!handler->invoke(e.getConnectionId().getMember(), frame)) throw Exception(QPID_MSG("Invalid cluster control")); } } - else { - // Process connection controls & data via the connectionEventQueue - // unless we are in the DISCARD state, in which case ignore. - if (state != DISCARD) { - e.setConnection(getConnection(e.getConnectionId())); - connectionEventQueue.push(e); - } - } + else + handler->deliver(e); } catch (const std::exception& e) { - // FIXME aconway 2008-01-30: exception handling. QPID_LOG(critical, "Error in cluster deliver: " << e.what()); - assert(0); - throw; + leave(); } } @@ -208,17 +187,19 @@ void Cluster::connectionEvent(const Event& e) { else { // control AMQFrame frame; while (frame.decode(buf)) - e.getConnection()->deliver(frame); + e.getConnection()->received(frame); } } struct AddrList { const cpg_address* addrs; int count; - AddrList(const cpg_address* a, int n) : addrs(a), count(n) {} + const char* prefix; + AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), count(n), prefix(p) {} }; ostream& operator<<(ostream& o, const AddrList& a) { + if (a.count && a.prefix) o << a.prefix; for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { @@ -252,82 +233,41 @@ void Cluster::configChange( cpg_name */*group*/, cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address */*joined*/, int nJoined) + cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node. - QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent)); - QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft)); + QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". " + << AddrList(left, nLeft, "Left: ")); - map.left(left, nLeft); if (find(left, left+nLeft, self) != left+nLeft) { // I have left the group, this is the final config change. QPID_LOG(notice, self << " left cluster " << name.str()); broker.shutdown(); return; } + + map.left(left, nLeft); + handler->configChange(current, nCurrent, left, nLeft, joined, nJoined); - if (state == START) { - if (nCurrent == 1 && *current == self) { // First in cluster. - // First in cluster - QPID_LOG(notice, self << " first in cluster."); - map.add(self, url); - ready(); - } - updateMemberStats(); - return; - } - - if (state == DISCARD && !map.dumper) // try another dump request. - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); - - if (nJoined && map.sendUpdate(self)) // New members need update - mcastControl(map.toControl(), 0); - + // FIXME aconway 2008-09-17: management update. //update mgnt stats - updateMemberStats(); + updateMemberStats(); } -void Cluster::update(const FieldTable& members, uint64_t dumper) { +void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) { Mutex::ScopedLock l(lock); - map.update(members, dumper); - QPID_LOG(debug, "Cluster update: " << map); - if (state == START) state = DISCARD; // Got first update. - if (state == DISCARD && !map.dumper) - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); + handler->update(id, members, dumper); } void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) { Mutex::ScopedLock l(lock); - if (map.dumper) return; // Dump already in progress, ignore. - map.dumper = map.first(); - if (dumpee == self && state == DISCARD) { // My turn to receive a dump. - QPID_LOG(info, self << " receiving state dump from " << map.dumper); - // FIXME aconway 2008-09-15: RECEIVE DUMP - // state = CATCHUP; - // stall(); - // When received - mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); - ready(); - } - else if (map.dumper == self && state == READY) { // My turn to send the dump - QPID_LOG(info, self << " sending state dump to " << dumpee); - // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. - // state = DUMPING; - // stall(); - (void)urlStr; - // When dump complete: - assert(map.dumper == self); - ClusterUpdateBody b = map.toControl(); - b.setDumper(0); - mcastControl(b, 0); - // NB: Don't modify my own map till self-delivery. - } + handler->dumpRequest(dumpee, urlStr); } void Cluster::ready(const MemberId& member, const std::string& url) { Mutex::ScopedLock l(lock); - map.add(member, Url(url)); + handler->ready(member, url); + // FIXME aconway 2008-09-17: management update. } broker::Broker& Cluster::getBroker(){ return broker; } @@ -341,18 +281,18 @@ void Cluster::stall() { // FIXME aconway 2008-09-11: Flow control, we should slow down or // stop reading from local connections while stalled to avoid an // unbounded queue. - if (mgmtObject!=0) - mgmtObject->set_status("STALLED"); + // if (mgmtObject!=0) + // mgmtObject->set_status("STALLED"); } void Cluster::ready() { // Called with lock held - QPID_LOG(info, self << " ready with URL " << url); - state = READY; + QPID_LOG(info, self << " ready at URL " << url); + mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); + handler = &memberHandler; // Member mode. connectionEventQueue.start(poller); - // FIXME aconway 2008-09-15: stall/unstall map? - if (mgmtObject!=0) - mgmtObject->set_status("ACTIVE"); + // if (mgmtObject!=0) + // mgmtObject->set_status("ACTIVE"); } // Called from Broker::~Broker when broker is shut down. At this @@ -367,52 +307,53 @@ void Cluster::shutdown() { delete this; } -ManagementObject* Cluster::GetManagementObject(void) const -{ - return (ManagementObject*) mgmtObject; +ManagementObject* Cluster::GetManagementObject(void) const { + return (ManagementObject*) mgmtObject; } -Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) -{ - Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); - - switch (methodId) - { - case _qmf::Cluster::METHOD_STOPCLUSTERNODE: - stopClusterNode(); - break; - case _qmf::Cluster::METHOD_STOPFULLCLUSTER: - stopFullCluster(); - break; - } - - return status; +Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) { + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case _qmf::Cluster::METHOD_STOPCLUSTERNODE: + stopClusterNode(); + break; + case _qmf::Cluster::METHOD_STOPFULLCLUSTER: + stopFullCluster(); + break; + } + + return status; } void Cluster::stopClusterNode(void) { + // FIXME aconway 2008-09-18: QPID_LOG(notice, self << " disconnected from cluster " << name.str()); broker.shutdown(); } void Cluster::stopFullCluster(void) { + // FIXME aconway 2008-09-17: TODO } void Cluster::updateMemberStats(void) { //update mgnt stats - if (mgmtObject!=0){ - mgmtObject->set_clusterSize(size()); - std::vector<Url> vectUrl = getUrls(); - string urlstr; - for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) { - if (iter != vectUrl.begin()) urlstr += ";"; - urlstr += iter->str(); - } - mgmtObject->set_members(urlstr); - } + // FIXME aconway 2008-09-18: +// if (mgmtObject!=0){ +// mgmtObject->set_clusterSize(size()); +// std::vector<Url> vectUrl = getUrls(); +// string urlstr; +// for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) { +// if (iter != vectUrl.begin()) urlstr += ";"; +// urlstr += iter->str(); +// } +// mgmtObject->set_members(urlstr); +// } } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 847f179cc0..7c4e121a9b 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -23,11 +23,12 @@ #include "Event.h" #include "NoOpConnectionOutputHandler.h" #include "ClusterMap.h" +#include "JoiningHandler.h" +#include "MemberHandler.h" #include "qpid/broker/Broker.h" #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Monitor.h" -#include "qpid/framing/AMQP_AllOperations.h" #include "qpid/Url.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/cluster/Cluster.h" @@ -78,7 +79,7 @@ class Cluster : private Cpg::Handler, public management::Manageable void leave(); // Cluster controls. - void update(const framing::FieldTable& members, uint64_t dumping); + void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping); void dumpRequest(const MemberId&, const std::string& url); void ready(const MemberId&, const std::string& url); @@ -127,8 +128,6 @@ class Cluster : private Cpg::Handler, public management::Manageable /** Callback if CPG fd is disconnected. */ void disconnect(sys::DispatchHandle&); - void handleMethod(MemberId from, cluster::Connection* connection, framing::AMQMethodBody& method); - boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); virtual qpid::management::ManagementObject* GetManagementObject(void) const; @@ -151,6 +150,14 @@ class Cluster : private Cpg::Handler, public management::Manageable EventQueue connectionEventQueue; State state; qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle + + // Handlers for different states. + ClusterHandler* handler; + JoiningHandler joiningHandler; + MemberHandler memberHandler; + + friend class JoiningHandler; + friend class MemberHandler; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp b/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp new file mode 100644 index 0000000000..648d40470c --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/AllInvoker.h" + +#include "ClusterHandler.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FieldTable.h" + + + +namespace qpid { +namespace cluster { + +struct Operations : public framing::AMQP_AllOperations::ClusterHandler { + qpid::cluster::ClusterHandler& handler; + MemberId member; + Operations(qpid::cluster::ClusterHandler& c, const MemberId& id) : handler(c), member(id) {} + + void update(const framing::FieldTable& members, uint64_t dumping) { handler.update(member, members, dumping); } + void dumpRequest(const std::string& url) { handler.dumpRequest(member, url); } + void ready(const std::string& url) { handler.ready(member, url); } +}; + +ClusterHandler::~ClusterHandler() {} + +ClusterHandler::ClusterHandler(Cluster& c) : cluster (c) {} + +bool ClusterHandler::invoke(const MemberId& id, framing::AMQFrame& frame) { + Operations ops(*this, id); + return framing::invoke(ops, *frame.getBody()).wasHandled(); +} + +}} // namespace qpid::cluster + diff --git a/qpid/cpp/src/qpid/cluster/ClusterHandler.h b/qpid/cpp/src/qpid/cluster/ClusterHandler.h new file mode 100644 index 0000000000..5da5cf5b75 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ClusterHandler.h @@ -0,0 +1,65 @@ +#ifndef QPID_CLUSTER_CLUSTERHANDLER_H +#define QPID_CLUSTER_CLUSTERHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Cpg.h" +#include "types.h" + +namespace qpid { + +namespace framing { class AMQFrame; } + +namespace cluster { + +class Cluster; +class Event; + +/** + * Interface for handing cluster events. + * Implementations provide different behavior for different states of a member.. + */ +class ClusterHandler +{ + public: + ClusterHandler(Cluster& c); + virtual ~ClusterHandler(); + + virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0; + virtual void dumpRequest(const MemberId&, const std::string& url) = 0; + virtual void ready(const MemberId&, const std::string& url) = 0; + + virtual void deliver(Event& e) = 0; // Deliver a connection event. + + virtual void configChange(cpg_address *current, int nCurrent, + cpg_address *left, int nLeft, + cpg_address *joined, int nJoined) = 0; + + bool invoke(const MemberId&, framing::AMQFrame& f); + + protected: + Cluster& cluster; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CLUSTERHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index 51e360ad73..e14e35998f 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -53,8 +53,8 @@ framing::ClusterUpdateBody ClusterMap::toControl() const { return b; } -void ClusterMap::update(const FieldTable& ftMembers, uint64_t dumper_) { - FieldTable::ValueMap::const_iterator i; +void ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) { + framing:: FieldTable::ValueMap::const_iterator i; for (i = ftMembers.begin(); i != ftMembers.end(); ++i) members[i->first] = Url(i->second->get<std::string>()); dumper = MemberId(dumper_); @@ -82,8 +82,10 @@ bool ClusterMap::sendUpdate(const MemberId& id) const { return dumper==id || (!dumper && first() == id); } -void ClusterMap::add(const MemberId& id, const Url& url) { +void ClusterMap::ready(const MemberId& id, const Url& url) { members[id] = url; + if (id == dumper) + dumper = MemberId(); } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index c626c7688d..b00f818f88 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -34,8 +34,6 @@ namespace qpid { namespace cluster { -// FIXME aconway 2008-09-15: rename cluster status? - /** * Map of established cluster members and brain-dumps in progress. * A dumper is an established member that is sending catch-up data. @@ -58,8 +56,8 @@ class ClusterMap { /** Convert map contents to a cluster update body. */ framing::ClusterUpdateBody toControl() const; - /** Add a new member. */ - void add(const MemberId& id, const Url& url); + /** Add a new member or dump complete if id == dumper. */ + void ready(const MemberId& id, const Url& url); /** Apply update delivered from clsuter. */ void update(const framing::FieldTable& members, uint64_t dumper); @@ -70,6 +68,7 @@ class ClusterMap { std::vector<Url> memberUrls() const; size_t size() const { return members.size(); } + bool empty() const { return members.empty(); } private: friend std::ostream& operator<<(std::ostream&, const ClusterMap&); diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 6cc21633d3..51da5bef25 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -21,11 +21,9 @@ #include "Connection.h" #include "Cluster.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/Invoker.h" -#include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/log/Statement.h" - +#include "qpid/framing/AllInvoker.h" #include <boost/current_function.hpp> namespace qpid { @@ -47,11 +45,6 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, Connection::~Connection() {} -void Connection::received(framing::AMQFrame& ) { - // FIXME aconway 2008-09-02: not called, codec sends straight to deliver - assert(0); -} - bool Connection::doOutput() { return output.doOutput(); } // Delivery of doOutput allows us to run the real connection doOutput() @@ -62,7 +55,7 @@ void Connection::deliverDoOutput(uint32_t requested) { } // Handle frames delivered from cluster. -void Connection::deliver(framing::AMQFrame& f) { +void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, "DLVR [" << self << "]: " << f); // Handle connection controls, deliver other frames to connection. if (!framing::invoke(*this, *f.getBody()).wasHandled()) @@ -95,14 +88,13 @@ void Connection::deliverClose () { size_t Connection::decode(const char* buffer, size_t size) { ++mcastSeq; cluster.mcastBuffer(buffer, size, self); - // FIXME aconway 2008-09-01: deserialize? return size; } void Connection::deliverBuffer(Buffer& buf) { ++deliverSeq; while (decoder.decode(buf)) - deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread. + received(decoder.frame); } diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 37ff2ac6b4..d17dc704ed 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -63,7 +63,6 @@ class Connection : Cluster& getCluster() { return cluster; } // self-delivery of multicast data. - void deliver(framing::AMQFrame& f); void deliverClose(); void deliverDoOutput(uint32_t requested); void deliverBuffer(framing::Buffer&); diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp index ed046f2ede..3dfd8ecc38 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -36,9 +36,7 @@ ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& return 0; } -// FIXME aconway 2008-08-27: outbound connections need to be made -// with proper qpid::client code for failover, get rid of this -// broker-side hack. +// Used for outgoing Link connections, we don't care. sys::ConnectionCodec* ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { return next->create(out, id); @@ -60,7 +58,6 @@ size_t ConnectionCodec::decode(const char* buffer, size_t size) { return interceptor->decode(buffer, size); } -// FIXME aconway 2008-09-02: delegate to interceptor? size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); } bool ConnectionCodec::canEncode() { return codec.canEncode(); } void ConnectionCodec::closed() { codec.closed(); } diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 2a77fa437a..9b71e4235d 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -101,9 +101,7 @@ bool Cpg::isFlowControlEnabled() { return flowState == CPG_FLOW_CONTROL_ENABLED; } -// TODO aconway 2008-08-07: better handling of flow control. -// Wait for flow control to be disabled. -// FIXME aconway 2008-08-08: does flow control check involve a round-trip? If so maybe remove... +// FIXME aconway 2008-08-07: better handling of cpg flow control, no sleeping. void Cpg::waitForFlowControl() { int delayNs=1000; // one millisecond int tries=8; // double the delay on each try. diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.cpp b/qpid/cpp/src/qpid/cluster/DumpClient.cpp index f20ceb2ab6..f76a55c0d3 100644 --- a/qpid/cpp/src/qpid/cluster/DumpClient.cpp +++ b/qpid/cpp/src/qpid/cluster/DumpClient.cpp @@ -44,36 +44,39 @@ using namespace framing::message; using namespace client; -DumpClient::DumpClient(const Url& url, Broker& b, const boost::function<void(const char*)>& f) - : donor(b), failed(f) +DumpClient::DumpClient(const Url& url, Broker& b, + const boost::function<void()>& ok, + const boost::function<void(const std::exception&)>& fail) + : donor(b), done(ok), failed(fail) { + // FIXME aconway 2008-09-16: Identify as DumpClient connection. connection.open(url); session = connection.newSession(); } -DumpClient::~DumpClient() { - session.close(); - connection.close(); -} +DumpClient::~DumpClient() {} // Catch-up exchange name: an illegal AMQP exchange name to avoid clashes. static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); void DumpClient::dump() { - // FIXME aconway 2008-09-08: send cluster map frame first. donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); // Catch-up exchange is used to route messages to the proper queue without modifying routing key. session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true); donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); session.sync(); + session.close(); + // FIXME aconway 2008-09-17: send dump complete indication. + connection.close(); } void DumpClient::run() { try { dump(); - } catch (const Exception& e) { - failed(e.what()); + done(); + } catch (const std::exception& e) { + failed(e); } delete this; } diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.h b/qpid/cpp/src/qpid/cluster/DumpClient.h index 1c49b417d7..83c9ac4076 100644 --- a/qpid/cpp/src/qpid/cluster/DumpClient.h +++ b/qpid/cpp/src/qpid/cluster/DumpClient.h @@ -54,7 +54,10 @@ namespace cluster { */ class DumpClient : public sys::Runnable { public: - DumpClient(const Url& url, broker::Broker& donor, const boost::function<void(const char*)>& onFail); + DumpClient(const Url& url, broker::Broker& donor, + const boost::function<void()>& done, + const boost::function<void(const std::exception&)>& fail); + ~DumpClient(); void dump(); void run(); // Will delete this when finished. @@ -69,7 +72,8 @@ class DumpClient : public sys::Runnable { client::Connection connection; client::AsyncSession session; broker::Broker& donor; - boost::function<void(const char*)> failed; + boost::function<void()> done; + boost::function<void(const std::exception& e)> failed; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp b/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp new file mode 100644 index 0000000000..3358e3404b --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "JoiningHandler.h" +#include "Cluster.h" +#include "qpid/framing/ClusterDumpRequestBody.h" +#include "qpid/framing/ClusterReadyBody.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +using namespace sys; +using namespace framing; + +JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START) {} + +void JoiningHandler::configChange( + cpg_address *current, int nCurrent, + cpg_address */*left*/, int nLeft, + cpg_address */*joined*/, int /*nJoined*/) +{ + if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster. + QPID_LOG(notice, cluster.self << " first in cluster."); + cluster.map.ready(cluster.self, cluster.url); + cluster.ready(); + } +} + +void JoiningHandler::deliver(Event& e) { + // Discard connection events unless we are stalled and getting a dump. + if (state == STALLED) { + e.setConnection(cluster.getConnection(e.getConnectionId())); + cluster.connectionEventQueue.push(e); + } +} + +void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) { + cluster.map.update(members, dumper); + QPID_LOG(debug, "Cluster update: " << cluster.map); + checkDumpRequest(); +} + +void JoiningHandler::checkDumpRequest() { + if (state == START && !cluster.map.dumper) { + cluster.broker.getPort(); // ensure the broker is listening. + state = DUMP_REQUESTED; + cluster.mcastControl(ClusterDumpRequestBody(framing::ProtocolVersion(), cluster.url.str()), 0); + } +} + +void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { + if (cluster.map.dumper) { // Already a dump in progress. + if (dumpee == cluster.self && state == DUMP_REQUESTED) + state = START; // Need to make another request. + } + else { // Start a new dump + cluster.map.dumper = cluster.map.first(); + if (dumpee == cluster.self) { // My turn + + state = DUMP_COMPLETE; // FIXME aconway 2008-09-18: bypass dump + + QPID_LOG(info, cluster.self << " receiving state dump from " << cluster.map.dumper); + switch (state) { + case START: + case STALLED: + assert(0); break; + + case DUMP_REQUESTED: + state = STALLED; + cluster.stall(); + break; + + // FIXME aconway 2008-09-17: no transition to DUMP_COMPLETE state. + case DUMP_COMPLETE: + cluster.ready(); + break; + } + } + } +} + +void JoiningHandler::ready(const MemberId& id, const std::string& url) { + cluster.map.ready(id, Url(url)); + checkDumpRequest(); +} + + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/JoiningHandler.h b/qpid/cpp/src/qpid/cluster/JoiningHandler.h new file mode 100644 index 0000000000..07a48b8281 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/JoiningHandler.h @@ -0,0 +1,56 @@ +#ifndef QPID_CLUSTER_JOININGHANDLER_H +#define QPID_CLUSTER_JOININGHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ClusterHandler.h" + +namespace qpid { +namespace cluster { + +/** + * Cluster handler for the "joining" phase, before the process is a + * full cluster member. + */ +class JoiningHandler : public ClusterHandler +{ + public: + JoiningHandler(Cluster& c); + + void configChange(struct cpg_address */*members*/, int /*nMembers*/, + struct cpg_address */*left*/, int /*nLeft*/, + struct cpg_address */*joined*/, int /*nJoined*/ + ); + + void deliver(Event& e); + + void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping); + void dumpRequest(const MemberId&, const std::string& url); + void ready(const MemberId&, const std::string& url); + + private: + enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state; + void checkDumpRequest(); + +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_JOININGHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/MemberHandler.cpp b/qpid/cpp/src/qpid/cluster/MemberHandler.cpp new file mode 100644 index 0000000000..e82eaec458 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/MemberHandler.cpp @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MemberHandler.h" +#include "Cluster.h" +#include "DumpClient.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/ClusterUpdateBody.h" + +namespace qpid { +namespace cluster { + +using namespace sys; +using namespace framing; + +MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {} + +void MemberHandler::configChange( + cpg_address */*current*/, int /*nCurrent*/, + cpg_address */*left*/, int /*nLeft*/, + cpg_address */*joined*/, int nJoined) +{ + if (nJoined && cluster.map.sendUpdate(cluster.self)) // New members need update + cluster.mcastControl(cluster.map.toControl(), 0); +} + +void MemberHandler::deliver(Event& e) { + e.setConnection(cluster.getConnection(e.getConnectionId())); + cluster.connectionEventQueue.push(e); +} + +void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {} + +void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) { + if (cluster.map.dumper) return; // dump in progress, ignore request. + + cluster.map.dumper = cluster.map.first(); + if (cluster.map.dumper != cluster.self) return; + + QPID_LOG(info, cluster.self << " sending state dump to " << dumpee); + assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled. + cluster.stall(); + + cluster.ready(); // FIXME aconway 2008-09-18: Bypass dump + (void)urlStr; +// dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker, +// boost::bind(&MemberHandler::dumpDone, this), +// boost::bind(&MemberHandler::dumpError, this, _1))); +} + +void MemberHandler::ready(const MemberId& id, const std::string& url) { + cluster.map.ready(id, Url(url)); +} + + +void MemberHandler::dumpDone() { + dumpThread.join(); // Clean up. + cluster.ready(); +} + +void MemberHandler::dumpError(const std::exception& e) { + QPID_LOG(error, "Error in state dump from " << cluster.self << ": " << e.what()); + dumpDone(); +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/MemberHandler.h b/qpid/cpp/src/qpid/cluster/MemberHandler.h new file mode 100644 index 0000000000..630500a740 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/MemberHandler.h @@ -0,0 +1,60 @@ +#ifndef QPID_CLUSTER_MEMBERHANDLER_H +#define QPID_CLUSTER_MEMBERHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ClusterHandler.h" +#include "qpid/sys/Thread.h" + +namespace qpid { +namespace cluster { + +/** + * Cluster handler for the "member" phase, before the process is a + * full cluster member. + */ +class MemberHandler : public ClusterHandler +{ + public: + MemberHandler(Cluster& c); + + void configChange( + struct cpg_address */*members*/, int /*nMembers*/, + struct cpg_address */*left*/, int /*nLeft*/, + struct cpg_address */*joined*/, int /*nJoined*/ + ); + + void deliver(Event& e); + + void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping); + void dumpRequest(const MemberId&, const std::string& url); + void ready(const MemberId&, const std::string& url); + + void dumpDone(); + void dumpError(const std::exception&); + + public: + sys::Thread dumpThread; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_MEMBERHANDLER_H*/ + diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index 3212d34775..4ff0a88b11 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -88,7 +88,6 @@ void OutputInterceptor::startDoOutput() { // Send a doOutput request if one is not already in flight. void OutputInterceptor::sendDoOutput() { // Call with lock held. - // FIXME aconway 2008-08-28: used to have || parent.getClosed()) if (!parent.isLocal()) return; doingOutput = true; diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h index ca97c0d8c9..2c326b998f 100644 --- a/qpid/cpp/src/qpid/sys/PollableQueue.h +++ b/qpid/cpp/src/qpid/sys/PollableQueue.h @@ -71,6 +71,9 @@ class PollableQueue { /** Stop polling and wait for the current callback, if any, to complete. */ void stop(); + + /** Are we currently stopped?*/ + bool isStopped() const; private: typedef sys::Monitor::ScopedLock ScopedLock; @@ -78,7 +81,7 @@ class PollableQueue { void dispatch(sys::DispatchHandle&); - sys::Monitor lock; + mutable sys::Monitor lock; Callback callback; PollableCondition condition; sys::DispatchHandle handle; @@ -130,6 +133,11 @@ template <class T> void PollableQueue<T>::stop() { while (dispatching) lock.wait(); } +template <class T> bool PollableQueue<T>::isStopped() const { + ScopedLock l(lock); + return stopped; +} + }} // namespace qpid::sys #endif /*!QPID_SYS_POLLABLEQUEUE_H*/ diff --git a/qpid/cpp/src/tests/DumpClientTest.cpp b/qpid/cpp/src/tests/DumpClientTest.cpp index 27c4174ffe..03cf12aec6 100644 --- a/qpid/cpp/src/tests/DumpClientTest.cpp +++ b/qpid/cpp/src/tests/DumpClientTest.cpp @@ -61,7 +61,7 @@ QPID_AUTO_TEST_CASE(testDumpClientSharedState) { c.connection.close(); } Url url(Url::getIpAddressesUrl(receiver.getPort())); - qpid::cluster::DumpClient dump(url, *donor.broker, 0); + qpid::cluster::DumpClient dump(url, *donor.broker, 0, 0); dump.dump(); { Client r(receiver.getPort()); |