diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.h')
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 146 |
1 files changed, 79 insertions, 67 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 71993bcb12..0df6f0b411 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -25,6 +25,8 @@ #include "BrokerInfo.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/ConsumerFactory.h" +#include "qpid/broker/QueueObserver.h" +#include <boost/enable_shared_from_this.hpp> #include <iosfwd> namespace qpid { @@ -65,81 +67,91 @@ class Primary; * * ReplicatingSubscription makes calls on QueueGuard, but not vice-versa. */ -class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl +class ReplicatingSubscription : + public broker::SemanticState::ConsumerImpl, + public broker::QueueObserver { - public: - typedef broker::SemanticState::ConsumerImpl ConsumerImpl; - - class Factory : public broker::ConsumerFactory { - public: - Factory(HaBroker& hb) : haBroker(hb) {} - - HaBroker& getHaBroker() const { return haBroker; } - - boost::shared_ptr<broker::SemanticState::ConsumerImpl> create( - broker::SemanticState* parent, - const std::string& name, boost::shared_ptr<broker::Queue> , - bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, - const framing::FieldTable& arguments); - private: - HaBroker& haBroker; - }; - - // Argument names for consume command. - static const std::string QPID_REPLICATING_SUBSCRIPTION; - static const std::string QPID_BROKER_INFO; - static const std::string QPID_ID_SET; - - ReplicatingSubscription(HaBroker& haBroker, +public: +typedef broker::SemanticState::ConsumerImpl ConsumerImpl; + +class Factory : public broker::ConsumerFactory { +public: +Factory(HaBroker& hb) : haBroker(hb) {} + +HaBroker& getHaBroker() const { return haBroker; } + +boost::shared_ptr<broker::SemanticState::ConsumerImpl> create( +broker::SemanticState* parent, + const std::string& name, boost::shared_ptr<broker::Queue> , + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); +private: +HaBroker& haBroker; +}; + +// Argument names for consume command. +static const std::string QPID_REPLICATING_SUBSCRIPTION; +static const std::string QPID_BROKER_INFO; +static const std::string QPID_ID_SET; +// Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument. +static const std::string QPID_QUEUE_REPLICATOR; +static const std::string QPID_TX_REPLICATOR; + +ReplicatingSubscription(HaBroker& haBroker, broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); - ~ReplicatingSubscription(); - - - // Consumer overrides. - bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg); - void cancel(); - void acknowledged(const broker::DeliveryRecord&); - bool browseAcquired() const { return true; } - // Hide the "queue deleted" error for a ReplicatingSubscription when a - // queue is deleted, this is normal and not an error. - bool hideDeletedError() { return true; } - // Not counted for auto deletion and immediate message purposes. - bool isCounted() { return false; } - - /** Initialization that must be done separately from construction - * because it requires a shared_ptr to this to exist. - */ - void initialize(); - - BrokerInfo getBrokerInfo() const { return info; } - - /** Skip replicating enqueue of of ids. */ - void addSkip(const ReplicationIdSet& ids); - - protected: - bool doDispatch(); - - private: - class QueueObserver; - friend class QueueObserver; - - std::string logPrefix; - QueuePosition position; - ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. - ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. - ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. - bool ready; - bool cancelled; - BrokerInfo info; - boost::shared_ptr<QueueGuard> guard; +~ReplicatingSubscription(); + + +// Consumer overrides. +bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg); +void cancel(); +void acknowledged(const broker::DeliveryRecord&); +bool browseAcquired() const { return true; } +// Hide the "queue deleted" error for a ReplicatingSubscription when a +// queue is deleted, this is normal and not an error. +bool hideDeletedError() { return true; } + +// QueueObserver overrides +void enqueued(const broker::Message&) {} +void dequeued(const broker::Message&); +void acquired(const broker::Message&) {} +void requeued(const broker::Message&) {} + +/** A ReplicatingSubscription is a passive observer, not counted for auto + * deletion and immediate message purposes. + */ +bool isCounted() { return false; } + +/** Initialization that must be done separately from construction + * because it requires a shared_ptr to this to exist. + */ +void initialize(); + +BrokerInfo getBrokerInfo() const { return info; } + +/** Skip replicating enqueue of of ids. */ +void addSkip(const ReplicationIdSet& ids); + +protected: +bool doDispatch(); + +private: +std::string logPrefix; +QueuePosition position; +ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. +ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. +ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. +bool ready; +bool cancelled; +BrokerInfo info; +boost::shared_ptr<QueueGuard> guard; HaBroker& haBroker; - boost::shared_ptr<QueueObserver> observer; boost::shared_ptr<Primary> primary; bool isGuarded(sys::Mutex::ScopedLock&); |