diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 42 |
1 files changed, 17 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 2001ec5332..9f464f8066 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -19,7 +19,7 @@ * */ -#include "makeMessage.h" +#include "Event.h" #include "IdSetter.h" #include "QueueGuard.h" #include "QueueReplicator.h" @@ -36,6 +36,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/types/Uuid.h" +#include <boost/pointer_cast.hpp> #include <sstream> @@ -45,6 +46,7 @@ namespace ha { using namespace framing; using namespace broker; using namespace std; +using namespace boost; using sys::Mutex; using broker::amqp_0_10::MessageTransfer; @@ -111,7 +113,8 @@ ReplicatingSubscription::ReplicatingSubscription( ) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), position(0), ready(false), cancelled(false), - haBroker(hb) + haBroker(hb), + primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())) { try { FieldTable ft; @@ -137,8 +140,6 @@ ReplicatingSubscription::ReplicatingSubscription( } // If there's already a guard (we are in failover) use it, else create one. - boost::shared_ptr<Primary> primary = - boost::dynamic_pointer_cast<Primary>(haBroker.getRole()); if (primary) guard = primary->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, info)); @@ -163,7 +164,6 @@ ReplicatingSubscription::ReplicatingSubscription( sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued() dequeues += initDequeues; // Messages on backup that are not on primary. skip = backupIds - initDequeues; // Messages already on the backup. - // Queue front is moving but we know this subscriptions will start at a // position >= front so if front is safe then position must be. position = front; @@ -191,6 +191,7 @@ ReplicatingSubscription::~ReplicatingSubscription() {} // void ReplicatingSubscription::initialize() { try { + if (primary) primary->addReplica(*this); Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. // Send initial dequeues to the backup. // There must be a shared_ptr(this) when sending. @@ -218,9 +219,8 @@ bool ReplicatingSubscription::deliver( try { bool result = false; if (skip.contains(id)) { + QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m)); skip -= id; - QPID_LOG(trace, logPrefix << "On backup, skip " << - LogMessageId(*getQueue(), m)); guard->complete(id); // This will never be acknowledged. notify(); result = true; @@ -240,17 +240,12 @@ bool ReplicatingSubscription::deliver( } } -/** - *@param position: must be <= last position seen by subscription. - */ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) { if (!ready && isGuarded(l) && unready.empty()) { ready = true; sys::Mutex::ScopedUnlock u(lock); // Notify Primary that a subscription is ready. QPID_LOG(debug, logPrefix << "Caught up"); - boost::shared_ptr<Primary> primary = - boost::dynamic_pointer_cast<Primary>(haBroker.getRole()); if (primary) primary->readyReplica(*this); } } @@ -264,6 +259,7 @@ void ReplicatingSubscription::cancel() cancelled = true; } QPID_LOG(debug, logPrefix << "Cancelled"); + if (primary) primary->removeReplica(*this); getQueue()->removeObserver(observer); guard->cancel(); ConsumerImpl::cancel(); @@ -289,9 +285,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l) { if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); - string buffer = encodeStr(dequeues); - dequeues.clear(); - sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); + sendEvent(DequeueEvent(dequeues), l); } // Called after the message has been removed @@ -311,23 +305,16 @@ void ReplicatingSubscription::dequeued(ReplicationId id) // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l) { - sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l); + sendEvent(IdEvent(pos), l); } -void ReplicatingSubscription::sendEvent(const std::string& key, - const std::string& buffer, - Mutex::ScopedLock&) +void ReplicatingSubscription::sendEvent(const Event& event, Mutex::ScopedLock&) { Mutex::ScopedUnlock u(lock); - broker::Message message = makeMessage(buffer); - MessageTransfer& transfer = MessageTransfer::get(message); - DeliveryProperties* props = - transfer.getFrames().getHeaders()->get<DeliveryProperties>(true); - props->setRoutingKey(key); // Send the event directly to the base consumer implementation. The dummy // consumer prevents acknowledgements being handled, which is what we want // for events - ConsumerImpl::deliver(QueueCursor(), message, boost::shared_ptr<Consumer>()); + ConsumerImpl::deliver(QueueCursor(), event.message(), boost::shared_ptr<Consumer>()); } // Called in subscription's connection thread. @@ -346,4 +333,9 @@ bool ReplicatingSubscription::doDispatch() } } +void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) { + Mutex::ScopedLock l(lock); + skip += ids; +} + }} // namespace qpid::ha |