diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 733492db81..6c33002b5c 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -47,6 +47,7 @@ string mask(const string& in) return DOLLAR + in + INTERNAL; } +/* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( SemanticState* parent, @@ -122,8 +123,6 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) { SequenceNumber send(m.position); --send; // Send the position before m was enqueued. sendPositionEvent(send, l); - QPID_LOG(trace, logPrefix << "Sending position " << send - << ", was " << backupPosition); } backupPosition = m.position; QPID_LOG(trace, logPrefix << "Replicating message " << m.position); @@ -137,13 +136,14 @@ ReplicatingSubscription::~ReplicatingSubscription() {} void ReplicatingSubscription::cancel() { QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); - getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); + getQueue()->removeObserver( + boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); + ConsumerImpl::cancel(); } // Called before we get notified of the message being available and // under the message lock in the queue. Called in arbitrary connection thread. -void ReplicatingSubscription::enqueued(const QueuedMessage& m) -{ +void ReplicatingSubscription::enqueued(const QueuedMessage& m) { //delay completion m.payload->getIngressCompletion().startCompleter(); } @@ -164,6 +164,8 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) void ReplicatingSubscription::sendPositionEvent( SequenceNumber position, const sys::Mutex::ScopedLock&l ) { + QPID_LOG(trace, logPrefix << "Sending position " << position + << ", was " << backupPosition); string buf(backupPosition.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); position.encode(buffer); @@ -207,7 +209,6 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& // the messageLock in the queue. Called in arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& m) { - QPID_LOG(trace, logPrefix << "Dequeued message " << m.position); { sys::Mutex::ScopedLock l(lock); dequeues.add(m.position); @@ -219,8 +220,10 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m) // not under the message lock? if (m.position > position) { m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early"); + QPID_LOG(trace, logPrefix << "Dequeued and completed message " << m.position << " early"); } + else + QPID_LOG(trace, logPrefix << "Dequeued message " << m.position); } notify(); // Ensure a call to doDispatch } |