summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/ha/ReplicatingSubscription.cpp
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp400
1 files changed, 255 insertions, 145 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 91a4538bc4..c960758eaf 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -19,13 +19,18 @@
*
*/
+#include "QueueGuard.h"
+#include "QueueRange.h"
+#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
+#include "Primary.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Uuid.h"
#include <sstream>
namespace qpid {
@@ -34,19 +39,90 @@ namespace ha {
using namespace framing;
using namespace broker;
using namespace std;
+using sys::Mutex;
-const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
+const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
+const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front");
+const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
namespace {
const string DOLLAR("$");
const string INTERNAL("-internal");
} // namespace
+// Scan the queue for gaps and add them to the subscriptions dequed set.
+class DequeueScanner
+{
+ public:
+ DequeueScanner(
+ ReplicatingSubscription& rs,
+ SequenceNumber front_,
+ SequenceNumber back_ // Inclusive
+ ) : subscription(rs), front(front_), back(back_)
+ {
+ assert(front <= back);
+ // INVARIANT deques have been added for positions <= at.
+ at = front - 1;
+ }
+
+ void operator()(const QueuedMessage& qm) {
+ if (qm.position >= front && qm.position <= back) {
+ if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1);
+ at = qm.position;
+ }
+ }
+
+ // Must call after scanning the queue.
+ void finish() {
+ if (at < back) subscription.dequeued(at+1, back);
+ }
+
+ private:
+ ReplicatingSubscription& subscription;
+ SequenceNumber front;
+ SequenceNumber back;
+ SequenceNumber at;
+};
+
string mask(const string& in)
{
return DOLLAR + in + INTERNAL;
}
+
+/** Dummy consumer used to get the front position on the queue */
+class GetPositionConsumer : public Consumer
+{
+ public:
+ GetPositionConsumer() :
+ Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {}
+ bool deliver(broker::QueuedMessage& ) { return true; }
+ void notify() {}
+ bool filter(boost::intrusive_ptr<broker::Message>) { return true; }
+ bool accept(boost::intrusive_ptr<broker::Message>) { return true; }
+ void cancel() {}
+ void acknowledged(const broker::QueuedMessage&) {}
+ bool browseAcquired() const { return true; }
+ broker::OwnershipToken* getSession() { return 0; }
+};
+
+
+bool ReplicatingSubscription::getNext(
+ broker::Queue& q, SequenceNumber from, SequenceNumber& result)
+{
+ boost::shared_ptr<Consumer> c(new GetPositionConsumer);
+ c->setPosition(from);
+ if (!q.dispatch(c)) return false;
+ result = c->getPosition();
+ return true;
+}
+
+bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
+ // FIXME aconway 2012-05-23: won't wrap, assumes 0 is < all messages in queue.
+ return getNext(q, 0, front);
+}
+
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -66,7 +142,7 @@ ReplicatingSubscription::Factory::create(
rs.reset(new ReplicatingSubscription(
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
- queue->addObserver(rs);
+ rs->initialize();
}
return rs;
}
@@ -84,179 +160,223 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments),
- events(new Queue(mask(name))),
- consumer(new DelegatingConsumer(*this))
+ dummy(new Queue(mask(name))),
+ ready(false)
{
- // Separate the remote part from a "local-remote" address.
- string address = parent->getSession().getConnection().getUrl();
- size_t i = address.find('-');
- if (i != string::npos) address = address.substr(i+1);
- logPrefix = "HA: Primary ";
- stringstream ss;
- logSuffix = " (" + address + ")";
-
- // FIXME aconway 2011-12-09: Failover optimization removed.
- // There was code here to re-use messages already on the backup
- // during fail-over. This optimization was removed to simplify
- // the logic till we get the basic replication stable, it
- // can be re-introduced later. Last revision with the optimization:
- // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
-
- QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix);
-
- // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
- // so we will start consuming from the lowest numbered message.
- // This is incorrect if the sequence number wraps around, but
- // this is what all consumers currently do.
-}
-
-// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(QueuedMessage& m) {
try {
- // Add position events for the subscribed queue, not for the internal event queue.
- if (m.queue && m.queue == getQueue().get()) {
- sys::Mutex::ScopedLock l(lock);
- if (position != m.position)
- throw Exception(
- QPID_MSG("Expected position " << position
- << " but got " << m.position));
- // m.position is the position of the newly enqueued m on the local queue.
- // backupPosition is latest position on the backup queue (before enqueueing m.)
- if (m.position <= backupPosition)
- throw Exception(
- QPID_MSG("Expected position > " << backupPosition
- << " but got " << m.position));
-
- if (m.position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- SequenceNumber send(m.position);
- --send; // Send the position before m was enqueued.
- sendPositionEvent(send, l);
+ FieldTable ft;
+ if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
+ throw Exception("Replicating subscription does not have broker info: " + tag);
+ info.assign(ft);
+
+ // Set a log prefix message that identifies the remote broker.
+ ostringstream os;
+ os << "Primary " << queue->getName() << "@" << info.getLogId() << ": ";
+ logPrefix = os.str();
+
+ // NOTE: Once the guard is attached we can have concurrent
+ // calls to dequeued so we need to lock use of this->dequeues.
+ //
+ // However we must attach the guard _before_ we scan for
+ // initial dequeues to be sure we don't miss any dequeues
+ // between the scan and attaching the guard.
+ if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
+ if (!guard) guard.reset(new QueueGuard(*queue, info));
+ guard->attach(*this);
+
+ QueueRange backup(arguments); // Remote backup range.
+ QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
+ backupPosition = backup.back;
+
+ // Sync backup and primary queues, don't send messages already on the backup
+
+ if (backup.front > primary.front || // Missing messages at front
+ backup.back < primary.front || // No overlap
+ primary.empty() || backup.empty()) // Empty
+ {
+ // No useful overlap - erase backup and start from the beginning
+ if (!backup.empty()) dequeued(backup.front, backup.back);
+ position = primary.front-1;
+ }
+ else { // backup and primary do overlap.
+ // Remove messages from backup that are not in primary.
+ if (primary.back < backup.back) {
+ dequeued(primary.back+1, backup.back); // Trim excess messages at back
+ backup.back = primary.back;
}
- backupPosition = m.position;
- QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix);
+ if (backup.front < primary.front) {
+ dequeued(backup.front, primary.front-1); // Trim excess messages at front
+ backup.front = primary.front;
+ }
+ DequeueScanner scan(*this, backup.front, backup.back);
+ // FIXME aconway 2012-06-15: Optimize queue traversal, only in range.
+ queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
+ scan.finish();
+ position = backup.back;
}
- return ConsumerImpl::deliver(m);
- } catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName()
- << logSuffix << ": " << e.what());
+ // NOTE: we are assuming that the messages that are on the backup are
+ // consistent with those on the primary. If the backup is a replica
+ // queue and hasn't been tampered with then that will be the case.
+
+ QPID_LOG(debug, logPrefix << "Subscribed:"
+ << " backup:" << backup
+ << " primary:" << primary
+ << " catch-up: " << position << "-" << primary.back
+ << "(" << primary.back-position << ")");
+
+ // Check if we are ready yet.
+ if (guard->subscriptionStart(position)) setReady();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Creation error: " << e.what()
+ << ": arguments=" << getArguments());
throw;
}
}
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
+ReplicatingSubscription::~ReplicatingSubscription() {
+ QPID_LOG(debug, logPrefix << "Detroyed replicating subscription");
+}
-// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
+// Called in subscription's connection thread when the subscription is created.
+// Called separate from ctor because sending events requires
+// shared_from_this
+//
+void ReplicatingSubscription::initialize() {
+ try {
+ Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
-// Mark a message completed. May be called by acknowledge or dequeued
-void ReplicatingSubscription::complete(
- const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
-{
- // Handle completions for the subscribed queue, not the internal event queue.
- if (qm.queue && qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
- Delayed::iterator i= delayed.find(qm.position);
- // The same message can be completed twice, by acknowledged and
- // dequeued, remove it from the set so it only gets completed
- // once.
- if (i != delayed.end()) {
- assert(i->second.payload == qm.payload);
- qm.payload->getIngressCompletion().finishCompleter();
- delayed.erase(i);
- }
+ // Send initial dequeues and position to the backup.
+ // There must be a shared_ptr(this) when sending.
+ sendDequeueEvent(l);
+ sendPositionEvent(position, l);
+ backupPosition = position;
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
+ << ": arguments=" << getArguments());
+ throw;
}
}
-// Called before we get notified of the message being available and
-// under the message lock in the queue. Called in arbitrary connection thread.
-void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
- sys::Mutex::ScopedLock l(lock);
- // Delay completion
- QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix);
- qm.payload->getIngressCompletion().startCompleter();
- assert(delayed.find(qm.position) == delayed.end());
- delayed[qm.position] = qm;
+// Message is delivered in the subscription's connection thread.
+bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
+ try {
+ // Add position events for the subscribed queue, not the internal event queue.
+ if (qm.queue == getQueue().get()) {
+ QPID_LOG(trace, logPrefix << "Replicating " << qm);
+ {
+ Mutex::ScopedLock l(lock);
+ assert(position == qm.position);
+ // qm.position is the position of the newly enqueued qm on local queue.
+ // backupPosition is latest position on backup queue before enqueueing
+ if (qm.position <= backupPosition)
+ throw Exception(
+ QPID_MSG("Expected position > " << backupPosition
+ << " but got " << qm.position));
+ if (qm.position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ // Send the position before qm was enqueued.
+ sendPositionEvent(qm.position-1, l);
+ }
+ // Backup will automatically advance by 1 on delivery of message.
+ backupPosition = qm.position;
+ }
+ }
+ return ConsumerImpl::deliver(qm);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, logPrefix << "Error replicating " << qm
+ << ": " << e.what());
+ throw;
+ }
}
-
-// Function to complete a delayed message, called by cancel()
-void ReplicatingSubscription::cancelComplete(
- const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
-{
- QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix);
- v.second.payload->getIngressCompletion().finishCompleter();
+void ReplicatingSubscription::setReady() {
+ {
+ Mutex::ScopedLock l(lock);
+ if (ready) return;
+ ready = true;
+ }
+ // Notify Primary that a subscription is ready.
+ QPID_LOG(debug, logPrefix << "Caught up");
+ if (Primary::get()) Primary::get()->readyReplica(*this);
}
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
- getQueue()->removeObserver(
- boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
- {
- sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix);
- for_each(delayed.begin(), delayed.end(),
- boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
- delayed.clear();
- }
+ guard->cancel();
ConsumerImpl::cancel();
}
-// Called on primary in the backups IO thread.
-void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
- sys::Mutex::ScopedLock l(lock);
- // Finish completion of message, it has been acknowledged by the backup.
- complete(msg, l);
+// Consumer override, called on primary in the backup's IO thread.
+void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
+ if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue
+ // Finish completion of message, it has been acknowledged by the backup.
+ QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
+ guard->complete(qm);
+ // If next message is protected by the guard then we are ready
+ if (qm.position >= guard->getRange().back) setReady();
+ }
+ ConsumerImpl::acknowledged(qm);
}
-// Hide the "queue deleted" error for a ReplicatingSubscription when a
-// queue is deleted, this is normal and not an error.
-bool ReplicatingSubscription::hideDeletedError() { return true; }
-
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
+void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
{
- QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
- << " from " << getQueue()->getName() << logSuffix);
+ if (dequeues.empty()) return;
+ QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
dequeues.clear();
buffer.reset();
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+ {
+ Mutex::ScopedUnlock u(lock);
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
+ }
}
-// Called after the message has been removed from the deque and under
-// the messageLock in the queue. Called in arbitrary connection threads.
+// Called via QueueObserver::dequeued override on guard.
+// Called after the message has been removed
+// from the deque and under the messageLock in the queue. Called in
+// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
+ assert (qm.queue == getQueue().get());
+ QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
- sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix);
+ Mutex::ScopedLock l(lock);
dequeues.add(qm.position);
- // If we have not yet sent this message to the backup, then
- // complete it now as it will never be accepted.
- if (qm.position > position) complete(qm, l);
}
notify(); // Ensure a call to doDispatch
}
+// Called during construction while scanning for initial dequeues.
+void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) {
+ QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]");
+ {
+ Mutex::ScopedLock l(lock);
+ dequeues.add(first,last);
+ }
+}
+
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(
- SequenceNumber position, const sys::Mutex::ScopedLock&l )
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&)
{
- QPID_LOG(trace, logPrefix << "sending position " << position
- << ", was " << backupPosition << logSuffix);
- string buf(backupPosition.encodedSize(),'\0');
+ if (pos == backupPosition) return; // No need to send.
+ QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
+ string buf(pos.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
- position.encode(buffer);
+ pos.encode(buffer);
buffer.reset();
- sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
+ {
+ Mutex::ScopedUnlock u(lock);
+ sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
+ }
}
-void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer,
- const sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
{
//generate event message
boost::intrusive_ptr<Message> event = new Message();
@@ -276,15 +396,14 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
event->getFrames().append(header);
event->getFrames().append(content);
- DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ DeliveryProperties* props =
+ event->getFrames().getHeaders()->get<DeliveryProperties>(true);
props->setRoutingKey(key);
- // Send the event using the events queue. Consumer is a
- // DelegatingConsumer that delegates to *this for everything but
- // has an independnet position. We put an event on events and
- // dispatch it through ourselves to send it in line with the
- // normal browsing messages.
- events->deliver(event);
- events->dispatch(consumer);
+ // Send the event directly to the base consumer implementation.
+ // We don't really need a queue here but we pass a dummy queue
+ // to conform to the consumer API.
+ QueuedMessage qm(dummy.get(), event);
+ ConsumerImpl::deliver(qm);
}
@@ -292,19 +411,10 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
bool ReplicatingSubscription::doDispatch()
{
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
if (!dequeues.empty()) sendDequeueEvent(l);
}
return ConsumerImpl::doDispatch();
}
-ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
-ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); }
-void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
-bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::browseAcquired() const { return delegate.browseAcquired(); }
-OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
-
}} // namespace qpid::ha