diff options
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 124 |
1 files changed, 53 insertions, 71 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index c960758eaf..6f7519cd1f 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -27,6 +27,7 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/ConnectionState.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" @@ -66,10 +67,10 @@ class DequeueScanner at = front - 1; } - void operator()(const QueuedMessage& qm) { - if (qm.position >= front && qm.position <= back) { - if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1); - at = qm.position; + void operator()(const Message& m) { + if (m.getSequence() >= front && m.getSequence() <= back) { + if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1); + at = m.getSequence(); } } @@ -90,37 +91,23 @@ string mask(const string& in) return DOLLAR + in + INTERNAL; } - -/** Dummy consumer used to get the front position on the queue */ -class GetPositionConsumer : public Consumer +namespace { +bool getSequence(const Message& message, SequenceNumber& result) { - public: - GetPositionConsumer() : - Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {} - bool deliver(broker::QueuedMessage& ) { return true; } - void notify() {} - bool filter(boost::intrusive_ptr<broker::Message>) { return true; } - bool accept(boost::intrusive_ptr<broker::Message>) { return true; } - void cancel() {} - void acknowledged(const broker::QueuedMessage&) {} - bool browseAcquired() const { return true; } - broker::OwnershipToken* getSession() { return 0; } -}; - - + result = message.getSequence(); + return true; +} +} bool ReplicatingSubscription::getNext( broker::Queue& q, SequenceNumber from, SequenceNumber& result) { - boost::shared_ptr<Consumer> c(new GetPositionConsumer); - c->setPosition(from); - if (!q.dispatch(c)) return false; - result = c->getPosition(); - return true; + QueueCursor cursor(REPLICATOR); + return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), from); } bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) { - // FIXME aconway 2012-05-23: won't wrap, assumes 0 is < all messages in queue. - return getNext(q, 0, front); + QueueCursor cursor(REPLICATOR); + return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front))); } /* Called by SemanticState::consume to create a consumer */ @@ -152,15 +139,14 @@ ReplicatingSubscription::ReplicatingSubscription( const string& name, Queue::shared_ptr queue, bool ack, - bool acquire, + bool /*acquire*/, bool exclusive, const string& tag, const string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments -) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, +) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), - dummy(new Queue(mask(name))), ready(false) { try { @@ -213,6 +199,8 @@ ReplicatingSubscription::ReplicatingSubscription( queue->eachMessage(boost::ref(scan)); // Remove missing messages in between. scan.finish(); position = backup.back; + //move cursor to position + queue->seek(*this, position); } // NOTE: we are assuming that the messages that are on the backup are // consistent with those on the primary. If the backup is a replica @@ -260,32 +248,31 @@ void ReplicatingSubscription::initialize() { } // Message is delivered in the subscription's connection thread. -bool ReplicatingSubscription::deliver(QueuedMessage& qm) { +bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) { + position = m.getSequence(); try { - // Add position events for the subscribed queue, not the internal event queue. - if (qm.queue == getQueue().get()) { - QPID_LOG(trace, logPrefix << "Replicating " << qm); - { - Mutex::ScopedLock l(lock); - assert(position == qm.position); - // qm.position is the position of the newly enqueued qm on local queue. - // backupPosition is latest position on backup queue before enqueueing - if (qm.position <= backupPosition) - throw Exception( - QPID_MSG("Expected position > " << backupPosition - << " but got " << qm.position)); - if (qm.position - backupPosition > 1) { - // Position has advanced because of messages dequeued ahead of us. - // Send the position before qm was enqueued. - sendPositionEvent(qm.position-1, l); - } - // Backup will automatically advance by 1 on delivery of message. - backupPosition = qm.position; + QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"); + { + Mutex::ScopedLock l(lock); + //FIXME GRS: position is no longer set//assert(position == m.getSequence()); + + // m.getSequence() is the position of the newly enqueued message on local queue. + // backupPosition is latest position on backup queue before enqueueing + if (m.getSequence() <= backupPosition) + throw Exception( + QPID_MSG("Expected position > " << backupPosition + << " but got " << m.getSequence())); + if (m.getSequence() - backupPosition > 1) { + // Position has advanced because of messages dequeued ahead of us. + // Send the position before message was enqueued. + sendPositionEvent(m.getSequence()-1, l); } + // Backup will automatically advance by 1 on delivery of message. + backupPosition = m.getSequence(); } - return ConsumerImpl::deliver(qm); + return ConsumerImpl::deliver(c, m); } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Error replicating " << qm + QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]" << ": " << e.what()); throw; } @@ -310,15 +297,13 @@ void ReplicatingSubscription::cancel() } // Consumer override, called on primary in the backup's IO thread. -void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) { - if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue - // Finish completion of message, it has been acknowledged by the backup. - QPID_LOG(trace, logPrefix << "Acknowledged " << qm); - guard->complete(qm); - // If next message is protected by the guard then we are ready - if (qm.position >= guard->getRange().back) setReady(); - } - ConsumerImpl::acknowledged(qm); +void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { + // Finish completion of message, it has been acknowledged by the backup. + QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]"); + guard->complete(r.getMessageId()); + // If next message is protected by the guard then we are ready + if (r.getMessageId() >= guard->getRange().back) setReady(); + ConsumerImpl::acknowledged(r); } // Called with lock held. Called in subscription's connection thread. @@ -341,13 +326,12 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) // Called after the message has been removed // from the deque and under the messageLock in the queue. Called in // arbitrary connection threads. -void ReplicatingSubscription::dequeued(const QueuedMessage& qm) +void ReplicatingSubscription::dequeued(const Message& m) { - assert (qm.queue == getQueue().get()); - QPID_LOG(trace, logPrefix << "Dequeued " << qm); + QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]"); { Mutex::ScopedLock l(lock); - dequeues.add(qm.position); + dequeues.add(m.getSequence()); } notify(); // Ensure a call to doDispatch } @@ -379,7 +363,7 @@ void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::Scope void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer) { //generate event message - boost::intrusive_ptr<Message> event = new Message(); + boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> event(new qpid::broker::amqp_0_10::MessageTransfer()); AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0))); AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody())); @@ -400,10 +384,8 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& event->getFrames().getHeaders()->get<DeliveryProperties>(true); props->setRoutingKey(key); // Send the event directly to the base consumer implementation. - // We don't really need a queue here but we pass a dummy queue - // to conform to the consumer API. - QueuedMessage qm(dummy.get(), event); - ConsumerImpl::deliver(qm); + //dummy consumer prevents acknowledgements being handled, which is what we want for events + ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr<Consumer>()); } |