summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp177
1 files changed, 91 insertions, 86 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index c74abb6cdd..08f6fb7dcc 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -49,30 +49,38 @@ const string DOLLAR("$");
const string INTERNAL("-internal");
} // namespace
-class DequeueRemover
+// Scan the queue for gaps and add them to the subscription's dequeues set.
+class DequeueScanner
{
public:
- DequeueRemover(
- SequenceSet& r,
- const SequenceNumber& s,
- const SequenceNumber& e
- ) : dequeues(r), start(s), end(e)
+ DequeueScanner(
+ ReplicatingSubscription& rs,
+ const SequenceNumber& first_,
+ const SequenceNumber& last_ // Inclusive
+ ) : subscription(rs), first(first_), last(last_)
{
- dequeues.add(start, end);
+ assert(first <= last);
+ // INVARIANT: no dequeues are needed for positions <= at.
+ at = first - 1;
}
- void operator()(const QueuedMessage& message) {
- if (message.position >= start && message.position <= end) {
- //i.e. message is within the intial range and has not been dequeued,
- //so remove it from the dequeues
- dequeues.remove(message.position);
+ void operator()(const QueuedMessage& qm) {
+ if (qm.position >= first && qm.position <= last) {
+ if (qm.position > at+1)
+ subscription.dequeued(at+1, qm.position-1);
+ at = qm.position;
}
}
+ // Must call after scanning the queue.
+ void finish() {
+ if (at < last) subscription.dequeued(at+1, last);
+ }
private:
- SequenceSet& dequeues;
- const SequenceNumber start;
- const SequenceNumber end;
+ ReplicatingSubscription& subscription;
+ SequenceNumber first;
+ SequenceNumber last;
+ SequenceNumber at;
};
string mask(const string& in)
@@ -113,11 +121,6 @@ bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front)
return getNext(q, 0, front);
}
-bool ReplicatingSubscription::isEmpty(broker::Queue& q) {
- SequenceNumber front;
- return getFront(q, front);
-}
-
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -149,11 +152,11 @@ struct QueueRange {
QueueRange() { }
- // FIXME aconway 2012-05-26: fix front calculation
QueueRange(broker::Queue& q) {
back = q.getPosition();
- front = back+1;
+ front = back+1; // Assume empty
empty = !ReplicatingSubscription::getFront(q, front);
+ assert(empty || front <= back);
}
QueueRange(const framing::FieldTable args) {
@@ -163,7 +166,8 @@ struct QueueRange {
if (!empty) {
front = args.getAsInt(ReplicatingSubscription::QPID_FRONT);
if (back < front)
- throw InvalidArgumentException("Invalid bounds for backup queue");
+ throw InvalidArgumentException(
+ QPID_MSG("Invalid range [" << front << "," << back <<"]"));
}
}
};
@@ -192,68 +196,72 @@ ReplicatingSubscription::ReplicatingSubscription(
try {
FieldTable ft;
if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
- throw Exception("Replicating subscription does not have broker info");
+ throw Exception("Replicating subscription does not have broker info: " + tag);
info.assign(ft);
// Set a log prefix message that identifies the remote broker.
ostringstream os;
- os << "HA primary replicate " << queue->getName() << "@" << info.getLogId() << ": ";
+ os << "HA primary replica " << queue->getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
- QueueRange primary(*queue);
- QueueRange backup(arguments);
+ // FIXME aconway 2012-06-10: unsafe to rely on queue front or position while they are changing?
+
+ QueueRange primary(*queue); // The local primary queue.
+ QueueRange backup(arguments); // The remote backup state.
backupPosition = backup.back;
+
+ // NOTE: Once the guard is attached we can have concurrent
+ // calls to dequeued() so we need to lock use of this->dequeues.
+ //
+
+ // However we must attach the guard _before_ we scan for
+ // initial dequeues to be sure we don't miss any dequeues
+ // between the scan and attaching the guard.
+ if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo());
+ if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
+ guard->attach(*this);
+
// We can re-use some backup messages if backup and primary queues
// overlap and the backup is not missing messages at the front of the queue.
- if (!primary.empty && // Primary not empty
+ // FIXME aconway 2012-06-10: disable re-use of backup queue till stall problem is solved.
+ /* if (!primary.empty && // Primary not empty
!backup.empty && // Backup not empty
primary.front >= backup.front && // Not missing messages at the front
primary.front <= backup.back // Overlap
)
{
- // Remove messages that are still on the primary queue from dequeues
- // FIXME aconway 2012-05-22: optimize to iterate only the relevant
- // section of the queue
- DequeueRemover remover(dequeues, backup.front, backup.back);
- queue->eachMessage(remover);
- position = std::min(primary.back, backup.back);
+ // Scan primary queue for gaps that should be dequeued on the backup.
+ // NOTE: this runs concurrently with the guard calling dequeued()
+ // FIXME aconway 2012-05-22: optimize queue iteration
+ DequeueScanner scan(*this, backup.front, backup.back);
+ queue->eachMessage(scan);
+ scan.finish();
+ // If the backup was ahead it has been pruned back to the primary.
+ position = std::min(guard->getFirstSafe(), backup.back);
}
- else {
+ else */ {
// Clear the backup queue and reset to start browsing at the
// front of the primary queue.
- if (!backup.empty) dequeues.add(backup.front, backup.back);
+ if (!backup.empty) dequeued(backup.front, backup.back);
position = primary.front - 1; // Start consuming from front.
-
}
QPID_LOG(debug, logPrefix << "Subscribed: "
- << " backup" << backup
- << " primary" << primary
- << " position=" << position
- << " dequeues=" << dequeues);
-
- // Set the guard
- if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo());
- if (!guard) {
- QPID_LOG(debug, logPrefix << "No pre-set guard found, creating one.");
- guard.reset(new QueueGuard(*queue, getBrokerInfo()));
- }
- guard->attach(*this);
-
- // Guard is active, dequeued can be called concurrently.
- Mutex::ScopedLock l(lock);
-
- // Set the ready position. All messages after this position have
- // been seen by the guard.
- readyPosition = guard->getReadyPosition();
- if (position >= readyPosition || isEmpty(*getQueue()))
- setReady(l);
+ << " backup:" << backup
+ << " backup position:" << backupPosition
+ << " primary:" << primary
+ << " position:" << position
+ );
+
+ // Are we ready yet?
+ if (position+1 >= guard->getFirstSafe()) // Next message will be safe.
+ setReady();
else
QPID_LOG(debug, logPrefix << "Catching up from "
- << position << " to " << readyPosition);
+ << position << " to " << guard->getFirstSafe());
}
catch (const std::exception& e) {
- throw Exception(QPID_MSG(logPrefix << "Error setting up replication: "
- << e.what()));
+ throw InvalidArgumentException(QPID_MSG(logPrefix << e.what()
+ << ": arguments=" << arguments));
}
}
@@ -295,16 +303,13 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
// Send the position before qm was enqueued.
sendPositionEvent(qm.position-1, l);
}
- // Backup will automaticall advance by 1 on delivery of message.
+ // Backup will automatically advance by 1 on delivery of message.
backupPosition = qm.position;
}
// Deliver the message
bool delivered = ConsumerImpl::deliver(qm);
- {
- Mutex::ScopedLock l(lock);
- // If we have advanced to the initial position, the backup is ready.
- if (qm.position >= readyPosition) setReady(l);
- }
+ // If we have advanced past the initial position, the backup is ready.
+ if (qm.position >= guard->getFirstSafe()) setReady();
return delivered;
}
else
@@ -316,15 +321,15 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
}
}
-void ReplicatingSubscription::setReady(Mutex::ScopedLock&) {
- if (ready) return;
- ready = true;
- // Notify Primary that a subscription is ready.
+void ReplicatingSubscription::setReady() {
{
- Mutex::ScopedUnlock u(lock);
- QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
- if (Primary::get()) Primary::get()->readyReplica(*this);
+ Mutex::ScopedLock l(lock);
+ if (ready) return;
+ ready = true;
}
+ // Notify Primary that a subscription is ready.
+ QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
+ if (Primary::get()) Primary::get()->readyReplica(*this);
}
// Called in the subscription's connection thread.
@@ -341,12 +346,9 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
guard->complete(qm);
}
+ ConsumerImpl::acknowledged(qm);
}
-// Hide the "queue deleted" error for a ReplicatingSubscription when a
-// queue is deleted, this is normal and not an error.
-bool ReplicatingSubscription::hideDeletedError() { return true; }
-
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
{
@@ -370,25 +372,28 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
assert (qm.queue == getQueue().get());
- bool doComplete = false;
+ QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
Mutex::ScopedLock l(lock);
- assert(!dequeues.contains(qm.position));
dequeues.add(qm.position);
- // If we have not yet sent this message to the backup, then
- // complete it now as it will never be accepted.
- if (qm.position > position) doComplete = true;
}
- if (doComplete) guard->complete(qm);
notify(); // Ensure a call to doDispatch
}
+// Called during construction while scanning for initial dequeues.
+void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) {
+ QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]");
+ {
+ Mutex::ScopedLock l(lock);
+ dequeues.add(first,last);
+ }
+}
+
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&)
{
if (pos == backupPosition) return; // No need to send.
- QPID_LOG(trace, logPrefix << "Sending position " << pos
- << ", was " << backupPosition);
+ QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
string buf(pos.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
pos.encode(buffer);