diff options
author | Alan Conway <aconway@apache.org> | 2013-12-12 20:09:20 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-12-12 20:09:20 +0000 |
commit | d58196fdcfb959d7722d7b65de12eadc6da2e9b5 (patch) | |
tree | f11405ac525f946fee1a751fa48ee9fcb2853ad2 | |
parent | 32e8ceb9e0cdb7862f4513fb6c5553246e95a8e2 (diff) | |
download | qpid-python-d58196fdcfb959d7722d7b65de12eadc6da2e9b5.tar.gz |
NO-JIRA: Expanded the HA overview writeup, integrated with doxygen docs.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1550507 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/docs/api/doxygen_developer_mainpage.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/README.h | 149 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 152 |
3 files changed, 230 insertions, 80 deletions
diff --git a/qpid/cpp/docs/api/doxygen_developer_mainpage.h b/qpid/cpp/docs/api/doxygen_developer_mainpage.h index 107b72260b..81b61ec728 100644 --- a/qpid/cpp/docs/api/doxygen_developer_mainpage.h +++ b/qpid/cpp/docs/api/doxygen_developer_mainpage.h @@ -54,6 +54,11 @@ * <li><p>Subclasses of Queue and Exchange</p></li> * </ul> * + * Add-on modules to the broker: + * + * - \ref ha-module "High Availability" + * + * * * <h2>Starting Points: Client</h2> * @@ -64,7 +69,3 @@ * <p>TBD</p> * */ - - - - diff --git a/qpid/cpp/src/qpid/ha/README.h b/qpid/cpp/src/qpid/ha/README.h new file mode 100644 index 0000000000..9e8eda7e0d --- /dev/null +++ b/qpid/cpp/src/qpid/ha/README.h @@ -0,0 +1,149 @@ +// This header file provides an overview of the ha module for doxygen. + +namespace qpid { +namespace ha { // Auto-links for names in the ha namespace. + +/** \defgroup ha-module + \brief High Availability Module + +This is a brief overview of how HA replication works, intended as a starting +point to understand the code. You should _first_ read the HA chapter of the "C++ +broker book" at http://qpid.apache.org/documentation.html for a general +understanding of the active/passive, hot-standby strategy. + +Class names without a prefix are in the qpid::ha namespace. See the +documentation of individual classes for more detail. + +## Overview of HA classes. ## + +HaBroker is the main container for HA code, it holds a Role which can be Primary +or Backup. It is responsible for implementing QMF management functions. + +Backup implements the back-up Role. It holds a BrokerReplicator which subscribes +to management events on the primary e.g. create/delete queue/exchange/binding. +It uses a ReplicationTest to check if an object is configured for replication. +For replicated objects it creates corresponding broker::Queue, broker::Exchange +etc. on the local broker. + +For replicated queues the BrokerReplicator also creates a QueueReplicator. The +QueueReplicator subscribes to the primary queue with arguments to tell the +primary to use a ReplicatingSubscription. See "Queue Replication" below for +more on the ReplicatingSubscription/QueueReplicator interaction. + +Primary implements the primary Role. For each replicated queue it creates an +IdSetter to set replication IDs on messages delivered to the queue. Each backup +that connects is tracked by a RemoteBackup. + +For each (queue,backup) pair, the Primary creates a ReplicatingSubscription and +a QueueGuard. The QueueGuard delays completion of messages delivered to the +queue until they are replicated to and acknowledged by the backup. + +When the primary fails, one of the backups is promoted by an external cluster +resource manager. The other backups fail-over to the new primary. See "Queue +Fail Over" below. + +## Locating the Primary ## + +Clients connect to the cluster via one of: +- a virtual IP address that is assigned to the primary broker +- a list of addresses for all brokers. + +If the connection fails the client re-tries its address or list of addresses +till it re-connects. Backup brokers have a ConnectionObserver that rejects +client connections by aborting the connection, causing the client to re-try. + +## Message Identifiers ## + +Replication IDs are sequence numbers assigned to a message *before* it is +enqueued. We use the IDs to: +- identify messages to dequeue on a backup. +- remove extra messages from backup on failover. +- avoid downloading messages already on backup on failover. + +Aside: Originally the queue position number was used as the ID, but that was +insufficient for two reasons: +- We sometimes need to identify messages that are not yet enqueued, for example messages in an open transaction. +- We don't want to require maintaining identical message sequences on every broker e.g. so transactions can be committed independently by each broker. + +## At-least-once and delayed completion. ## + +We guarantee no message loss, aka at-least-once delivery. Any message received +_and acknowledged_ by the primary will not be lost. Clients must keep sent +messages until they are acknowledged. In a failure the client re-connects and +re-sends all unacknowledged (or "in-doubt") messages. + +This ensures no messages are lost, but it is possible for in-doubt messages to +be duplicated if the broker did receive them but failed before sending the +acknowledgment. It is up to the application to handle duplicates correctly. + +We implement the broker's side of the bargain using _delayed completion_. When +the primary receives a message it does not complete (acknowledge) the message +until the message is replicated to and acknowledged by all the backups. In the +case of persistent messages, that includes writing the message to persistent +store. + +## Queue Replication ## + +Life-cycle of a message on a replicated queue, on the primary: + +Record: Received for primary queue. +- IdSetter sets a replication ID. + +Enqueue: Published to primary queue. +- QueueGuard delays completion (increments completion count). + +Deliver: Backup is ready to receive data. +- ReplicatingSubscription sends message to backup QueueReplicator + +Acknowledge: Received from backup QueueReplicator. +- ReplicatingSubscription completes (decrements completion count). + +Dequeue: Removed from queue +- QueueGuard completes if not already completed by ReplicatingSubscription. +- ReplicatingSubscription sends dequeued message ID to backup QueueReplicator. + +There is a simple protocol between ReplicatingSubscription and QueueReplicator +(see Event sub-classes) so ReplicatingSubscription can send +- replication IDs of messages as they are replicated. +- replication IDs of messages that have been dequeued on the primary. + +## Queue Failover ## + +On failover, for each queue, the backup gets the QueueSnapshot of messages on +the queue and sends it with the subscription to the primary queue. The primary +responds with its own snapshot. + +Both parties use this information to determine: + +- Messages that are on the backup and don't need to be replicated again. +- Messages that are dequeued on the primary and can be discarded by the backup. +- Queue position for the QueueGuard to start delaying completion. +- Queue position for the ReplicatingSubscription to start replicating from. + +After that handshake queue replication as described above begins. + +## Standalone replication ## + +The HA module supports ad-hoc replication between standalone brokers that are +not in a cluster, using the same basic queue replication techniques. + +## Queue and Backup "ready" ## + +A QueueGuard is set on the queue when a backup first subscribes or when a backup +fails over to a new primary. Messages before the _first guarded position_ cannot +be delayed because they may have already been acknowledged to clients. + +A backup sends a set of acknowledged message ids when subscribing, messages that +are already on the backup and therefore safe. + +A ReplicatingSubscription is _ready_ when all messages are safe or delayed. We +know this is the case when both the following conditions hold: + +- The ReplicatingSubscription has reached the position preceding the first guarded position AND +- All messages prior to the first guarded position are safe. + + +*/ +/** \namespace qpid::ha \brief High Availability \ingroup ha-module */ +}} // namespace qpid::ha + diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 0df6f0b411..28ab98f73b 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -68,89 +68,89 @@ class Primary; * ReplicatingSubscription makes calls on QueueGuard, but not vice-versa. */ class ReplicatingSubscription : - public broker::SemanticState::ConsumerImpl, - public broker::QueueObserver + public broker::SemanticState::ConsumerImpl, + public broker::QueueObserver { -public: -typedef broker::SemanticState::ConsumerImpl ConsumerImpl; - -class Factory : public broker::ConsumerFactory { -public: -Factory(HaBroker& hb) : haBroker(hb) {} - -HaBroker& getHaBroker() const { return haBroker; } - -boost::shared_ptr<broker::SemanticState::ConsumerImpl> create( -broker::SemanticState* parent, - const std::string& name, boost::shared_ptr<broker::Queue> , - bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, - const framing::FieldTable& arguments); -private: -HaBroker& haBroker; -}; - -// Argument names for consume command. -static const std::string QPID_REPLICATING_SUBSCRIPTION; -static const std::string QPID_BROKER_INFO; -static const std::string QPID_ID_SET; -// Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument. -static const std::string QPID_QUEUE_REPLICATOR; -static const std::string QPID_TX_REPLICATOR; - -ReplicatingSubscription(HaBroker& haBroker, + public: + typedef broker::SemanticState::ConsumerImpl ConsumerImpl; + + class Factory : public broker::ConsumerFactory { + public: + Factory(HaBroker& hb) : haBroker(hb) {} + + HaBroker& getHaBroker() const { return haBroker; } + + boost::shared_ptr<broker::SemanticState::ConsumerImpl> create( + broker::SemanticState* parent, + const std::string& name, boost::shared_ptr<broker::Queue> , + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); + private: + HaBroker& haBroker; + }; + + // Argument names for consume command. + static const std::string QPID_REPLICATING_SUBSCRIPTION; + static const std::string QPID_BROKER_INFO; + static const std::string QPID_ID_SET; + // Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument. + static const std::string QPID_QUEUE_REPLICATOR; + static const std::string QPID_TX_REPLICATOR; + + ReplicatingSubscription(HaBroker& haBroker, broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); -~ReplicatingSubscription(); - - -// Consumer overrides. -bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg); -void cancel(); -void acknowledged(const broker::DeliveryRecord&); -bool browseAcquired() const { return true; } -// Hide the "queue deleted" error for a ReplicatingSubscription when a -// queue is deleted, this is normal and not an error. -bool hideDeletedError() { return true; } - -// QueueObserver overrides -void enqueued(const broker::Message&) {} -void dequeued(const broker::Message&); -void acquired(const broker::Message&) {} -void requeued(const broker::Message&) {} - -/** A ReplicatingSubscription is a passive observer, not counted for auto - * deletion and immediate message purposes. - */ -bool isCounted() { return false; } - -/** Initialization that must be done separately from construction - * because it requires a shared_ptr to this to exist. - */ -void initialize(); - -BrokerInfo getBrokerInfo() const { return info; } - -/** Skip replicating enqueue of of ids. */ -void addSkip(const ReplicationIdSet& ids); - -protected: -bool doDispatch(); - -private: -std::string logPrefix; -QueuePosition position; -ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. -ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. -ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. -bool ready; -bool cancelled; -BrokerInfo info; -boost::shared_ptr<QueueGuard> guard; + ~ReplicatingSubscription(); + + + // Consumer overrides. + bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg); + void cancel(); + void acknowledged(const broker::DeliveryRecord&); + bool browseAcquired() const { return true; } + // Hide the "queue deleted" error for a ReplicatingSubscription when a + // queue is deleted, this is normal and not an error. + bool hideDeletedError() { return true; } + + // QueueObserver overrides + void enqueued(const broker::Message&) {} + void dequeued(const broker::Message&); + void acquired(const broker::Message&) {} + void requeued(const broker::Message&) {} + + /** A ReplicatingSubscription is a passive observer, not counted for auto + * deletion and immediate message purposes. + */ + bool isCounted() { return false; } + + /** Initialization that must be done separately from construction + * because it requires a shared_ptr to this to exist. + */ + void initialize(); + + BrokerInfo getBrokerInfo() const { return info; } + + /** Skip replicating enqueue of of ids. */ + void addSkip(const ReplicationIdSet& ids); + + protected: + bool doDispatch(); + + private: + std::string logPrefix; + QueuePosition position; + ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. + ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. + ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. + bool ready; + bool cancelled; + BrokerInfo info; + boost::shared_ptr<QueueGuard> guard; HaBroker& haBroker; boost::shared_ptr<Primary> primary; |