summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-06-17 14:19:10 +0000
committerAlan Conway <aconway@apache.org>2013-06-17 14:19:10 +0000
commit1093062db03077998823cbefb9ca9645076694ea (patch)
tree577707a6dafe630794aaafd69a7d127ee88a9dbd /cpp/src/qpid/ha/ReplicatingSubscription.cpp
parent1ac5dfc01dbfaa648d5f75eec8b066696f307e61 (diff)
downloadqpid-python-1093062db03077998823cbefb9ca9645076694ea.tar.gz
QPID-4348: HA Use independent sequence numbers for identifying messages
Previously HA code used queue sequence numbers to identify messages. This assumes that message sequence is identical on primary and backup. Implementing new features (for example transactions) requires that we tolerate ordering differences between primary and backups. This patch introduces a new, queue-scoped HA sequence number managed by the HA plugin. The HA ID is set *before* the message is enqueued and assigned a queue sequence number. This means it is possible to identify messages before they are enqueued, e.g. messages in an open transaction. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1493771 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp289
1 files changed, 127 insertions, 162 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index a9bd7b49f8..7b153f90ca 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,12 +20,16 @@
*/
#include "makeMessage.h"
+#include "IdSetter.h"
#include "QueueGuard.h"
-#include "QueueRange.h"
#include "QueueReplicator.h"
+#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "Primary.h"
+#include "HaBroker.h"
+#include "qpid/assert.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -35,6 +39,7 @@
#include "qpid/types/Uuid.h"
#include <sstream>
+
namespace qpid {
namespace ha {
@@ -45,53 +50,20 @@ using sys::Mutex;
using broker::amqp_0_10::MessageTransfer;
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
-const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
-const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front");
const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
+const string ReplicatingSubscription::QPID_ID_SET("qpid.ha-info");
-namespace {
-const string DOLLAR("$");
-const string INTERNAL("-internal");
-} // namespace
-
-// Scan the queue for gaps and add them to the subscriptions dequed set.
-class DequeueScanner
-{
+class ReplicatingSubscription::QueueObserver : public broker::QueueObserver {
public:
- DequeueScanner(
- ReplicatingSubscription& rs,
- SequenceNumber front_,
- SequenceNumber back_ // Inclusive
- ) : subscription(rs), front(front_), back(back_)
- {
- assert(front <= back);
- // INVARIANT deques have been added for positions <= at.
- at = front - 1;
- }
-
- void operator()(const Message& m) {
- if (m.getSequence() >= front && m.getSequence() <= back) {
- if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1);
- at = m.getSequence();
- }
- }
-
- // Must call after scanning the queue.
- void finish() {
- if (at < back) subscription.dequeued(at+1, back);
- }
-
+ QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {}
+ void enqueued(const broker::Message&) {}
+ void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); }
+ void acquired(const broker::Message&) {}
+ void requeued(const broker::Message&) {}
private:
- ReplicatingSubscription& subscription;
- SequenceNumber front;
- SequenceNumber back;
- SequenceNumber at;
+ ReplicatingSubscription& rs;
};
-string mask(const string& in)
-{
- return DOLLAR + in + INTERNAL;
-}
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
@@ -110,6 +82,7 @@ ReplicatingSubscription::Factory::create(
boost::shared_ptr<ReplicatingSubscription> rs;
if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
rs.reset(new ReplicatingSubscription(
+ haBroker,
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
rs->initialize();
@@ -117,7 +90,15 @@ ReplicatingSubscription::Factory::create(
return rs;
}
+namespace {
+void copyIf(boost::shared_ptr<MessageInterceptor> from, boost::shared_ptr<IdSetter>& to) {
+ boost::shared_ptr<IdSetter> result = boost::dynamic_pointer_cast<IdSetter>(from);
+ if (result) to = result;
+}
+} // namespace
+
ReplicatingSubscription::ReplicatingSubscription(
+ HaBroker& hb,
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
@@ -130,7 +111,8 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
- ready(false)
+ position(0), ready(false), cancelled(false),
+ haBroker(hb)
{
try {
FieldTable ft;
@@ -140,64 +122,57 @@ ReplicatingSubscription::ReplicatingSubscription(
// Set a log prefix message that identifies the remote broker.
ostringstream os;
- os << "Primary " << queue->getName() << "@" << info << ": ";
+ os << "Subscription to " << queue->getName() << " at " << info << ": ";
logPrefix = os.str();
- // NOTE: Once the guard is attached we can have concurrent
- // calls to dequeued so we need to lock use of this->dequeues.
- //
- // However we must attach the guard _before_ we scan for
- // initial dequeues to be sure we don't miss any dequeues
- // between the scan and attaching the guard.
+ // If this is a non-cluster standalone replication then we need to
+ // set up an IdSetter if there is not already one.
+ boost::shared_ptr<IdSetter> idSetter;
+ queue->getMessageInterceptors().each(
+ boost::bind(&copyIf, _1, boost::ref(idSetter)));
+ if (!idSetter) {
+ QPID_LOG(debug, logPrefix << "Standalone replication");
+ queue->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1)));
+ }
+
+ // If there's already a guard (we are in failover) use it, else create one.
if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
- guard->attach(*this);
- QueueRange backup(arguments); // Remote backup range.
- QueueRange backupOriginal(backup);
- QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
- backupPosition = backup.back;
-
- // Sync backup and primary queues, don't send messages already on the backup
-
- if (backup.front > primary.front || // Missing messages at front
- backup.back < primary.front || // No overlap
- primary.empty() || backup.empty()) // Empty
+ // NOTE: Once the observer is attached we can have concurrent
+ // calls to dequeued so we need to lock use of this->dequeues.
+ //
+ // However we must attach the observer _before_ we snapshot for
+ // initial dequeues to be sure we don't miss any dequeues
+ // between the snapshot and attaching the observer.
+ observer.reset(new QueueObserver(*this));
+ queue->addObserver(observer);
+ ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot();
+ std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
+ ReplicationIdSet backup;
+ if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr);
+
+ // Initial dequeues are messages on backup but not on primary.
+ ReplicationIdSet initDequeues = backup - primary;
+ QueuePosition front,back;
+ queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
{
- // No useful overlap - erase backup and start from the beginning
- if (!backup.empty()) dequeued(backup.front, backup.back);
- position = primary.front-1;
+ sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
+ dequeues += initDequeues; // Messages on backup that are not on primary.
+ skip = backup - initDequeues; // Messages already on the backup.
+
+ // Queue front is moving but we know this subscription will start at a
+ // position >= front so if front is safe then position must be.
+ position = front;
+
+ QPID_LOG(debug, logPrefix << "Subscribed: front " << front
+ << ", back " << back
+ << ", start " << position
+ << ", guarded " << guard->getFirst()
+ << ", on backup " << skip);
+ checkReady(l);
}
- else { // backup and primary do overlap.
- // Remove messages from backup that are not in primary.
- if (primary.back < backup.back) {
- dequeued(primary.back+1, backup.back); // Trim excess messages at back
- backup.back = primary.back;
- }
- if (backup.front < primary.front) {
- dequeued(backup.front, primary.front-1); // Trim excess messages at front
- backup.front = primary.front;
- }
- DequeueScanner scan(*this, backup.front, backup.back);
- // FIXME aconway 2012-06-15: Optimize queue traversal, only in range.
- queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
- scan.finish();
- position = backup.back;
- //move cursor to position
- queue->seek(*this, position);
- }
- // NOTE: we are assuming that the messages that are on the backup are
- // consistent with those on the primary. If the backup is a replica
- // queue and hasn't been tampered with then that will be the case.
-
- QPID_LOG(debug, logPrefix << "Subscribed:"
- << " backup:" << backupOriginal << " adjusted backup:" << backup
- << " primary:" << primary
- << " catch-up: " << position << "-" << primary.back
- << "(" << primary.back-position << ")");
-
- // Check if we are ready yet.
- if (guard->subscriptionStart(position)) setReady();
}
catch (const std::exception& e) {
QPID_LOG(error, logPrefix << "Creation error: " << e.what()
@@ -208,6 +183,7 @@ ReplicatingSubscription::ReplicatingSubscription(
ReplicatingSubscription::~ReplicatingSubscription() {}
+
// Called in subscription's connection thread when the subscription is created.
// Called separate from ctor because sending events requires
// shared_from_this
@@ -215,12 +191,9 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
void ReplicatingSubscription::initialize() {
try {
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
-
- // Send initial dequeues and position to the backup.
+ // Send initial dequeues to the backup.
// There must be a shared_ptr(this) when sending.
sendDequeueEvent(l);
- sendPositionEvent(position, l);
- backupPosition = position;
}
catch (const std::exception& e) {
QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
@@ -229,53 +202,64 @@ void ReplicatingSubscription::initialize() {
}
}
+// True if the next position for the ReplicatingSubscription is a guarded position.
+bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
+ return position+1 >= guard->getFirst();
+}
+
// Message is delivered in the subscription's connection thread.
bool ReplicatingSubscription::deliver(
const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
{
+ Mutex::ScopedLock l(lock);
+ ReplicationId id = m.getReplicationId();
+ position = m.getSequence();
try {
- QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence());
- {
- Mutex::ScopedLock l(lock);
- position = m.getSequence();
-
- // m.getSequence() is the position of the new message on local queue.
- // backupPosition is latest position on backup queue before enqueueing
- if (m.getSequence() <= backupPosition)
- throw Exception(
- QPID_MSG(logPrefix << "Expected position > " << backupPosition
- << " but got " << m.getSequence()));
- if (m.getSequence() - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- // Send the position before message was enqueued.
- sendPositionEvent(m.getSequence()-1, l);
- }
- // Backup will automatically advance by 1 on delivery of message.
- backupPosition = m.getSequence();
+ bool result = false;
+ if (skip.contains(id)) {
+ skip -= id;
+ guard->complete(id); // This will never be acknowledged.
+ result = false;
+ }
+ else {
+ QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m));
+ // Only consider unguarded messages for ready status.
+ if (!ready && !isGuarded(l)) unacked += id;
+ sendIdEvent(id, l);
+ result = ConsumerImpl::deliver(c, m);
}
- return ConsumerImpl::deliver(c, m);
+ checkReady(l);
+ return result;
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence()
+ QPID_LOG(critical, logPrefix << "Error replicating " << LogMessageId(*getQueue(), m)
<< ": " << e.what());
throw;
}
}
-void ReplicatingSubscription::setReady() {
- {
- Mutex::ScopedLock l(lock);
- if (ready) return;
+/**
+ *@param position: must be <= last position seen by subscription.
+ */
+void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
+ if (!ready && isGuarded(l) && unacked.empty()) {
ready = true;
+ sys::Mutex::ScopedUnlock u(lock);
+ // Notify Primary that a subscription is ready.
+ QPID_LOG(debug, logPrefix << "Caught up");
+ if (Primary::get()) Primary::get()->readyReplica(*this);
}
- // Notify Primary that a subscription is ready.
- QPID_LOG(debug, logPrefix << "Caught up");
- if (Primary::get()) Primary::get()->readyReplica(*this);
}
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
+ {
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return;
+ cancelled = true;
+ }
QPID_LOG(debug, logPrefix << "Cancelled");
+ getQueue()->removeObserver(observer);
guard->cancel();
ConsumerImpl::cancel();
}
@@ -283,10 +267,15 @@ void ReplicatingSubscription::cancel()
// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Finish completion of message, it has been acknowledged by the backup.
- QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId());
- guard->complete(r.getMessageId());
- // If next message is protected by the guard then we are ready
- if (r.getMessageId() >= guard->getRange().back) setReady();
+ ReplicationId id = r.getReplicationId();
+ QPID_LOG(trace, logPrefix << "Acknowledged " <<
+ LogMessageId(*getQueue(), r.getMessageId(), r.getReplicationId()));
+ guard->complete(id);
+ {
+ Mutex::ScopedLock l(lock);
+ unacked -= id;
+ checkReady(l);
+ }
ConsumerImpl::acknowledged(r);
}
@@ -295,59 +284,36 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
- string buf(dequeues.encodedSize(),'\0');
- framing::Buffer buffer(&buf[0], buf.size());
- dequeues.encode(buffer);
+ string buffer = encodeStr(dequeues);
dequeues.clear();
- buffer.reset();
- {
- Mutex::ScopedUnlock u(lock);
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
- }
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
}
-// Called via QueueObserver::dequeued override on guard.
// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const Message& m)
+void ReplicatingSubscription::dequeued(ReplicationId id)
{
- QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence());
+ QPID_LOG(trace, logPrefix << "Dequeued ID " << id);
{
Mutex::ScopedLock l(lock);
- dequeues.add(m.getSequence());
+ dequeues.add(id);
}
notify(); // Ensure a call to doDispatch
}
-// Called during construction while scanning for initial dequeues.
-void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) {
- QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]");
- {
- Mutex::ScopedLock l(lock);
- dequeues.add(first,last);
- }
-}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock& l)
+void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
{
- if (pos == backupPosition) return; // No need to send.
- QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
- string buf(pos.encodedSize(),'\0');
- framing::Buffer buffer(&buf[0], buf.size());
- pos.encode(buffer);
- buffer.reset();
- {
- Mutex::ScopedUnlock u(lock);
- sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
- }
+ sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l);
}
void ReplicatingSubscription::sendEvent(const std::string& key,
- const framing::Buffer& buffer,
+ const std::string& buffer,
Mutex::ScopedLock&)
{
+ Mutex::ScopedUnlock u(lock);
broker::Message message = makeMessage(buffer);
MessageTransfer& transfer = MessageTransfer::get(message);
DeliveryProperties* props =
@@ -370,7 +336,6 @@ bool ReplicatingSubscription::doDispatch()
return ConsumerImpl::doDispatch();
}
catch (const std::exception& e) {
- // FIXME aconway 2012-10-05: detect queue deletion, no warning.
QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
return false;
}