summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp17
1 files changed, 10 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 733492db81..6c33002b5c 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -47,6 +47,7 @@ string mask(const string& in)
return DOLLAR + in + INTERNAL;
}
+/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
SemanticState* parent,
@@ -122,8 +123,6 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) {
SequenceNumber send(m.position);
--send; // Send the position before m was enqueued.
sendPositionEvent(send, l);
- QPID_LOG(trace, logPrefix << "Sending position " << send
- << ", was " << backupPosition);
}
backupPosition = m.position;
QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
@@ -137,13 +136,14 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
void ReplicatingSubscription::cancel()
{
QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
- getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+ getQueue()->removeObserver(
+ boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+ ConsumerImpl::cancel();
}
// Called before we get notified of the message being available and
// under the message lock in the queue. Called in arbitrary connection thread.
-void ReplicatingSubscription::enqueued(const QueuedMessage& m)
-{
+void ReplicatingSubscription::enqueued(const QueuedMessage& m) {
//delay completion
m.payload->getIngressCompletion().startCompleter();
}
@@ -164,6 +164,8 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
void ReplicatingSubscription::sendPositionEvent(
SequenceNumber position, const sys::Mutex::ScopedLock&l )
{
+ QPID_LOG(trace, logPrefix << "Sending position " << position
+ << ", was " << backupPosition);
string buf(backupPosition.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
position.encode(buffer);
@@ -207,7 +209,6 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
// the messageLock in the queue. Called in arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
- QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(m.position);
@@ -219,8 +220,10 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m)
// not under the message lock?
if (m.position > position) {
m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early");
+ QPID_LOG(trace, logPrefix << "Dequeued and completed message " << m.position << " early");
}
+ else
+ QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
}
notify(); // Ensure a call to doDispatch
}