diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 95 |
1 files changed, 47 insertions, 48 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index cdfe9dd888..d0b93da85f 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -25,15 +25,16 @@ #include "QueueReplicator.h" #include "QueueSnapshots.h" #include "ReplicatingSubscription.h" +#include "TxReplicatingSubscription.h" #include "Primary.h" #include "HaBroker.h" #include "qpid/assert.h" #include "qpid/broker/Queue.h" -#include "qpid/broker/QueueObserver.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include "qpid/types/Uuid.h" #include <sstream> @@ -47,22 +48,12 @@ using namespace broker; using namespace std; using sys::Mutex; using broker::amqp_0_10::MessageTransfer; - -const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription"); -const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info"); -const string ReplicatingSubscription::QPID_ID_SET("qpid.ha-info"); - -class ReplicatingSubscription::QueueObserver : public broker::QueueObserver { - public: - QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {} - void enqueued(const broker::Message&) {} - void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); } - void acquired(const broker::Message&) {} - void requeued(const broker::Message&) {} - private: - ReplicatingSubscription& rs; -}; - +namespace { const string QPID_HA(QPID_HA_PREFIX); } +const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION(QPID_HA+"repsub"); +const string ReplicatingSubscription::QPID_BROKER_INFO(QPID_HA+"info"); +const string ReplicatingSubscription::QPID_ID_SET(QPID_HA+"ids"); +const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR(QPID_HA+"qrep"); +const string ReplicatingSubscription::QPID_TX_REPLICATOR(QPID_HA+"txrep"); /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> @@ -79,13 +70,20 @@ ReplicatingSubscription::Factory::create( const framing::FieldTable& arguments ) { boost::shared_ptr<ReplicatingSubscription> rs; - if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { + std::string type = arguments.getAsString(QPID_REPLICATING_SUBSCRIPTION); + if (type == QPID_QUEUE_REPLICATOR) { rs.reset(new ReplicatingSubscription( haBroker, parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - rs->initialize(); } + else if (type == QPID_TX_REPLICATOR) { + rs.reset(new TxReplicatingSubscription( + haBroker, + parent, name, queue, ack, acquire, exclusive, tag, + resumeId, resumeTtl, arguments)); + } + if (rs) rs->initialize(); return rs; } @@ -100,7 +98,7 @@ ReplicatingSubscription::ReplicatingSubscription( HaBroker& hb, SemanticState* parent, const string& name, - Queue::shared_ptr queue, + Queue::shared_ptr queue_, bool ack, bool /*acquire*/, bool exclusive, @@ -108,16 +106,22 @@ ReplicatingSubscription::ReplicatingSubscription( const string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments -) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag, +) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), position(0), ready(false), cancelled(false), haBroker(hb), primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())) -{ +{} + +// Called in subscription's connection thread when the subscription is created. +// Separate from ctor because we need to use shared_from_this +// +void ReplicatingSubscription::initialize() { try { FieldTable ft; - if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) - throw Exception("Replicating subscription does not have broker info: " + tag); + if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) + throw InvalidArgumentException( + logPrefix+"Can't subscribe, no broker info: "+getTag()); info.assign(ft); // Set a log prefix message that identifies the remote broker. @@ -147,10 +151,17 @@ ReplicatingSubscription::ReplicatingSubscription( // However we must attach the observer _before_ we snapshot for // initial dequeues to be sure we don't miss any dequeues // between the snapshot and attaching the observer. - observer.reset(new QueueObserver(*this)); - queue->addObserver(observer); - ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot(); - std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET); + queue->addObserver( + boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); + boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue); + // There may be no snapshot if the queue is being deleted concurrently. + if (!snapshot) { + queue->removeObserver( + boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); + throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted"); + } + ReplicationIdSet primaryIds = snapshot->snapshot(); + std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET); ReplicationIdSet backupIds; if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr); @@ -172,23 +183,7 @@ ReplicatingSubscription::ReplicatingSubscription( << ", on backup " << skip); checkReady(l); } - } - catch (const std::exception& e) { - QPID_LOG(error, logPrefix << "Creation error: " << e.what() - << ": arguments=" << getArguments()); - throw; - } -} -ReplicatingSubscription::~ReplicatingSubscription() {} - - -// Called in subscription's connection thread when the subscription is created. -// Called separate from ctor because sending events requires -// shared_from_this -// -void ReplicatingSubscription::initialize() { - try { if (primary) primary->addReplica(*this); Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. // Send initial dequeues to the backup. @@ -196,12 +191,14 @@ void ReplicatingSubscription::initialize() { sendDequeueEvent(l); } catch (const std::exception& e) { - QPID_LOG(error, logPrefix << "Initialization error: " << e.what() - << ": arguments=" << getArguments()); + QPID_LOG(error, logPrefix << "Subscribe failed: " << e.what()); throw; } } +ReplicatingSubscription::~ReplicatingSubscription() {} + + // True if the next position for the ReplicatingSubscription is a guarded position. bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) { return position+1 >= guard->getFirst(); @@ -258,7 +255,8 @@ void ReplicatingSubscription::cancel() } QPID_LOG(debug, logPrefix << "Cancelled"); if (primary) primary->removeReplica(*this); - getQueue()->removeObserver(observer); + getQueue()->removeObserver( + boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); guard->cancel(); ConsumerImpl::cancel(); } @@ -289,8 +287,9 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l) // Called after the message has been removed // from the deque and under the messageLock in the queue. Called in // arbitrary connection threads. -void ReplicatingSubscription::dequeued(ReplicationId id) +void ReplicatingSubscription::dequeued(const broker::Message& m) { + ReplicationId id = m.getReplicationId(); QPID_LOG(trace, logPrefix << "Dequeued ID " << id); { Mutex::ScopedLock l(lock); |