summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:07:38 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:07:38 +0000
commitc167f61280bdf7349cb822d5326ec0de2d3ea067 (patch)
treeb3dc19ef2754983951a79111c6c5a4422ce68983
parentda5682adea86ae2d13a69931d84aec2c80c3f248 (diff)
downloadqpid-python-c167f61280bdf7349cb822d5326ec0de2d3ea067.tar.gz
QPID-3603: Code cleanup to make ReplicatingSubscription more readable.
Clarified deliver() and dequeued() logic and locking. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233675 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp52
1 files changed, 23 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index a77154c595..3dae5fd0a7 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -103,14 +103,6 @@ ReplicatingSubscription::ReplicatingSubscription(
QPID_LOG(debug, logPrefix << "Created subscription " << name);
- // Note that broker::Queue::getPosition() returns the sequence
- // number that will be assigned to the next message *minus 1*.
-
- // this->backupPosition tracks the position of the remote backup
- // queue, i.e. the sequence number for the next delivered message
- // *minus one*
- backupPosition = 0;
-
// FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
// so we will start consuming from the lowest numbered message.
// This is incorrect if the sequence number wraps around, but
@@ -121,22 +113,20 @@ ReplicatingSubscription::ReplicatingSubscription(
bool ReplicatingSubscription::deliver(QueuedMessage& m) {
// Add position events for the subscribed queue, not for the internal event queue.
if (m.queue && m.queue == getQueue().get()) {
+ sys::Mutex::ScopedLock l(lock);
assert(position == m.position);
- {
- sys::Mutex::ScopedLock l(lock);
- // this->position is the new position after enqueueing m locally.
- // this->backupPosition is the backup position before enqueueing m.
- assert(position > backupPosition);
- if (position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- SequenceNumber send(position);
- --send; // Send the position before m was enqueued.
- sendPositionEvent(send, l);
- QPID_LOG(trace, logPrefix << "Sending position " << send
- << ", was " << backupPosition);
- }
- backupPosition = position;
+ // m.position is the position of the newly enqueued m on the local queue.
+ // backupPosition is latest position on the backup queue (before enqueueing m.)
+ assert(m.position > backupPosition);
+ if (m.position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ SequenceNumber send(m.position);
+ --send; // Send the position before m was enqueued.
+ sendPositionEvent(send, l);
+ QPID_LOG(trace, logPrefix << "Sending position " << send
+ << ", was " << backupPosition);
}
+ backupPosition = m.position;
QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
}
return ConsumerImpl::deliver(m);
@@ -215,21 +205,25 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
}
// Called after the message has been removed from the deque and under
-// the message lock in the queue. Called in arbitrary connection threads.
+// the messageLock in the queue. Called in arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(m.position);
+ // If we have not yet sent this message to the backup, then
+ // complete it now as it will never be accepted.
+
+ // FIXME aconway 2012-01-05: suspect use of position in
+ // foreign connection thread. Race with deliver() which is
+ // not under the message lock?
+ if (m.position > position) {
+ m.payload->getIngressCompletion().finishCompleter();
+ QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early");
+ }
}
notify(); // Ensure a call to doDispatch
- // FIXME aconway 2011-12-20: not thread safe to access position here,
- // we're not in the dispatch thread.
- if (m.position > position) {
- m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early");
- }
}
// Called in subscription's connection thread.