summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.h')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h146
1 files changed, 79 insertions, 67 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 71993bcb12..0df6f0b411 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -25,6 +25,8 @@
#include "BrokerInfo.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/ConsumerFactory.h"
+#include "qpid/broker/QueueObserver.h"
+#include <boost/enable_shared_from_this.hpp>
#include <iosfwd>
namespace qpid {
@@ -65,81 +67,91 @@ class Primary;
*
* ReplicatingSubscription makes calls on QueueGuard, but not vice-versa.
*/
-class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
+class ReplicatingSubscription :
+ public broker::SemanticState::ConsumerImpl,
+ public broker::QueueObserver
{
- public:
- typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
-
- class Factory : public broker::ConsumerFactory {
- public:
- Factory(HaBroker& hb) : haBroker(hb) {}
-
- HaBroker& getHaBroker() const { return haBroker; }
-
- boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
- broker::SemanticState* parent,
- const std::string& name, boost::shared_ptr<broker::Queue> ,
- bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl,
- const framing::FieldTable& arguments);
- private:
- HaBroker& haBroker;
- };
-
- // Argument names for consume command.
- static const std::string QPID_REPLICATING_SUBSCRIPTION;
- static const std::string QPID_BROKER_INFO;
- static const std::string QPID_ID_SET;
-
- ReplicatingSubscription(HaBroker& haBroker,
+public:
+typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
+
+class Factory : public broker::ConsumerFactory {
+public:
+Factory(HaBroker& hb) : haBroker(hb) {}
+
+HaBroker& getHaBroker() const { return haBroker; }
+
+boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+private:
+HaBroker& haBroker;
+};
+
+// Argument names for consume command.
+static const std::string QPID_REPLICATING_SUBSCRIPTION;
+static const std::string QPID_BROKER_INFO;
+static const std::string QPID_ID_SET;
+// Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument.
+static const std::string QPID_QUEUE_REPLICATOR;
+static const std::string QPID_TX_REPLICATOR;
+
+ReplicatingSubscription(HaBroker& haBroker,
broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
const std::string& resumeId, uint64_t resumeTtl,
const framing::FieldTable& arguments);
- ~ReplicatingSubscription();
-
-
- // Consumer overrides.
- bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
- void cancel();
- void acknowledged(const broker::DeliveryRecord&);
- bool browseAcquired() const { return true; }
- // Hide the "queue deleted" error for a ReplicatingSubscription when a
- // queue is deleted, this is normal and not an error.
- bool hideDeletedError() { return true; }
- // Not counted for auto deletion and immediate message purposes.
- bool isCounted() { return false; }
-
- /** Initialization that must be done separately from construction
- * because it requires a shared_ptr to this to exist.
- */
- void initialize();
-
- BrokerInfo getBrokerInfo() const { return info; }
-
- /** Skip replicating enqueue of of ids. */
- void addSkip(const ReplicationIdSet& ids);
-
- protected:
- bool doDispatch();
-
- private:
- class QueueObserver;
- friend class QueueObserver;
-
- std::string logPrefix;
- QueuePosition position;
- ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
- ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
- ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
- bool ready;
- bool cancelled;
- BrokerInfo info;
- boost::shared_ptr<QueueGuard> guard;
+~ReplicatingSubscription();
+
+
+// Consumer overrides.
+bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
+void cancel();
+void acknowledged(const broker::DeliveryRecord&);
+bool browseAcquired() const { return true; }
+// Hide the "queue deleted" error for a ReplicatingSubscription when a
+// queue is deleted, this is normal and not an error.
+bool hideDeletedError() { return true; }
+
+// QueueObserver overrides
+void enqueued(const broker::Message&) {}
+void dequeued(const broker::Message&);
+void acquired(const broker::Message&) {}
+void requeued(const broker::Message&) {}
+
+/** A ReplicatingSubscription is a passive observer, not counted for auto
+ * deletion and immediate message purposes.
+ */
+bool isCounted() { return false; }
+
+/** Initialization that must be done separately from construction
+ * because it requires a shared_ptr to this to exist.
+ */
+void initialize();
+
+BrokerInfo getBrokerInfo() const { return info; }
+
+/** Skip replicating enqueue of of ids. */
+void addSkip(const ReplicationIdSet& ids);
+
+protected:
+bool doDispatch();
+
+private:
+std::string logPrefix;
+QueuePosition position;
+ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
+ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
+ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
+bool ready;
+bool cancelled;
+BrokerInfo info;
+boost::shared_ptr<QueueGuard> guard;
HaBroker& haBroker;
- boost::shared_ptr<QueueObserver> observer;
boost::shared_ptr<Primary> primary;
bool isGuarded(sys::Mutex::ScopedLock&);