diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/ha/ReplicatingSubscription.cpp | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 400 |
1 files changed, 255 insertions, 145 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 91a4538bc4..c960758eaf 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -19,13 +19,18 @@ * */ +#include "QueueGuard.h" +#include "QueueRange.h" +#include "QueueReplicator.h" #include "ReplicatingSubscription.h" +#include "Primary.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/ConnectionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" +#include "qpid/types/Uuid.h" #include <sstream> namespace qpid { @@ -34,19 +39,90 @@ namespace ha { using namespace framing; using namespace broker; using namespace std; +using sys::Mutex; -const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); +const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription"); +const string ReplicatingSubscription::QPID_BACK("qpid.ha-back"); +const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front"); +const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info"); namespace { const string DOLLAR("$"); const string INTERNAL("-internal"); } // namespace +// Scan the queue for gaps and add them to the subscriptions dequed set. +class DequeueScanner +{ + public: + DequeueScanner( + ReplicatingSubscription& rs, + SequenceNumber front_, + SequenceNumber back_ // Inclusive + ) : subscription(rs), front(front_), back(back_) + { + assert(front <= back); + // INVARIANT deques have been added for positions <= at. + at = front - 1; + } + + void operator()(const QueuedMessage& qm) { + if (qm.position >= front && qm.position <= back) { + if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1); + at = qm.position; + } + } + + // Must call after scanning the queue. + void finish() { + if (at < back) subscription.dequeued(at+1, back); + } + + private: + ReplicatingSubscription& subscription; + SequenceNumber front; + SequenceNumber back; + SequenceNumber at; +}; + string mask(const string& in) { return DOLLAR + in + INTERNAL; } + +/** Dummy consumer used to get the front position on the queue */ +class GetPositionConsumer : public Consumer +{ + public: + GetPositionConsumer() : + Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {} + bool deliver(broker::QueuedMessage& ) { return true; } + void notify() {} + bool filter(boost::intrusive_ptr<broker::Message>) { return true; } + bool accept(boost::intrusive_ptr<broker::Message>) { return true; } + void cancel() {} + void acknowledged(const broker::QueuedMessage&) {} + bool browseAcquired() const { return true; } + broker::OwnershipToken* getSession() { return 0; } +}; + + +bool ReplicatingSubscription::getNext( + broker::Queue& q, SequenceNumber from, SequenceNumber& result) +{ + boost::shared_ptr<Consumer> c(new GetPositionConsumer); + c->setPosition(from); + if (!q.dispatch(c)) return false; + result = c->getPosition(); + return true; +} + +bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) { + // FIXME aconway 2012-05-23: won't wrap, assumes 0 is < all messages in queue. + return getNext(q, 0, front); +} + /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( @@ -66,7 +142,7 @@ ReplicatingSubscription::Factory::create( rs.reset(new ReplicatingSubscription( parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - queue->addObserver(rs); + rs->initialize(); } return rs; } @@ -84,179 +160,223 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments), - events(new Queue(mask(name))), - consumer(new DelegatingConsumer(*this)) + dummy(new Queue(mask(name))), + ready(false) { - // Separate the remote part from a "local-remote" address. - string address = parent->getSession().getConnection().getUrl(); - size_t i = address.find('-'); - if (i != string::npos) address = address.substr(i+1); - logPrefix = "HA: Primary "; - stringstream ss; - logSuffix = " (" + address + ")"; - - // FIXME aconway 2011-12-09: Failover optimization removed. - // There was code here to re-use messages already on the backup - // during fail-over. This optimization was removed to simplify - // the logic till we get the basic replication stable, it - // can be re-introduced later. Last revision with the optimization: - // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - - QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix); - - // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 - // so we will start consuming from the lowest numbered message. - // This is incorrect if the sequence number wraps around, but - // this is what all consumers currently do. -} - -// Message is delivered in the subscription's connection thread. -bool ReplicatingSubscription::deliver(QueuedMessage& m) { try { - // Add position events for the subscribed queue, not for the internal event queue. - if (m.queue && m.queue == getQueue().get()) { - sys::Mutex::ScopedLock l(lock); - if (position != m.position) - throw Exception( - QPID_MSG("Expected position " << position - << " but got " << m.position)); - // m.position is the position of the newly enqueued m on the local queue. - // backupPosition is latest position on the backup queue (before enqueueing m.) - if (m.position <= backupPosition) - throw Exception( - QPID_MSG("Expected position > " << backupPosition - << " but got " << m.position)); - - if (m.position - backupPosition > 1) { - // Position has advanced because of messages dequeued ahead of us. - SequenceNumber send(m.position); - --send; // Send the position before m was enqueued. - sendPositionEvent(send, l); + FieldTable ft; + if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) + throw Exception("Replicating subscription does not have broker info: " + tag); + info.assign(ft); + + // Set a log prefix message that identifies the remote broker. + ostringstream os; + os << "Primary " << queue->getName() << "@" << info.getLogId() << ": "; + logPrefix = os.str(); + + // NOTE: Once the guard is attached we can have concurrent + // calls to dequeued so we need to lock use of this->dequeues. + // + // However we must attach the guard _before_ we scan for + // initial dequeues to be sure we don't miss any dequeues + // between the scan and attaching the guard. + if (Primary::get()) guard = Primary::get()->getGuard(queue, info); + if (!guard) guard.reset(new QueueGuard(*queue, info)); + guard->attach(*this); + + QueueRange backup(arguments); // Remote backup range. + QueueRange primary(guard->getRange()); // Unguarded range when the guard was set. + backupPosition = backup.back; + + // Sync backup and primary queues, don't send messages already on the backup + + if (backup.front > primary.front || // Missing messages at front + backup.back < primary.front || // No overlap + primary.empty() || backup.empty()) // Empty + { + // No useful overlap - erase backup and start from the beginning + if (!backup.empty()) dequeued(backup.front, backup.back); + position = primary.front-1; + } + else { // backup and primary do overlap. + // Remove messages from backup that are not in primary. + if (primary.back < backup.back) { + dequeued(primary.back+1, backup.back); // Trim excess messages at back + backup.back = primary.back; } - backupPosition = m.position; - QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix); + if (backup.front < primary.front) { + dequeued(backup.front, primary.front-1); // Trim excess messages at front + backup.front = primary.front; + } + DequeueScanner scan(*this, backup.front, backup.back); + // FIXME aconway 2012-06-15: Optimize queue traversal, only in range. + queue->eachMessage(boost::ref(scan)); // Remove missing messages in between. + scan.finish(); + position = backup.back; } - return ConsumerImpl::deliver(m); - } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName() - << logSuffix << ": " << e.what()); + // NOTE: we are assuming that the messages that are on the backup are + // consistent with those on the primary. If the backup is a replica + // queue and hasn't been tampered with then that will be the case. + + QPID_LOG(debug, logPrefix << "Subscribed:" + << " backup:" << backup + << " primary:" << primary + << " catch-up: " << position << "-" << primary.back + << "(" << primary.back-position << ")"); + + // Check if we are ready yet. + if (guard->subscriptionStart(position)) setReady(); + } + catch (const std::exception& e) { + QPID_LOG(error, logPrefix << "Creation error: " << e.what() + << ": arguments=" << getArguments()); throw; } } -ReplicatingSubscription::~ReplicatingSubscription() {} - +ReplicatingSubscription::~ReplicatingSubscription() { + QPID_LOG(debug, logPrefix << "Detroyed replicating subscription"); +} -// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg +// Called in subscription's connection thread when the subscription is created. +// Called separate from ctor because sending events requires +// shared_from_this +// +void ReplicatingSubscription::initialize() { + try { + Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. -// Mark a message completed. May be called by acknowledge or dequeued -void ReplicatingSubscription::complete( - const QueuedMessage& qm, const sys::Mutex::ScopedLock&) -{ - // Handle completions for the subscribed queue, not the internal event queue. - if (qm.queue && qm.queue == getQueue().get()) { - QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix); - Delayed::iterator i= delayed.find(qm.position); - // The same message can be completed twice, by acknowledged and - // dequeued, remove it from the set so it only gets completed - // once. - if (i != delayed.end()) { - assert(i->second.payload == qm.payload); - qm.payload->getIngressCompletion().finishCompleter(); - delayed.erase(i); - } + // Send initial dequeues and position to the backup. + // There must be a shared_ptr(this) when sending. + sendDequeueEvent(l); + sendPositionEvent(position, l); + backupPosition = position; + } + catch (const std::exception& e) { + QPID_LOG(error, logPrefix << "Initialization error: " << e.what() + << ": arguments=" << getArguments()); + throw; } } -// Called before we get notified of the message being available and -// under the message lock in the queue. Called in arbitrary connection thread. -void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { - sys::Mutex::ScopedLock l(lock); - // Delay completion - QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix); - qm.payload->getIngressCompletion().startCompleter(); - assert(delayed.find(qm.position) == delayed.end()); - delayed[qm.position] = qm; +// Message is delivered in the subscription's connection thread. +bool ReplicatingSubscription::deliver(QueuedMessage& qm) { + try { + // Add position events for the subscribed queue, not the internal event queue. + if (qm.queue == getQueue().get()) { + QPID_LOG(trace, logPrefix << "Replicating " << qm); + { + Mutex::ScopedLock l(lock); + assert(position == qm.position); + // qm.position is the position of the newly enqueued qm on local queue. + // backupPosition is latest position on backup queue before enqueueing + if (qm.position <= backupPosition) + throw Exception( + QPID_MSG("Expected position > " << backupPosition + << " but got " << qm.position)); + if (qm.position - backupPosition > 1) { + // Position has advanced because of messages dequeued ahead of us. + // Send the position before qm was enqueued. + sendPositionEvent(qm.position-1, l); + } + // Backup will automatically advance by 1 on delivery of message. + backupPosition = qm.position; + } + } + return ConsumerImpl::deliver(qm); + } catch (const std::exception& e) { + QPID_LOG(critical, logPrefix << "Error replicating " << qm + << ": " << e.what()); + throw; + } } - -// Function to complete a delayed message, called by cancel() -void ReplicatingSubscription::cancelComplete( - const Delayed::value_type& v, const sys::Mutex::ScopedLock&) -{ - QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix); - v.second.payload->getIngressCompletion().finishCompleter(); +void ReplicatingSubscription::setReady() { + { + Mutex::ScopedLock l(lock); + if (ready) return; + ready = true; + } + // Notify Primary that a subscription is ready. + QPID_LOG(debug, logPrefix << "Caught up"); + if (Primary::get()) Primary::get()->readyReplica(*this); } // Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { - getQueue()->removeObserver( - boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); - { - sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix); - for_each(delayed.begin(), delayed.end(), - boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); - delayed.clear(); - } + guard->cancel(); ConsumerImpl::cancel(); } -// Called on primary in the backups IO thread. -void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { - sys::Mutex::ScopedLock l(lock); - // Finish completion of message, it has been acknowledged by the backup. - complete(msg, l); +// Consumer override, called on primary in the backup's IO thread. +void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) { + if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue + // Finish completion of message, it has been acknowledged by the backup. + QPID_LOG(trace, logPrefix << "Acknowledged " << qm); + guard->complete(qm); + // If next message is protected by the guard then we are ready + if (qm.position >= guard->getRange().back) setReady(); + } + ConsumerImpl::acknowledged(qm); } -// Hide the "queue deleted" error for a ReplicatingSubscription when a -// queue is deleted, this is normal and not an error. -bool ReplicatingSubscription::hideDeletedError() { return true; } - // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) +void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues - << " from " << getQueue()->getName() << logSuffix); + if (dequeues.empty()) return; + QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); dequeues.clear(); buffer.reset(); - sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); + { + Mutex::ScopedUnlock u(lock); + sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer); + } } -// Called after the message has been removed from the deque and under -// the messageLock in the queue. Called in arbitrary connection threads. +// Called via QueueObserver::dequeued override on guard. +// Called after the message has been removed +// from the deque and under the messageLock in the queue. Called in +// arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { + assert (qm.queue == getQueue().get()); + QPID_LOG(trace, logPrefix << "Dequeued " << qm); { - sys::Mutex::ScopedLock l(lock); - QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix); + Mutex::ScopedLock l(lock); dequeues.add(qm.position); - // If we have not yet sent this message to the backup, then - // complete it now as it will never be accepted. - if (qm.position > position) complete(qm, l); } notify(); // Ensure a call to doDispatch } +// Called during construction while scanning for initial dequeues. +void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) { + QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]"); + { + Mutex::ScopedLock l(lock); + dequeues.add(first,last); + } +} + // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendPositionEvent( - SequenceNumber position, const sys::Mutex::ScopedLock&l ) +void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "sending position " << position - << ", was " << backupPosition << logSuffix); - string buf(backupPosition.encodedSize(),'\0'); + if (pos == backupPosition) return; // No need to send. + QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition); + string buf(pos.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); - position.encode(buffer); + pos.encode(buffer); buffer.reset(); - sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l); + { + Mutex::ScopedUnlock u(lock); + sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer); + } } -void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer, - const sys::Mutex::ScopedLock&) +void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer) { //generate event message boost::intrusive_ptr<Message> event = new Message(); @@ -276,15 +396,14 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& event->getFrames().append(header); event->getFrames().append(content); - DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true); + DeliveryProperties* props = + event->getFrames().getHeaders()->get<DeliveryProperties>(true); props->setRoutingKey(key); - // Send the event using the events queue. Consumer is a - // DelegatingConsumer that delegates to *this for everything but - // has an independnet position. We put an event on events and - // dispatch it through ourselves to send it in line with the - // normal browsing messages. - events->deliver(event); - events->dispatch(consumer); + // Send the event directly to the base consumer implementation. + // We don't really need a queue here but we pass a dummy queue + // to conform to the consumer API. + QueuedMessage qm(dummy.get(), event); + ConsumerImpl::deliver(qm); } @@ -292,19 +411,10 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& bool ReplicatingSubscription::doDispatch() { { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); if (!dequeues.empty()) sendDequeueEvent(l); } return ConsumerImpl::doDispatch(); } -ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} -ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} -bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); } -void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } -bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } -bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } -bool ReplicatingSubscription::DelegatingConsumer::browseAcquired() const { return delegate.browseAcquired(); } -OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } - }} // namespace qpid::ha |