diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.h')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h new file mode 100644 index 0000000000..78d325cdf9 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -0,0 +1,308 @@ +#ifndef QPID_CLUSTER_CLUSTER_H +#define QPID_CLUSTER_CLUSTER_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "ClusterMap.h" +#include "ClusterSettings.h" +#include "Cpg.h" +#include "Decoder.h" +#include "ErrorCheck.h" +#include "Event.h" +#include "EventFrame.h" +#include "ExpiryPolicy.h" +#include "FailoverExchange.h" +#include "InitialStatusMap.h" +#include "LockedConnectionMap.h" +#include "Multicaster.h" +#include "NoOpConnectionOutputHandler.h" +#include "PollableQueue.h" +#include "PollerDispatch.h" +#include "Quorum.h" +#include "StoreStatus.h" +#include "UpdateReceiver.h" + +#include "qmf/org/apache/qpid/cluster/Cluster.h" +#include "qpid/Url.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/Manageable.h" +#include "qpid/sys/Monitor.h" + +#include <boost/bind.hpp> +#include <boost/intrusive_ptr.hpp> +#include <boost/optional.hpp> + +#include <algorithm> +#include <map> +#include <vector> + +namespace qpid { + +namespace broker { +class Message; +} + +namespace framing { +class AMQBody; +struct Uuid; +} + +namespace cluster { + +class Connection; +struct EventFrame; +class ClusterTimer; +class UpdateDataExchange; + +/** + * Connection to the cluster + */ +class Cluster : private Cpg::Handler, public management::Manageable { + public: + typedef boost::intrusive_ptr<Connection> ConnectionPtr; + typedef std::vector<ConnectionPtr> ConnectionVector; + + // Public functions are thread safe unless otherwise mentioned in a comment. + + // Construct the cluster in plugin earlyInitialize. + Cluster(const ClusterSettings&, broker::Broker&); + virtual ~Cluster(); + + // Called by plugin initialize: cluster start-up requires transport plugins . + // Thread safety: only called by plugin initialize. + void initialize(); + + // Connection map. + void addLocalConnection(const ConnectionPtr&); + void addShadowConnection(const ConnectionPtr&); + void erase(const ConnectionId&); + + // URLs of current cluster members. + std::vector<std::string> getIds() const; + std::vector<Url> getUrls() const; + boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } + + // Leave the cluster - called when fatal errors occur. + void leave(); + + // Update completed - called in update thread + void updateInClosed(); + void updateInDone(const ClusterMap&); + void updateInRetracted(); + // True if we are expecting to receive catch-up connections. + bool isExpectingUpdate(); + + MemberId getId() const; + broker::Broker& getBroker() const; + Multicaster& getMulticast() { return mcast; } + + const ClusterSettings& getSettings() const { return settings; } + + void deliverFrame(const EventFrame&); + + // Called in deliverFrame thread to indicate an error from the broker. + void flagError(Connection&, ErrorCheck::ErrorType, const std::string& msg); + + // Called only during update by Connection::shadowReady + Decoder& getDecoder() { return decoder; } + + ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } + + UpdateReceiver& getUpdateReceiver() { return updateReceiver; } + + bool isElder() const; + + // Generates a log message for debugging purposes. + std::string debugSnapshot(); + + // Defer messages delivered in an unsafe context by multicasting. + bool deferDeliveryImpl(const std::string& queue, + const boost::intrusive_ptr<broker::Message>& msg); + + private: + typedef sys::Monitor::ScopedLock Lock; + + typedef PollableQueue<Event> PollableEventQueue; + typedef PollableQueue<EventFrame> PollableFrameQueue; + typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap; + + /** Version number of the cluster protocol, to avoid mixed versions. */ + static const uint32_t CLUSTER_VERSION; + + // NB: A dummy Lock& parameter marks functions that must only be + // called with Cluster::lock locked. + + void leave(Lock&); + std::vector<std::string> getIds(Lock&) const; + std::vector<Url> getUrls(Lock&) const; + + // == Called in main thread from Broker destructor. + void brokerShutdown(); + + // == Called in deliverEventQueue thread + void deliveredEvent(const Event&); + + // == Called in deliverFrameQueue thread + void deliveredFrame(const EventFrame&); + void processFrame(const EventFrame&, Lock&); + + // Cluster controls implement XML methods from cluster.xml. + void updateRequest(const MemberId&, const std::string&, Lock&); + void updateOffer(const MemberId& updater, uint64_t updatee, Lock&); + void retractOffer(const MemberId& updater, uint64_t updatee, Lock&); + void initialStatus(const MemberId&, + uint32_t version, + bool active, + const framing::Uuid& clusterId, + framing::cluster::StoreState, + const framing::Uuid& shutdownId, + const std::string& firstConfig, + Lock&); + void ready(const MemberId&, const std::string&, Lock&); + void configChange(const MemberId&, + const std::string& members, + const std::string& left, + const std::string& joined, + Lock& l); + void messageExpired(const MemberId&, uint64_t, Lock& l); + void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); + void timerWakeup(const MemberId&, const std::string& name, Lock&); + void timerDrop(const MemberId&, const std::string& name, Lock&); + void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); + void deliverToQueue(const std::string& queue, const std::string& message, Lock&); + + // Helper functions + ConnectionPtr getConnection(const EventFrame&, Lock&); + ConnectionVector getConnections(Lock&); + void updateStart(const MemberId& updatee, const Url& url, Lock&); + void makeOffer(const MemberId&, Lock&); + void setReady(Lock&); + void memberUpdate(Lock&); + void setClusterId(const framing::Uuid&, Lock&); + void erase(const ConnectionId&, Lock&); + void requestUpdate(Lock& ); + void initMapCompleted(Lock&); + void becomeElder(Lock&); + void setMgmtStatus(Lock&); + void updateMgmtMembership(Lock&); + + // == Called in CPG dispatch thread + void deliver( // CPG deliver callback. + cpg_handle_t /*handle*/, + const struct cpg_name *group, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* /*msg*/, + int /*msg_len*/); + + void deliverEvent(const Event&); + + void configChange( // CPG config change callback. + cpg_handle_t /*handle*/, + const struct cpg_name */*group*/, + const struct cpg_address */*members*/, int /*nMembers*/, + const struct cpg_address */*left*/, int /*nLeft*/, + const struct cpg_address */*joined*/, int /*nJoined*/ + ); + + // == Called in management threads. + virtual qpid::management::ManagementObject* GetManagementObject() const; + virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + + void stopClusterNode(Lock&); + void stopFullCluster(Lock&); + + // == Called in connection IO threads . + void checkUpdateIn(Lock&); + + // == Called in UpdateClient thread. + void updateOutDone(); + void updateOutError(const std::exception&); + void updateOutDone(Lock&); + + // Immutable members set on construction, never changed. + const ClusterSettings settings; + broker::Broker& broker; + qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle + boost::shared_ptr<sys::Poller> poller; + Cpg cpg; + const std::string name; + Url myUrl; + const MemberId self; + framing::Uuid clusterId; + NoOpConnectionOutputHandler shadowOut; + qpid::management::ManagementAgent* mAgent; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + + // Thread safe members + Multicaster mcast; + PollerDispatch dispatcher; + PollableEventQueue deliverEventQueue; + PollableFrameQueue deliverFrameQueue; + boost::shared_ptr<FailoverExchange> failoverExchange; + boost::shared_ptr<UpdateDataExchange> updateDataExchange; + Quorum quorum; + LockedConnectionMap localConnections; + + // Used only in deliverEventQueue thread or when stalled for update. + Decoder decoder; + bool discarding; + + + // Remaining members are protected by lock. + mutable sys::Monitor lock; + + + // Local cluster state, cluster map + enum { + PRE_INIT,///< Have not yet received complete initial status map. + INIT, ///< Waiting to reach cluster-size. + JOINER, ///< Sent update request, waiting for update offer. + UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. + CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. + READY, ///< Fully operational + OFFER, ///< Sent an offer, waiting for accept/reject. + UPDATER, ///< Offer accepted, sending a state update. + LEFT ///< Final state, left the cluster. + } state; + + ConnectionMap connections; + InitialStatusMap initMap; + StoreStatus store; + ClusterMap map; + MemberSet elders; + bool elder; + size_t lastAliveCount; + bool lastBroker; + sys::Thread updateThread; + boost::optional<ClusterMap> updatedMap; + bool updateRetracted, updateClosed; + ErrorCheck error; + UpdateReceiver updateReceiver; + ClusterTimer* timer; + + friend std::ostream& operator<<(std::ostream&, const Cluster&); + friend struct ClusterDispatcher; +}; + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_CLUSTER_H*/ |