diff options
author | Alan Conway <aconway@apache.org> | 2009-03-05 20:24:41 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-03-05 20:24:41 +0000 |
commit | 5f9b4a56232ad922d3e25a408924cb5bef0036d8 (patch) | |
tree | 60f2884f112278156a00d0dc806246eccfe8d214 /cpp/src | |
parent | 97d5254a47121a42d435e1ca808cb4c56cdbf18f (diff) | |
download | qpid-python-5f9b4a56232ad922d3e25a408924cb5bef0036d8.tar.gz |
Cluster: restore separate event/frame threads.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@750574 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 87 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 62 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ExpiryPolicy.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ExpiryPolicy.h | 3 |
4 files changed, 70 insertions, 89 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 69a63ad83c..8946a71446 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -96,6 +96,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : self(cpg.self()), readMax(settings.readMax), writeEstimate(settings.writeEstimate), + expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), @@ -106,12 +107,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : boost::bind(&Cluster::leave, this), "Error delivering frames", poller), - expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())), - eventId(0), + connections(*this), frameId(0), initialized(false), state(INIT), - connections(*this), + eventId(0), lastSize(0), lastBroker(false) { @@ -156,19 +156,15 @@ void Cluster::initialize() { // Called in connection thread to insert a client connection. void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { - Lock l(lock); connections.insert(c); } // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { - Lock l(lock); - assert(state <= UPDATEE); // Only during update. connections.insert(c); } void Cluster::erase(const ConnectionId& id) { - // Called only by Connection::deliverClose in deliver thread with lock held. connections.erase(id); } @@ -225,12 +221,11 @@ void Cluster::deliver( } void Cluster::deliver(const Event& e) { - if (state == LEFT) return; - QPID_LATENCY_INIT(e); deliverEventQueue.push(e); } -// Handler for deliverEventQueue +// Handler for deliverEventQueue. +// This thread executes cluster controls and decodes connection data events. void Cluster::deliveredEvent(const Event& event) { Event e(event); Mutex::ScopedLock l(lock); @@ -246,26 +241,34 @@ void Cluster::deliveredEvent(const Event& event) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { // Handle connection frames - if (e.getType() == CONTROL) { + if (e.getType() == CONTROL) connectionFrame(EventFrame(e, e.getFrame())); - } else connections.decode(e, e.getData()); } // Drop connection frames while state < CATCHUP } -// Handler for deliverFrameQueue +void Cluster::connectionFrame(const EventFrame& frame) { + deliverFrameQueue.push(frame); +} + +// Handler for deliverFrameQueue. +// This thread executes connection control and data frames. void Cluster::deliveredFrame(const EventFrame& event) { - Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock? + // No lock, only use connections, not Cluster state. EventFrame e(event); - assert(!e.isCluster()); // Only connection frames on this queue. - QPID_LOG(trace, *this << " DLVR: " << e); - if (e.type == DATA) // Add cluster-id to to data frames. - e.frame.setClusterId(frameId++); - boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); - if (connection) // Ignore frames to closed local connections. - connection->deliveredFrame(e); + if(!e.frame.getBody()) { // marks the stall point, start the update task. + updateThread=Thread(*updateTask); + } + else { + QPID_LOG(trace, *this << " DLVR: " << e); + if (e.type == DATA) // Add cluster-id to to data frames. + e.frame.setClusterId(frameId++); + boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); + if (connection) + connection->deliveredFrame(e); + } } struct AddrList { @@ -333,7 +336,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& if (state == INIT) { // First configChange if (map.aliveCount() == 1) { - setClusterId(true); + setClusterId(true, l); setReady(l); map = ClusterMap(self, myUrl, true); memberUpdate(l); @@ -358,8 +361,6 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& } } -bool Cluster::isLeader() const { return elders.empty(); } - void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -374,11 +375,9 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) { // callbacks will be invoked. // void Cluster::brokerShutdown() { - if (state != LEFT) { - try { cpg.shutdown(); } - catch (const std::exception& e) { - QPID_LOG(error, *this << " shutting down CPG: " << e.what()); - } + try { cpg.shutdown(); } + catch (const std::exception& e) { + QPID_LOG(error, *this << " shutting down CPG: " << e.what()); } delete this; } @@ -401,10 +400,6 @@ void Cluster::stall(Lock&) { // Stop processing the deliveredEventQueue in order to send or // recieve an update. deliverEventQueue.stop(); - - // FIXME aconway 2009-03-04: if frame queue is re-enabled, we must - // also wait for it to be empty before we are stalled, so that - // our local model is up-to-date to give an update. } void Cluster::unstall(Lock&) { @@ -430,7 +425,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu } else if (updatee == self && url) { assert(state == JOINER); - setClusterId(uuid); + setClusterId(uuid, l); state = UPDATEE; QPID_LOG(info, *this << " receiving update from " << updater); stall(l); @@ -444,16 +439,20 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { state = UPDATER; QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); stall(l); - if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + + if (updateThread.id()) + updateThread.join(); // Join the previous updateThread to avoid leaks. client::ConnectionSettings cs; cs.username = settings.username; cs.password = settings.password; cs.mechanism = settings.mechanism; - updateThread = Thread( - new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(), + updateTask = new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(), boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), - cs)); + cs); + // Push an empty frame onto the deliverFrameQueue to mark the stall point. + // The deliverFrameQueue thread will start the update at that point. + deliverFrameQueue.push(EventFrame(EventHeader(), AMQFrame())); } // Called in update thread. @@ -461,6 +460,7 @@ void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t fram Lock l(lock); updatedMap = m; eventId = eventId_; + // Safe to use frameId here because we are stalled: deliveredFrame cannot be called concurrently. frameId = frameId_; checkUpdateIn(l); } @@ -602,7 +602,7 @@ void Cluster::checkQuorum() { } } -void Cluster::setClusterId(const Uuid& uuid) { +void Cluster::setClusterId(const Uuid& uuid, Lock&) { clusterId = uuid; if (mgmtObject) { stringstream stream; @@ -617,13 +617,4 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } -void Cluster::connectionFrame(const EventFrame& frame) { - // FIXME aconway 2009-03-02: bypass deliverFrameQueue to avoid race condition. - // Measure performance impact & review. - // - // deliverFrameQueue.push(frame); - // - deliveredFrame(frame); -} - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 898ec2879f..3e25815b8e 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -66,25 +66,27 @@ class Cluster : private Cpg::Handler, public management::Manageable { typedef boost::intrusive_ptr<Connection> ConnectionPtr; typedef std::vector<ConnectionPtr> Connections; - /** Construct the cluster in plugin earlyInitialize */ + // Public functions are thread safe unless otherwise mentioned in a comment. + + // Construct the cluster in plugin earlyInitialize. Cluster(const ClusterSettings&, broker::Broker&); virtual ~Cluster(); - /** Join the cluster in plugin initialize. Requires transport - * plugins to be available.. */ + // Called by plugin initialize: cluster start-up requires transport plugins . + // Thread safety: only called by plugin initialize. void initialize(); - // Connection map - called in connection threads. + // Connection map. void addLocalConnection(const ConnectionPtr&); void addShadowConnection(const ConnectionPtr&); void erase(const ConnectionId&); - // URLs of current cluster members - called in connection threads. + // URLs of current cluster members. std::vector<std::string> getIds() const; std::vector<Url> getUrls() const; boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } - // Leave the cluster - called in any thread. + // Leave the cluster - called when fatal errors occur. void leave(); // Update completed - called in update thread @@ -94,15 +96,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { broker::Broker& getBroker() const; Multicaster& getMulticast() { return mcast; } - boost::function<bool ()> isQuorate; - void checkQuorum(); // called in connection threads. + void checkQuorum(); size_t getReadMax() { return readMax; } size_t getWriteEstimate() { return writeEstimate; } - bool isLeader() const; // Called in deliver thread. - - // Called by Connection in deliver event thread with decoded connection data frames. + // Process a connection frame. Called by Connection with decoded frames. + // Thread safety: only called in deliverEventQueue thread. void connectionFrame(const EventFrame&); private: @@ -111,11 +111,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { typedef PollableQueue<Event> PollableEventQueue; typedef PollableQueue<EventFrame> PollableFrameQueue; - // NB: The final Lock& parameter on functions below is used to mark functions - // that should only be called by a function that already holds the lock. - // The parameter makes it hard to forget since you have to have an instance of - // a Lock to call the unlocked functions. - + // NB: A dummy Lock& parameter marks functions that must only be + // called with Cluster::lock locked. + void leave(Lock&); std::vector<std::string> getIds(Lock&) const; std::vector<Url> getUrls(Lock&) const; @@ -123,18 +121,19 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Make an offer if we can - called in deliver thread. void makeOffer(const MemberId&, Lock&); - // Called in main thread in ~Broker. + // Called in main thread from Broker destructor. void brokerShutdown(); // Cluster controls implement XML methods from cluster.xml. - // Called in deliveredEvent thread. - // + // Called in deliverEventQueue thread. void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); void shutdown(const MemberId&, Lock&); + // Helper, called by updateOffer. + void updateStart(const MemberId& updatee, const Url& url, Lock&); // Used by cluster controls. void stall(Lock&); @@ -144,13 +143,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { void deliveredEvent(const Event&); void deliveredFrame(const EventFrame&); - // Helper, called in deliver thread. - void updateStart(const MemberId& updatee, const Url& url, Lock&); - - // Called in event deliver thread to check for update status. - bool isUpdateComplete(const EventFrame&); - bool isUpdateComplete(); - void setReady(Lock&); void deliver( // CPG deliver callback. @@ -186,7 +178,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateOutError(const std::exception&); void updateOutDone(Lock&); - void setClusterId(const framing::Uuid&); + void setClusterId(const framing::Uuid&, Lock&); // Immutable members set on construction, never changed. ClusterSettings settings; @@ -202,6 +194,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; qpid::management::ManagementAgent* mAgent; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; // Thread safe members Multicaster mcast; @@ -210,11 +203,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { PollableFrameQueue deliverFrameQueue; boost::shared_ptr<FailoverExchange> failoverExchange; Quorum quorum; - - // Used only in deliveredFrame thread - ClusterMap::Set elders; - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - uint64_t eventId; // FIXME aconway 2009-03-04: review use for thread safety frame-q thread re-enabled. + ConnectionMap connections; + + // Used only in deliverFrameQueue thread or in deliverEventQueue thread when stalled. uint64_t frameId; // Used only during initialization @@ -235,15 +226,16 @@ class Cluster : private Cpg::Handler, public management::Manageable { LEFT ///< Final state, left the cluster. } state; - ConnectionMap connections; + uint64_t eventId; ClusterMap map; + ClusterMap::Set elders; size_t lastSize; bool lastBroker; - - // Update related sys::Thread updateThread; + sys::Runnable* updateTask; boost::optional<ClusterMap> updatedMap; + friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; }; diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp index 690acfc3ad..cc451bf661 100644 --- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -30,8 +30,8 @@ namespace qpid { namespace cluster { -ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t) - : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {} +ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t) + : expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} namespace { uint64_t clusterId(const broker::Message& m) { @@ -65,8 +65,7 @@ bool ExpiryPolicy::hasExpired(broker::Message& m) { void ExpiryPolicy::sendExpire(uint64_t id) { sys::Mutex::ScopedLock l(lock); - if (isLeader()) - mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); + mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); } void ExpiryPolicy::deliverExpire(uint64_t id) { diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h index 7fb63c731e..b58946b8f7 100644 --- a/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -42,7 +42,7 @@ class Multicaster; class ExpiryPolicy : public broker::ExpiryPolicy { public: - ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&); + ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&); void willExpire(broker::Message&); @@ -65,7 +65,6 @@ class ExpiryPolicy : public broker::ExpiryPolicy IdSet expired; boost::intrusive_ptr<Expired> expiredPolicy; - boost::function<bool()> isLeader; Multicaster& mcast; MemberId memberId; broker::Timer& timer; |