summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.h')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h14
1 files changed, 10 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 147c40ee6d..8af273e4d8 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -85,17 +85,21 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
// Consumer overrides.
void cancel();
- bool isDelayedCompletion() const { return true; }
+ void acknowledged(const broker::QueuedMessage&);
protected:
bool doDispatch();
private:
+ typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
std::string logPrefix;
boost::shared_ptr<broker::Queue> events;
boost::shared_ptr<broker::Consumer> consumer;
- qpid::framing::SequenceSet dequeues;
+ Delayed delayed;
+ framing::SequenceSet dequeues;
framing::SequenceNumber backupPosition;
+ void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
+ void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
void sendDequeueEvent(const sys::Mutex::ScopedLock&);
void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
void sendEvent(const std::string& key, framing::Buffer&,
@@ -110,9 +114,11 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
void notify();
bool filter(boost::intrusive_ptr<broker::Message>);
bool accept(boost::intrusive_ptr<broker::Message>);
- void cancel();
- bool isDelayedCompletion() const { return false; }
+ void cancel() {}
+ void acknowledged(const broker::QueuedMessage&) {}
+
broker::OwnershipToken* getSession();
+
private:
ReplicatingSubscription& delegate;
};