diff options
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 289 |
1 files changed, 127 insertions, 162 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index a9bd7b49f8..7b153f90ca 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -20,12 +20,16 @@ */ #include "makeMessage.h" +#include "IdSetter.h" #include "QueueGuard.h" -#include "QueueRange.h" #include "QueueReplicator.h" +#include "QueueSnapshots.h" #include "ReplicatingSubscription.h" #include "Primary.h" +#include "HaBroker.h" +#include "qpid/assert.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" @@ -35,6 +39,7 @@ #include "qpid/types/Uuid.h" #include <sstream> + namespace qpid { namespace ha { @@ -45,53 +50,20 @@ using sys::Mutex; using broker::amqp_0_10::MessageTransfer; const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription"); -const string ReplicatingSubscription::QPID_BACK("qpid.ha-back"); -const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front"); const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info"); +const string ReplicatingSubscription::QPID_ID_SET("qpid.ha-info"); -namespace { -const string DOLLAR("$"); -const string INTERNAL("-internal"); -} // namespace - -// Scan the queue for gaps and add them to the subscriptions dequed set. -class DequeueScanner -{ +class ReplicatingSubscription::QueueObserver : public broker::QueueObserver { public: - DequeueScanner( - ReplicatingSubscription& rs, - SequenceNumber front_, - SequenceNumber back_ // Inclusive - ) : subscription(rs), front(front_), back(back_) - { - assert(front <= back); - // INVARIANT deques have been added for positions <= at. 
- at = front - 1; - } - - void operator()(const Message& m) { - if (m.getSequence() >= front && m.getSequence() <= back) { - if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1); - at = m.getSequence(); - } - } - - // Must call after scanning the queue. - void finish() { - if (at < back) subscription.dequeued(at+1, back); - } - + QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {} + void enqueued(const broker::Message&) {} + void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); } + void acquired(const broker::Message&) {} + void requeued(const broker::Message&) {} private: - ReplicatingSubscription& subscription; - SequenceNumber front; - SequenceNumber back; - SequenceNumber at; + ReplicatingSubscription& rs; }; -string mask(const string& in) -{ - return DOLLAR + in + INTERNAL; -} /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> @@ -110,6 +82,7 @@ ReplicatingSubscription::Factory::create( boost::shared_ptr<ReplicatingSubscription> rs; if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { rs.reset(new ReplicatingSubscription( + haBroker, parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); rs->initialize(); @@ -117,7 +90,15 @@ ReplicatingSubscription::Factory::create( return rs; } +namespace { +void copyIf(boost::shared_ptr<MessageInterceptor> from, boost::shared_ptr<IdSetter>& to) { + boost::shared_ptr<IdSetter> result = boost::dynamic_pointer_cast<IdSetter>(from); + if (result) to = result; +} +} // namespace + ReplicatingSubscription::ReplicatingSubscription( + HaBroker& hb, SemanticState* parent, const string& name, Queue::shared_ptr queue, @@ -130,7 +111,8 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), - ready(false) + position(0), ready(false), cancelled(false), + 
haBroker(hb) { try { FieldTable ft; @@ -140,64 +122,57 @@ ReplicatingSubscription::ReplicatingSubscription( // Set a log prefix message that identifies the remote broker. ostringstream os; - os << "Primary " << queue->getName() << "@" << info << ": "; + os << "Subscription to " << queue->getName() << " at " << info << ": "; logPrefix = os.str(); - // NOTE: Once the guard is attached we can have concurrent - // calls to dequeued so we need to lock use of this->dequeues. - // - // However we must attach the guard _before_ we scan for - // initial dequeues to be sure we don't miss any dequeues - // between the scan and attaching the guard. + // If this is a non-cluster standalone replication then we need to + // set up an IdSetter if there is not already one. + boost::shared_ptr<IdSetter> idSetter; + queue->getMessageInterceptors().each( + boost::bind(&copyIf, _1, boost::ref(idSetter))); + if (!idSetter) { + QPID_LOG(debug, logPrefix << "Standalone replication"); + queue->getMessageInterceptors().add( + boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1))); + } + + // If there's already a guard (we are in failover) use it, else create one. if (Primary::get()) guard = Primary::get()->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, info)); - guard->attach(*this); - QueueRange backup(arguments); // Remote backup range. - QueueRange backupOriginal(backup); - QueueRange primary(guard->getRange()); // Unguarded range when the guard was set. - backupPosition = backup.back; - - // Sync backup and primary queues, don't send messages already on the backup - - if (backup.front > primary.front || // Missing messages at front - backup.back < primary.front || // No overlap - primary.empty() || backup.empty()) // Empty + // NOTE: Once the observer is attached we can have concurrent + // calls to dequeued so we need to lock use of this->dequeues. 
+ // + // However we must attach the observer _before_ we snapshot for + // initial dequeues to be sure we don't miss any dequeues + // between the snapshot and attaching the observer. + observer.reset(new QueueObserver(*this)); + queue->addObserver(observer); + ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot(); + std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET); + ReplicationIdSet backup; + if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr); + + // Initial dequeues are messages on backup but not on primary. + ReplicationIdSet initDequeues = backup - primary; + QueuePosition front,back; + queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue { - // No useful overlap - erase backup and start from the beginning - if (!backup.empty()) dequeued(backup.front, backup.back); - position = primary.front-1; + sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued() + dequeues += initDequeues; // Messages on backup that are not on primary. + skip = backup - initDequeues; // Messages already on the backup. + + // Queue front is moving but we know this subscriptions will start at a + // position >= front so if front is safe then position must be. + position = front; + + QPID_LOG(debug, logPrefix << "Subscribed: front " << front + << ", back " << back + << ", start " << position + << ", guarded " << guard->getFirst() + << ", on backup " << skip); + checkReady(l); } - else { // backup and primary do overlap. - // Remove messages from backup that are not in primary. 
- if (primary.back < backup.back) { - dequeued(primary.back+1, backup.back); // Trim excess messages at back - backup.back = primary.back; - } - if (backup.front < primary.front) { - dequeued(backup.front, primary.front-1); // Trim excess messages at front - backup.front = primary.front; - } - DequeueScanner scan(*this, backup.front, backup.back); - // FIXME aconway 2012-06-15: Optimize queue traversal, only in range. - queue->eachMessage(boost::ref(scan)); // Remove missing messages in between. - scan.finish(); - position = backup.back; - //move cursor to position - queue->seek(*this, position); - } - // NOTE: we are assuming that the messages that are on the backup are - // consistent with those on the primary. If the backup is a replica - // queue and hasn't been tampered with then that will be the case. - - QPID_LOG(debug, logPrefix << "Subscribed:" - << " backup:" << backupOriginal << " adjusted backup:" << backup - << " primary:" << primary - << " catch-up: " << position << "-" << primary.back - << "(" << primary.back-position << ")"); - - // Check if we are ready yet. - if (guard->subscriptionStart(position)) setReady(); } catch (const std::exception& e) { QPID_LOG(error, logPrefix << "Creation error: " << e.what() @@ -208,6 +183,7 @@ ReplicatingSubscription::ReplicatingSubscription( ReplicatingSubscription::~ReplicatingSubscription() {} + // Called in subscription's connection thread when the subscription is created. // Called separate from ctor because sending events requires // shared_from_this @@ -215,12 +191,9 @@ ReplicatingSubscription::~ReplicatingSubscription() {} void ReplicatingSubscription::initialize() { try { Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. - - // Send initial dequeues and position to the backup. + // Send initial dequeues to the backup. // There must be a shared_ptr(this) when sending. 
sendDequeueEvent(l); - sendPositionEvent(position, l); - backupPosition = position; } catch (const std::exception& e) { QPID_LOG(error, logPrefix << "Initialization error: " << e.what() @@ -229,53 +202,64 @@ void ReplicatingSubscription::initialize() { } } +// True if the next position for the ReplicatingSubscription is a guarded position. +bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) { + return position+1 >= guard->getFirst(); +} + // Message is delivered in the subscription's connection thread. bool ReplicatingSubscription::deliver( const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) { + Mutex::ScopedLock l(lock); + ReplicationId id = m.getReplicationId(); + position = m.getSequence(); try { - QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence()); - { - Mutex::ScopedLock l(lock); - position = m.getSequence(); - - // m.getSequence() is the position of the new message on local queue. - // backupPosition is latest position on backup queue before enqueueing - if (m.getSequence() <= backupPosition) - throw Exception( - QPID_MSG(logPrefix << "Expected position > " << backupPosition - << " but got " << m.getSequence())); - if (m.getSequence() - backupPosition > 1) { - // Position has advanced because of messages dequeued ahead of us. - // Send the position before message was enqueued. - sendPositionEvent(m.getSequence()-1, l); - } - // Backup will automatically advance by 1 on delivery of message. - backupPosition = m.getSequence(); + bool result = false; + if (skip.contains(id)) { + skip -= id; + guard->complete(id); // This will never be acknowledged. + result = false; + } + else { + QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m)); + // Only consider unguarded messages for ready status. 
+ if (!ready && !isGuarded(l)) unacked += id; + sendIdEvent(id, l); + result = ConsumerImpl::deliver(c, m); } - return ConsumerImpl::deliver(c, m); + checkReady(l); + return result; } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence() + QPID_LOG(critical, logPrefix << "Error replicating " << LogMessageId(*getQueue(), m) << ": " << e.what()); throw; } } -void ReplicatingSubscription::setReady() { - { - Mutex::ScopedLock l(lock); - if (ready) return; +/** + *@param position: must be <= last position seen by subscription. + */ +void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) { + if (!ready && isGuarded(l) && unacked.empty()) { ready = true; + sys::Mutex::ScopedUnlock u(lock); + // Notify Primary that a subscription is ready. + QPID_LOG(debug, logPrefix << "Caught up"); + if (Primary::get()) Primary::get()->readyReplica(*this); } - // Notify Primary that a subscription is ready. - QPID_LOG(debug, logPrefix << "Caught up"); - if (Primary::get()) Primary::get()->readyReplica(*this); } // Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { + { + Mutex::ScopedLock l(lock); + if (cancelled) return; + cancelled = true; + } QPID_LOG(debug, logPrefix << "Cancelled"); + getQueue()->removeObserver(observer); guard->cancel(); ConsumerImpl::cancel(); } @@ -283,10 +267,15 @@ void ReplicatingSubscription::cancel() // Consumer override, called on primary in the backup's IO thread. void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { // Finish completion of message, it has been acknowledged by the backup. 
- QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId()); - guard->complete(r.getMessageId()); - // If next message is protected by the guard then we are ready - if (r.getMessageId() >= guard->getRange().back) setReady(); + ReplicationId id = r.getReplicationId(); + QPID_LOG(trace, logPrefix << "Acknowledged " << + LogMessageId(*getQueue(), r.getMessageId(), r.getReplicationId())); + guard->complete(id); + { + Mutex::ScopedLock l(lock); + unacked -= id; + checkReady(l); + } ConsumerImpl::acknowledged(r); } @@ -295,59 +284,36 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l) { if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); - string buf(dequeues.encodedSize(),'\0'); - framing::Buffer buffer(&buf[0], buf.size()); - dequeues.encode(buffer); + string buffer = encodeStr(dequeues); dequeues.clear(); - buffer.reset(); - { - Mutex::ScopedUnlock u(lock); - sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); - } + sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); } -// Called via QueueObserver::dequeued override on guard. // Called after the message has been removed // from the deque and under the messageLock in the queue. Called in // arbitrary connection threads. -void ReplicatingSubscription::dequeued(const Message& m) +void ReplicatingSubscription::dequeued(ReplicationId id) { - QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence()); + QPID_LOG(trace, logPrefix << "Dequeued ID " << id); { Mutex::ScopedLock l(lock); - dequeues.add(m.getSequence()); + dequeues.add(id); } notify(); // Ensure a call to doDispatch } -// Called during construction while scanning for initial dequeues. -void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) { - QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]"); - { - Mutex::ScopedLock l(lock); - dequeues.add(first,last); - } -} // Called with lock held. 
Called in subscription's connection thread. -void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock& l) +void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l) { - if (pos == backupPosition) return; // No need to send. - QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition); - string buf(pos.encodedSize(),'\0'); - framing::Buffer buffer(&buf[0], buf.size()); - pos.encode(buffer); - buffer.reset(); - { - Mutex::ScopedUnlock u(lock); - sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l); - } + sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l); } void ReplicatingSubscription::sendEvent(const std::string& key, - const framing::Buffer& buffer, + const std::string& buffer, Mutex::ScopedLock&) { + Mutex::ScopedUnlock u(lock); broker::Message message = makeMessage(buffer); MessageTransfer& transfer = MessageTransfer::get(message); DeliveryProperties* props = @@ -370,7 +336,6 @@ bool ReplicatingSubscription::doDispatch() return ConsumerImpl::doDispatch(); } catch (const std::exception& e) { - // FIXME aconway 2012-10-05: detect queue deletion, no warning. QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what()); return false; } |
