diff options
author | Alan Conway <aconway@apache.org> | 2008-12-11 22:50:02 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-12-11 22:50:02 +0000 |
commit | e1d0293e7944c73995b52e2352d60ae8ba9ebc3d (patch) | |
tree | 2b14f861d8b6c46776cb00e7d1925343b740c88b /cpp/src | |
parent | cc781622299a4de5af2fdde6bfc1e2eb42e1623a (diff) | |
download | qpid-python-e1d0293e7944c73995b52e2352d60ae8ba9ebc3d.tar.gz |
cluster: refactor multicast concerns into separate Multicaster class with separate locking.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@725853 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/cluster.mk | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 72 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 62 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterLeaveException.h | 35 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 81 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 69 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 4 |
9 files changed, 237 insertions, 94 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index fe2342a416..5c44fa7c76 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -46,6 +46,9 @@ cluster_la_SOURCES = \ qpid/cluster/ClusterMap.cpp \ qpid/cluster/FailoverExchange.h \ qpid/cluster/FailoverExchange.cpp \ + qpid/cluster/Multicaster.h \ + qpid/cluster/Multicaster.cpp \ + qpid/cluster/ClusterLeaveException.h \ qpid/cluster/Quorum.h cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 222aa07548..aac5bc1dd8 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -90,20 +90,19 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b name(name_), myUrl(url_), myId(cpg.self()), + readMax(readMax_), cpgDispatchHandle( cpg, boost::bind(&Cluster::dispatch, this, _1), // read 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), - mcastQueue(boost::bind(&Cluster::sendMcast, this, _1), poller), - mcastId(0), + mcast(cpg, poller), mgmtObject(0), + deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), state(INIT), lastSize(0), - lastBroker(false), - readMax(readMax_) + lastBroker(false) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ @@ -116,7 +115,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b failoverExchange.reset(new FailoverExchange(this)); cpgDispatchHandle.startWatch(poller); deliverQueue.start(); - mcastQueue.start(); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); if (quorum_) quorum.init(); cpg.join(name); @@ -135,49 +133,6 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } -void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) { - Event e(Event::control(body, id, seq)); - QPID_LOG(trace, *this << " MCAST " << e << ": " << body); - mcast(e); -} - -void Cluster::mcastControl(const framing::AMQBody& body) { - Event e(Event::control(body, ConnectionId(myId,0), ++mcastId)); - QPID_LOG(trace, *this << " MCAST " << e << ": " << body); - mcast(e); -} - -void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) { - Event e(DATA, connection, size, id); - memcpy(e.getData(), data, size); - { - Lock l(lock); - if (state <= CATCHUP && e.isConnection()) { - // Stall outgoing connection events untill we are fully READY - QPID_LOG(trace, *this << " MCAST deferred: " << e ); - mcastStallQueue.push_back(e); - return; - } - } - QPID_LOG(trace, *this << " MCAST " << e); - mcast(e); -} - -void Cluster::mcast(const Event& e) { mcastQueue.push(e); } - -void Cluster::sendMcast(PollableEventQueue::Queue& values) { - try { - PollableEventQueue::Queue::iterator i = values.begin(); - while (i != values.end() && i->mcast(cpg)) - ++i; - values.erase(values.begin(), i); - } - catch (const std::exception& e) { - QPID_LOG(critical, "Multicast failure: " << e.what()); - leave(); - } -} - std::vector<Url> Cluster::getUrls() const { Lock l(lock); return getUrls(l); @@ -315,7 +270,6 @@ ostream& operator<<(ostream& o, const AddrList& a) { void Cluster::dispatch(sys::DispatchHandle& h) { try { cpg.dispatchAll(); - mcastQueue.start(); // In case it was stopped by flow control. h.rewatch(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what()); @@ -361,7 +315,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& if (state == INIT) { // First configChange if (map.aliveCount() == 1) { setClusterId(true); + // FIXME aconway 2008-12-11: Centralize transition to READY and associated actions eg mcast.release() state = READY; + mcast.release(); QPID_LOG(notice, *this << " first in cluster"); if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); map = ClusterMap(myId, myUrl, true); @@ -370,7 +326,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& else { // Joining established group. state = NEWBIE; QPID_LOG(info, *this << " joining cluster: " << map); - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str())); + mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId); } } else if (state >= READY && memberChange) @@ -384,7 +340,7 @@ void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isNewbie(id)) { state = OFFER; QPID_LOG(info, *this << " send dump-offer to " << id); - mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId)); + mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId); } } @@ -414,10 +370,10 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { memberUpdate(l); if (state == CATCHUP && id == myId) { state = READY; + mcast.release(); QPID_LOG(notice, *this << " caught up, active cluster member"); if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); - for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1)); - mcastStallQueue.clear(); + mcast.release(); } } @@ -432,6 +388,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& } else { // Another offer was first. state = READY; + mcast.release(); QPID_LOG(info, *this << " cancelled dump offer to " << dumpee); tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer. } @@ -461,6 +418,7 @@ void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) { boost::bind(&Cluster::dumpOutError, this, _1))); } +// Called in dump thread. void Cluster::dumpInDone(const ClusterMap& m) { Lock l(lock); dumpedMap = m; @@ -471,8 +429,7 @@ void Cluster::checkDumpIn(Lock& ) { if (state == LEFT) return; if (state == DUMPEE && dumpedMap) { map = *dumpedMap; - mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str())); - // Don't flush the mcast queue till we are READY, on self-deliver. + mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); state = CATCHUP; QPID_LOG(info, *this << " received dump, starting catch-up"); deliverQueue.start(); @@ -487,6 +444,7 @@ void Cluster::dumpOutDone() { void Cluster::dumpOutDone(Lock& l) { assert(state == DUMPER); state = READY; + mcast.release(); QPID_LOG(info, *this << " sent dump"); deliverQueue.start(); tryMakeOffer(map.firstNewbie(), l); // Try another offer @@ -523,7 +481,7 @@ void Cluster::stopClusterNode(Lock& l) { void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcastControl(ClusterShutdownBody()); + mcast.mcastControl(ClusterShutdownBody(), myId); } void Cluster::memberUpdate(Lock& l) { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index feeb68fd4b..f962f4c72f 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -26,6 +26,7 @@ #include "ConnectionMap.h" #include "FailoverExchange.h" #include "Quorum.h" +#include "Multicaster.h" #include "qpid/broker/Broker.h" #include "qpid/sys/PollableQueue.h" @@ -55,8 +56,10 @@ namespace cluster { class Connection; /** - * Connection to the cluster. + * Connection to the cluster * + * Threading notes: 3 thread categories: connection, deliver, dump. + * */ class Cluster : private Cpg::Handler, public management::Manageable { public: @@ -70,29 +73,26 @@ class Cluster : private Cpg::Handler, public management::Manageable { virtual ~Cluster(); - // Connection map + // Connection map - called in connection threads. void insert(const ConnectionPtr&); void erase(ConnectionId); - // Send to the cluster - void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t id); - void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id); - - // URLs of current cluster members. + // URLs of current cluster members - called in connection threads. std::vector<Url> getUrls() const; boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } - // Leave the cluster + // Leave the cluster - called in any thread. void leave(); - // Dump completedJo + // Dump completed - called in dump thread void dumpInDone(const ClusterMap&); MemberId getId() const; broker::Broker& getBroker() const; + Multicaster& getMulticast() { return mcast; } boost::function<bool ()> isQuorate; - void checkQuorum(); + void checkQuorum(); // called in connection threads. size_t getReadMax() { return readMax; } @@ -109,22 +109,17 @@ class Cluster : private Cpg::Handler, public management::Manageable { // The parameter makes it hard to forget since you have to have an instance of // a Lock to call the unlocked functions. - void mcastControl(const framing::AMQBody& controlBody); - void mcast(const Event& e); - void leave(Lock&); std::vector<Url> getUrls(Lock&) const; - void sendMcast(PollableEventQueue::Queue& ); - - // Called via CPG, deliverQueue or DumpClient threads. + // Make an offer if we can - called in deliver thread. void tryMakeOffer(const MemberId&, Lock&); // Called in main thread in ~Broker. void brokerShutdown(); // Cluster controls implement XML methods from cluster.xml. - // May be called in CPG thread via deliver() OR in deliverQueue thread. + // Called in deliver thread. // void dumpRequest(const MemberId&, const std::string&, Lock&); void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&); @@ -134,6 +129,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void delivered(PollableEventQueue::Queue&); // deliverQueue callback void deliveredEvent(const Event&); + // Helper, called in deliver thread. void dumpStart(const MemberId& dumpee, const Url& url, Lock&); // CPG callbacks, called in CPG IO thread. @@ -177,25 +173,31 @@ class Cluster : private Cpg::Handler, public management::Manageable { void setClusterId(const framing::Uuid&); - mutable sys::Monitor lock; - + // Immutable members set on construction, never changed. broker::Broker& broker; boost::shared_ptr<sys::Poller> poller; Cpg cpg; const std::string name; const Url myUrl; const MemberId myId; - - ConnectionMap connections; + const size_t readMax; + framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - PollableEventQueue deliverQueue, mcastQueue; - PlainEventQueue mcastStallQueue; - uint32_t mcastId; - framing::Uuid clusterId; + + // Thread safe members + Multicaster mcast; qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle + PollableEventQueue deliverQueue; + ConnectionMap connections; + boost::shared_ptr<FailoverExchange> failoverExchange; + Quorum quorum; + + // Remaining members are protected by lock. + mutable sys::Monitor lock; + // Local cluster state, cluster map enum { INIT, ///< Initial state, no CPG messages received. NEWBIE, ///< Sent dump request, waiting for dump offer. @@ -206,17 +208,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { DUMPER, ///< Offer accepted, sending a state dump. LEFT ///< Final state, left the cluster. } state; - ClusterMap map; - sys::Thread dumpThread; - boost::optional<ClusterMap> dumpedMap; - size_t lastSize; bool lastBroker; - boost::shared_ptr<FailoverExchange> failoverExchange; - Quorum quorum; - size_t readMax; + // Dump related + sys::Thread dumpThread; + boost::optional<ClusterMap> dumpedMap; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/ClusterLeaveException.h b/cpp/src/qpid/cluster/ClusterLeaveException.h new file mode 100644 index 0000000000..e5bdbc560a --- /dev/null +++ b/cpp/src/qpid/cluster/ClusterLeaveException.h @@ -0,0 +1,35 @@ +#ifndef QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H +#define QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/Exception.h" + +namespace qpid { +namespace cluster { + +struct ClusterLeaveException : public Exception +{ + ClusterLeaveException(const std::string& message=std::string()) : Exception(message) {} +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H*/ diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index a422164c81..f0d38bf299 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -163,7 +163,7 @@ void Connection::closed() { // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. output.setOutputHandler(discardHandler); - cluster.mcastControl(ClusterConnectionDeliverCloseBody(), self, ++mcastSeq); + cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); } } catch (const std::exception& e) { @@ -193,7 +193,7 @@ size_t Connection::decode(const char* buffer, size_t size) { } else { // Multicast local connections. assert(isLocal()); - cluster.mcastBuffer(buffer, size, self, ++mcastSeq); + cluster.getMulticast().mcastBuffer(buffer, size, self); } return size; } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index ce80f42414..29e42ce534 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -163,7 +163,6 @@ class Connection : framing::FrameDecoder localDecoder; framing::FrameDecoder mcastDecoder; broker::Connection connection; - framing::SequenceNumber mcastSeq; framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp new file mode 100644 index 0000000000..896f7c6a6e --- /dev/null +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Multicaster.h" +#include "Cpg.h" +#include "ClusterLeaveException.h" +#include "qpid/log/Statement.h" + + +namespace qpid { +namespace cluster { + +Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller) : + cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), + holding(true) +{ + queue.start(); +} + +void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) { + mcast(Event::control(body, id)); +} + +void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { + Event e(DATA, id, size); + memcpy(e.getData(), data, size); + mcast(e); +} + +void Multicaster::mcast(const Event& e) { + { + sys::Mutex::ScopedLock l(lock); + if (e.getType() == DATA && e.isConnection() && holding) { + holdingQueue.push_back(e); + QPID_LOG(trace, " MCAST held: " << e ); + return; + } + } + queue.push(e); +} + +void Multicaster::sendMcast(PollableEventQueue::Queue& values) { + try { + PollableEventQueue::Queue::iterator i = values.begin(); + while (i != values.end() && i->mcast(cpg)) { + QPID_LOG(trace, " MCAST " << *i); + ++i; + } + values.erase(values.begin(), i); + } + catch (const std::exception& e) { + throw ClusterLeaveException(e.what()); + } +} + +void Multicaster::release() { + sys::Mutex::ScopedLock l(lock); + holding = false; + std::for_each(holdingQueue.begin(), holdingQueue.end(), boost::bind(&Multicaster::mcast, this, _1)); + holdingQueue.clear(); +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h new file mode 100644 index 0000000000..e7aff7fe7c --- /dev/null +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -0,0 +1,69 @@ +#ifndef QPID_CLUSTER_MULTICASTER_H +#define QPID_CLUSTER_MULTICASTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "Event.h" +#include "qpid/sys/PollableQueue.h" +#include "qpid/sys/Mutex.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { + +namespace sys { +class Poller; +} + +namespace cluster { + +class Cpg; + +/** + * Multicast to the cluster. Shared, thread safe object. + */ +class Multicaster +{ + public: + /** Starts in holding mode: connection data events are held, other events are mcast */ + Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& ); + void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&); + void mcastBuffer(const char*, size_t, const ConnectionId&); + void mcast(const Event& e); + /** End holding mode, held events are mcast */ + void release(); + + private: + typedef sys::PollableQueue<Event> PollableEventQueue; + typedef std::deque<Event> PlainEventQueue; + + void sendMcast(PollableEventQueue::Queue& ); + + sys::Mutex lock; + Cpg& cpg; + PollableEventQueue queue; + bool holding; + PlainEventQueue holdingQueue; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_MULTICASTER_H*/ diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 0385ffc04e..ae2a040ef3 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -99,8 +99,8 @@ void OutputInterceptor::sendDoOutput() { // Send it anyway to keep the doOutput chain going until we are sure there's no more output // (in deliverDoOutput) // - // FIXME aconway 2008-10-16: use ++parent.mcastSeq as sequence no,not 0 - parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId(), 0); + parent.getCluster().getMulticast().mcastControl( + ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId()); QPID_LOG(trace, parent << "Send doOutput request for " << request); } |