#ifndef QPID_CLUSTER_CLUSTER_H #define QPID_CLUSTER_CLUSTER_H /* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include "ClusterMap.h" #include "ClusterSettings.h" #include "Cpg.h" #include "Decoder.h" #include "ErrorCheck.h" #include "Event.h" #include "EventFrame.h" #include "ExpiryPolicy.h" #include "FailoverExchange.h" #include "LockedConnectionMap.h" #include "Multicaster.h" #include "NoOpConnectionOutputHandler.h" #include "PollableQueue.h" #include "PollerDispatch.h" #include "Quorum.h" #include "qmf/org/apache/qpid/cluster/Cluster.h" #include "qpid/Url.h" #include "qpid/broker/Broker.h" #include "qpid/management/Manageable.h" #include "qpid/sys/Monitor.h" #include #include #include #include #include #include namespace qpid { namespace framing { class AMQBody; class Uuid; } namespace cluster { class Connection; class EventFrame; /** * Connection to the cluster */ class Cluster : private Cpg::Handler, public management::Manageable { public: typedef boost::intrusive_ptr ConnectionPtr; typedef std::vector ConnectionVector; // Public functions are thread safe unless otherwise mentioned in a comment. // Construct the cluster in plugin earlyInitialize. Cluster(const ClusterSettings&, broker::Broker&); virtual ~Cluster(); // Called by plugin initialize: cluster start-up requires transport plugins . // Thread safety: only called by plugin initialize. void initialize(); // Connection map. void addLocalConnection(const ConnectionPtr&); void addShadowConnection(const ConnectionPtr&); void erase(const ConnectionId&); // URLs of current cluster members. std::vector getIds() const; std::vector getUrls() const; boost::shared_ptr getFailoverExchange() const { return failoverExchange; } // Leave the cluster - called when fatal errors occur. void leave(); // Update completed - called in update thread void updateInDone(const ClusterMap&); MemberId getId() const; broker::Broker& getBroker() const; Multicaster& getMulticast() { return mcast; } void checkQuorum(); size_t getReadMax() { return readMax; } size_t getWriteEstimate() { return writeEstimate; } void deliverFrame(const EventFrame&); // Called in deliverFrame thread to indicate an error from the broker. void flagError(Connection&, ErrorCheck::ErrorType); void connectionError(); // Called only during update by Connection::shadowReady Decoder& getDecoder() { return decoder; } ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } private: typedef sys::Monitor::ScopedLock Lock; typedef PollableQueue PollableEventQueue; typedef PollableQueue PollableFrameQueue; typedef std::map ConnectionMap; // NB: A dummy Lock& parameter marks functions that must only be // called with Cluster::lock locked. void leave(Lock&); std::vector getIds(Lock&) const; std::vector getUrls(Lock&) const; // == Called in main thread from Broker destructor. void brokerShutdown(); // == Called in deliverEventQueue thread void deliveredEvent(const Event&); // == Called in deliverFrameQueue thread void deliveredFrame(const EventFrame&); void processFrame(const EventFrame&, Lock&); // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&); void shutdown(const MemberId&, Lock&); // Helper functions ConnectionPtr getConnection(const ConnectionId&, Lock&); ConnectionVector getConnections(Lock&); void updateStart(const MemberId& updatee, const Url& url, Lock&); void makeOffer(const MemberId&, Lock&); void setReady(Lock&); void memberUpdate(Lock&); void setClusterId(const framing::Uuid&, Lock&); // == Called in CPG dispatch thread void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, struct cpg_name *group, uint32_t /*nodeid*/, uint32_t /*pid*/, void* /*msg*/, int /*msg_len*/); void deliverEvent(const Event&); void configChange( // CPG config change callback. cpg_handle_t /*handle*/, struct cpg_name */*group*/, struct cpg_address */*members*/, int /*nMembers*/, struct cpg_address */*left*/, int /*nLeft*/, struct cpg_address */*joined*/, int /*nJoined*/ ); // == Called in management threads. virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); void stopClusterNode(Lock&); void stopFullCluster(Lock&); // == Called in connection IO threads . void checkUpdateIn(Lock&); // == Called in UpdateClient thread. void updateOutDone(); void updateOutError(const std::exception&); void updateOutDone(Lock&); // Immutable members set on construction, never changed. ClusterSettings settings; broker::Broker& broker; qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle boost::shared_ptr poller; Cpg cpg; const std::string name; Url myUrl; const MemberId self; const size_t readMax; const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; qpid::management::ManagementAgent* mAgent; boost::intrusive_ptr expiryPolicy; // Thread safe members Multicaster mcast; PollerDispatch dispatcher; PollableEventQueue deliverEventQueue; PollableFrameQueue deliverFrameQueue; boost::shared_ptr failoverExchange; Quorum quorum; LockedConnectionMap localConnections; // Used only during initialization bool initialized; // Used only in deliverEventQueue thread or when stalled for update. Decoder decoder; bool discarding; // Remaining members are protected by lock. // TODO aconway 2009-03-06: Most of these members are also only used in // deliverFrameQueue thread or during stall. Review and separate members // that require a lock, drop lock when not needed. mutable sys::Monitor lock; // Local cluster state, cluster map enum { INIT, ///< Initial state, no CPG messages received. JOINER, ///< Sent update request, waiting for update offer. UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. READY, ///< Fully operational OFFER, ///< Sent an offer, waiting for accept/reject. UPDATER, ///< Offer accepted, sending a state update. LEFT ///< Final state, left the cluster. } state; ConnectionMap connections; ClusterMap map; ClusterMap::Set elders; size_t lastSize; bool lastBroker; sys::Thread updateThread; boost::optional updatedMap; ErrorCheck error; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; }; }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_CLUSTER_H*/