diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.h')
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 147c40ee6d..8af273e4d8 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -85,17 +85,21 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, // Consumer overrides. void cancel(); - bool isDelayedCompletion() const { return true; } + void acknowledged(const broker::QueuedMessage&); protected: bool doDispatch(); private: + typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed; std::string logPrefix; boost::shared_ptr<broker::Queue> events; boost::shared_ptr<broker::Consumer> consumer; - qpid::framing::SequenceSet dequeues; + Delayed delayed; + framing::SequenceSet dequeues; framing::SequenceNumber backupPosition; + void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&); + void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&); void sendDequeueEvent(const sys::Mutex::ScopedLock&); void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&); void sendEvent(const std::string& key, framing::Buffer&, @@ -110,9 +114,11 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, void notify(); bool filter(boost::intrusive_ptr<broker::Message>); bool accept(boost::intrusive_ptr<broker::Message>); - void cancel(); - bool isDelayedCompletion() const { return false; } + void cancel() {} + void acknowledged(const broker::QueuedMessage&) {} + broker::OwnershipToken* getSession(); + private: ReplicatingSubscription& delegate; }; |