summary | refs | log | tree | commit | diff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- cpp/src/qpid/ha/ReplicatingSubscription.cpp 289
1 files changed, 127 insertions, 162 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index a9bd7b49f8..7b153f90ca 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,12 +20,16 @@
*/
#include "makeMessage.h"
+#include "IdSetter.h"
#include "QueueGuard.h"
-#include "QueueRange.h"
#include "QueueReplicator.h"
+#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "Primary.h"
+#include "HaBroker.h"
+#include "qpid/assert.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -35,6 +39,7 @@
#include "qpid/types/Uuid.h"
#include <sstream>
+
namespace qpid {
namespace ha {
@@ -45,53 +50,20 @@ using sys::Mutex;
using broker::amqp_0_10::MessageTransfer;
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
-const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
-const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front");
const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
+const string ReplicatingSubscription::QPID_ID_SET("qpid.ha-info");
-namespace {
-const string DOLLAR("$");
-const string INTERNAL("-internal");
-} // namespace
-
-// Scan the queue for gaps and add them to the subscriptions dequed set.
-class DequeueScanner
-{
+class ReplicatingSubscription::QueueObserver : public broker::QueueObserver {
public:
- DequeueScanner(
- ReplicatingSubscription& rs,
- SequenceNumber front_,
- SequenceNumber back_ // Inclusive
- ) : subscription(rs), front(front_), back(back_)
- {
- assert(front <= back);
- // INVARIANT deques have been added for positions <= at.
- at = front - 1;
- }
-
- void operator()(const Message& m) {
- if (m.getSequence() >= front && m.getSequence() <= back) {
- if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1);
- at = m.getSequence();
- }
- }
-
- // Must call after scanning the queue.
- void finish() {
- if (at < back) subscription.dequeued(at+1, back);
- }
-
+ QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {}
+ void enqueued(const broker::Message&) {}
+ void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); }
+ void acquired(const broker::Message&) {}
+ void requeued(const broker::Message&) {}
private:
- ReplicatingSubscription& subscription;
- SequenceNumber front;
- SequenceNumber back;
- SequenceNumber at;
+ ReplicatingSubscription& rs;
};
-string mask(const string& in)
-{
- return DOLLAR + in + INTERNAL;
-}
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
@@ -110,6 +82,7 @@ ReplicatingSubscription::Factory::create(
boost::shared_ptr<ReplicatingSubscription> rs;
if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
rs.reset(new ReplicatingSubscription(
+ haBroker,
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
rs->initialize();
@@ -117,7 +90,15 @@ ReplicatingSubscription::Factory::create(
return rs;
}
+namespace {
+void copyIf(boost::shared_ptr<MessageInterceptor> from, boost::shared_ptr<IdSetter>& to) {
+ boost::shared_ptr<IdSetter> result = boost::dynamic_pointer_cast<IdSetter>(from);
+ if (result) to = result;
+}
+} // namespace
+
ReplicatingSubscription::ReplicatingSubscription(
+ HaBroker& hb,
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
@@ -130,7 +111,8 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
- ready(false)
+ position(0), ready(false), cancelled(false),
+ haBroker(hb)
{
try {
FieldTable ft;
@@ -140,64 +122,57 @@ ReplicatingSubscription::ReplicatingSubscription(
// Set a log prefix message that identifies the remote broker.
ostringstream os;
- os << "Primary " << queue->getName() << "@" << info << ": ";
+ os << "Subscription to " << queue->getName() << " at " << info << ": ";
logPrefix = os.str();
- // NOTE: Once the guard is attached we can have concurrent
- // calls to dequeued so we need to lock use of this->dequeues.
- //
- // However we must attach the guard _before_ we scan for
- // initial dequeues to be sure we don't miss any dequeues
- // between the scan and attaching the guard.
+ // If this is a non-cluster standalone replication then we need to
+ // set up an IdSetter if there is not already one.
+ boost::shared_ptr<IdSetter> idSetter;
+ queue->getMessageInterceptors().each(
+ boost::bind(&copyIf, _1, boost::ref(idSetter)));
+ if (!idSetter) {
+ QPID_LOG(debug, logPrefix << "Standalone replication");
+ queue->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1)));
+ }
+
+ // If there's already a guard (we are in failover) use it, else create one.
if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
- guard->attach(*this);
- QueueRange backup(arguments); // Remote backup range.
- QueueRange backupOriginal(backup);
- QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
- backupPosition = backup.back;
-
- // Sync backup and primary queues, don't send messages already on the backup
-
- if (backup.front > primary.front || // Missing messages at front
- backup.back < primary.front || // No overlap
- primary.empty() || backup.empty()) // Empty
+ // NOTE: Once the observer is attached we can have concurrent
+ // calls to dequeued so we need to lock use of this->dequeues.
+ //
+ // However we must attach the observer _before_ we snapshot for
+ // initial dequeues to be sure we don't miss any dequeues
+ // between the snapshot and attaching the observer.
+ observer.reset(new QueueObserver(*this));
+ queue->addObserver(observer);
+ ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot();
+ std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
+ ReplicationIdSet backup;
+ if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr);
+
+ // Initial dequeues are messages on backup but not on primary.
+ ReplicationIdSet initDequeues = backup - primary;
+ QueuePosition front,back;
+ queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
{
- // No useful overlap - erase backup and start from the beginning
- if (!backup.empty()) dequeued(backup.front, backup.back);
- position = primary.front-1;
+ sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
+ dequeues += initDequeues; // Messages on backup that are not on primary.
+ skip = backup - initDequeues; // Messages already on the backup.
+
+ // Queue front is moving but we know this subscription will start at a
+ // position >= front so if front is safe then position must be.
+ position = front;
+
+ QPID_LOG(debug, logPrefix << "Subscribed: front " << front
+ << ", back " << back
+ << ", start " << position
+ << ", guarded " << guard->getFirst()
+ << ", on backup " << skip);
+ checkReady(l);
}
- else { // backup and primary do overlap.
- // Remove messages from backup that are not in primary.
- if (primary.back < backup.back) {
- dequeued(primary.back+1, backup.back); // Trim excess messages at back
- backup.back = primary.back;
- }
- if (backup.front < primary.front) {
- dequeued(backup.front, primary.front-1); // Trim excess messages at front
- backup.front = primary.front;
- }
- DequeueScanner scan(*this, backup.front, backup.back);
- // FIXME aconway 2012-06-15: Optimize queue traversal, only in range.
- queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
- scan.finish();
- position = backup.back;
- //move cursor to position
- queue->seek(*this, position);
- }
- // NOTE: we are assuming that the messages that are on the backup are
- // consistent with those on the primary. If the backup is a replica
- // queue and hasn't been tampered with then that will be the case.
-
- QPID_LOG(debug, logPrefix << "Subscribed:"
- << " backup:" << backupOriginal << " adjusted backup:" << backup
- << " primary:" << primary
- << " catch-up: " << position << "-" << primary.back
- << "(" << primary.back-position << ")");
-
- // Check if we are ready yet.
- if (guard->subscriptionStart(position)) setReady();
}
catch (const std::exception& e) {
QPID_LOG(error, logPrefix << "Creation error: " << e.what()
@@ -208,6 +183,7 @@ ReplicatingSubscription::ReplicatingSubscription(
ReplicatingSubscription::~ReplicatingSubscription() {}
+
// Called in subscription's connection thread when the subscription is created.
// Called separate from ctor because sending events requires
// shared_from_this
@@ -215,12 +191,9 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
void ReplicatingSubscription::initialize() {
try {
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
-
- // Send initial dequeues and position to the backup.
+ // Send initial dequeues to the backup.
// There must be a shared_ptr(this) when sending.
sendDequeueEvent(l);
- sendPositionEvent(position, l);
- backupPosition = position;
}
catch (const std::exception& e) {
QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
@@ -229,53 +202,64 @@ void ReplicatingSubscription::initialize() {
}
}
+// True if the next position for the ReplicatingSubscription is a guarded position.
+bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
+ return position+1 >= guard->getFirst();
+}
+
// Message is delivered in the subscription's connection thread.
bool ReplicatingSubscription::deliver(
const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
{
+ Mutex::ScopedLock l(lock);
+ ReplicationId id = m.getReplicationId();
+ position = m.getSequence();
try {
- QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence());
- {
- Mutex::ScopedLock l(lock);
- position = m.getSequence();
-
- // m.getSequence() is the position of the new message on local queue.
- // backupPosition is latest position on backup queue before enqueueing
- if (m.getSequence() <= backupPosition)
- throw Exception(
- QPID_MSG(logPrefix << "Expected position > " << backupPosition
- << " but got " << m.getSequence()));
- if (m.getSequence() - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- // Send the position before message was enqueued.
- sendPositionEvent(m.getSequence()-1, l);
- }
- // Backup will automatically advance by 1 on delivery of message.
- backupPosition = m.getSequence();
+ bool result = false;
+ if (skip.contains(id)) {
+ skip -= id;
+ guard->complete(id); // This will never be acknowledged.
+ result = false;
+ }
+ else {
+ QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m));
+ // Only consider unguarded messages for ready status.
+ if (!ready && !isGuarded(l)) unacked += id;
+ sendIdEvent(id, l);
+ result = ConsumerImpl::deliver(c, m);
}
- return ConsumerImpl::deliver(c, m);
+ checkReady(l);
+ return result;
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence()
+ QPID_LOG(critical, logPrefix << "Error replicating " << LogMessageId(*getQueue(), m)
<< ": " << e.what());
throw;
}
}
-void ReplicatingSubscription::setReady() {
- {
- Mutex::ScopedLock l(lock);
- if (ready) return;
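+/**
+ * Marks the subscription ready and notifies the Primary (if any) once the next position is guarded and no tracked messages remain unacknowledged. Call with the lock held; it is released while notifying.
+ */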
+void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
+ if (!ready && isGuarded(l) && unacked.empty()) {
ready = true;
+ sys::Mutex::ScopedUnlock u(lock);
+ // Notify Primary that a subscription is ready.
+ QPID_LOG(debug, logPrefix << "Caught up");
+ if (Primary::get()) Primary::get()->readyReplica(*this);
}
- // Notify Primary that a subscription is ready.
- QPID_LOG(debug, logPrefix << "Caught up");
- if (Primary::get()) Primary::get()->readyReplica(*this);
}
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
+ {
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return;
+ cancelled = true;
+ }
QPID_LOG(debug, logPrefix << "Cancelled");
+ getQueue()->removeObserver(observer);
guard->cancel();
ConsumerImpl::cancel();
}
@@ -283,10 +267,15 @@ void ReplicatingSubscription::cancel()
// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Finish completion of message, it has been acknowledged by the backup.
- QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId());
- guard->complete(r.getMessageId());
- // If next message is protected by the guard then we are ready
- if (r.getMessageId() >= guard->getRange().back) setReady();
+ ReplicationId id = r.getReplicationId();
+ QPID_LOG(trace, logPrefix << "Acknowledged " <<
+ LogMessageId(*getQueue(), r.getMessageId(), r.getReplicationId()));
+ guard->complete(id);
+ {
+ Mutex::ScopedLock l(lock);
+ unacked -= id;
+ checkReady(l);
+ }
ConsumerImpl::acknowledged(r);
}
@@ -295,59 +284,36 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
- string buf(dequeues.encodedSize(),'\0');
- framing::Buffer buffer(&buf[0], buf.size());
- dequeues.encode(buffer);
+ string buffer = encodeStr(dequeues);
dequeues.clear();
- buffer.reset();
- {
- Mutex::ScopedUnlock u(lock);
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
- }
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
}
-// Called via QueueObserver::dequeued override on guard.
// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const Message& m)
+void ReplicatingSubscription::dequeued(ReplicationId id)
{
- QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence());
+ QPID_LOG(trace, logPrefix << "Dequeued ID " << id);
{
Mutex::ScopedLock l(lock);
- dequeues.add(m.getSequence());
+ dequeues.add(id);
}
notify(); // Ensure a call to doDispatch
}
-// Called during construction while scanning for initial dequeues.
-void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) {
- QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]");
- {
- Mutex::ScopedLock l(lock);
- dequeues.add(first,last);
- }
-}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock& l)
+void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
{
- if (pos == backupPosition) return; // No need to send.
- QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
- string buf(pos.encodedSize(),'\0');
- framing::Buffer buffer(&buf[0], buf.size());
- pos.encode(buffer);
- buffer.reset();
- {
- Mutex::ScopedUnlock u(lock);
- sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
- }
+ sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l);
}
void ReplicatingSubscription::sendEvent(const std::string& key,
- const framing::Buffer& buffer,
+ const std::string& buffer,
Mutex::ScopedLock&)
{
+ Mutex::ScopedUnlock u(lock);
broker::Message message = makeMessage(buffer);
MessageTransfer& transfer = MessageTransfer::get(message);
DeliveryProperties* props =
@@ -370,7 +336,6 @@ bool ReplicatingSubscription::doDispatch()
return ConsumerImpl::doDispatch();
}
catch (const std::exception& e) {
- // FIXME aconway 2012-10-05: detect queue deletion, no warning.
QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
return false;
}