summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp124
1 files changed, 53 insertions, 71 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index c960758eaf..6f7519cd1f 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -27,6 +27,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
@@ -66,10 +67,10 @@ class DequeueScanner
at = front - 1;
}
- void operator()(const QueuedMessage& qm) {
- if (qm.position >= front && qm.position <= back) {
- if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1);
- at = qm.position;
+ void operator()(const Message& m) {
+ if (m.getSequence() >= front && m.getSequence() <= back) {
+ if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1);
+ at = m.getSequence();
}
}
@@ -90,37 +91,23 @@ string mask(const string& in)
return DOLLAR + in + INTERNAL;
}
-
-/** Dummy consumer used to get the front position on the queue */
-class GetPositionConsumer : public Consumer
+namespace {
+bool getSequence(const Message& message, SequenceNumber& result)
{
- public:
- GetPositionConsumer() :
- Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {}
- bool deliver(broker::QueuedMessage& ) { return true; }
- void notify() {}
- bool filter(boost::intrusive_ptr<broker::Message>) { return true; }
- bool accept(boost::intrusive_ptr<broker::Message>) { return true; }
- void cancel() {}
- void acknowledged(const broker::QueuedMessage&) {}
- bool browseAcquired() const { return true; }
- broker::OwnershipToken* getSession() { return 0; }
-};
-
-
+ result = message.getSequence();
+ return true;
+}
+}
bool ReplicatingSubscription::getNext(
broker::Queue& q, SequenceNumber from, SequenceNumber& result)
{
- boost::shared_ptr<Consumer> c(new GetPositionConsumer);
- c->setPosition(from);
- if (!q.dispatch(c)) return false;
- result = c->getPosition();
- return true;
+ QueueCursor cursor(REPLICATOR);
+ return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), from);
}
bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
- // FIXME aconway 2012-05-23: won't wrap, assumes 0 is < all messages in queue.
- return getNext(q, 0, front);
+ QueueCursor cursor(REPLICATOR);
+ return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front)));
}
/* Called by SemanticState::consume to create a consumer */
@@ -152,15 +139,14 @@ ReplicatingSubscription::ReplicatingSubscription(
const string& name,
Queue::shared_ptr queue,
bool ack,
- bool acquire,
+ bool /*acquire*/,
bool exclusive,
const string& tag,
const string& resumeId,
uint64_t resumeTtl,
const framing::FieldTable& arguments
-) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
- dummy(new Queue(mask(name))),
ready(false)
{
try {
@@ -213,6 +199,8 @@ ReplicatingSubscription::ReplicatingSubscription(
queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
scan.finish();
position = backup.back;
+ //move cursor to position
+ queue->seek(*this, position);
}
// NOTE: we are assuming that the messages that are on the backup are
// consistent with those on the primary. If the backup is a replica
@@ -260,32 +248,31 @@ void ReplicatingSubscription::initialize() {
}
// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
+bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) {
+ position = m.getSequence();
try {
- // Add position events for the subscribed queue, not the internal event queue.
- if (qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "Replicating " << qm);
- {
- Mutex::ScopedLock l(lock);
- assert(position == qm.position);
- // qm.position is the position of the newly enqueued qm on local queue.
- // backupPosition is latest position on backup queue before enqueueing
- if (qm.position <= backupPosition)
- throw Exception(
- QPID_MSG("Expected position > " << backupPosition
- << " but got " << qm.position));
- if (qm.position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- // Send the position before qm was enqueued.
- sendPositionEvent(qm.position-1, l);
- }
- // Backup will automatically advance by 1 on delivery of message.
- backupPosition = qm.position;
+ QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ {
+ Mutex::ScopedLock l(lock);
+ //FIXME GRS: position is no longer set//assert(position == m.getSequence());
+
+ // m.getSequence() is the position of the newly enqueued message on local queue.
+ // backupPosition is latest position on backup queue before enqueueing
+ if (m.getSequence() <= backupPosition)
+ throw Exception(
+ QPID_MSG("Expected position > " << backupPosition
+ << " but got " << m.getSequence()));
+ if (m.getSequence() - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ // Send the position before message was enqueued.
+ sendPositionEvent(m.getSequence()-1, l);
}
+ // Backup will automatically advance by 1 on delivery of message.
+ backupPosition = m.getSequence();
}
- return ConsumerImpl::deliver(qm);
+ return ConsumerImpl::deliver(c, m);
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << qm
+ QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"
<< ": " << e.what());
throw;
}
@@ -310,15 +297,13 @@ void ReplicatingSubscription::cancel()
}
// Consumer override, called on primary in the backup's IO thread.
-void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
- if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue
- // Finish completion of message, it has been acknowledged by the backup.
- QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
- guard->complete(qm);
- // If next message is protected by the guard then we are ready
- if (qm.position >= guard->getRange().back) setReady();
- }
- ConsumerImpl::acknowledged(qm);
+void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
+ // Finish completion of message, it has been acknowledged by the backup.
+ QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]");
+ guard->complete(r.getMessageId());
+ // If next message is protected by the guard then we are ready
+ if (r.getMessageId() >= guard->getRange().back) setReady();
+ ConsumerImpl::acknowledged(r);
}
// Called with lock held. Called in subscription's connection thread.
@@ -341,13 +326,12 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
+void ReplicatingSubscription::dequeued(const Message& m)
{
- assert (qm.queue == getQueue().get());
- QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+ QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]");
{
Mutex::ScopedLock l(lock);
- dequeues.add(qm.position);
+ dequeues.add(m.getSequence());
}
notify(); // Ensure a call to doDispatch
}
@@ -379,7 +363,7 @@ void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::Scope
void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
{
//generate event message
- boost::intrusive_ptr<Message> event = new Message();
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> event(new qpid::broker::amqp_0_10::MessageTransfer());
AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
@@ -400,10 +384,8 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
event->getFrames().getHeaders()->get<DeliveryProperties>(true);
props->setRoutingKey(key);
// Send the event directly to the base consumer implementation.
- // We don't really need a queue here but we pass a dummy queue
- // to conform to the consumer API.
- QueuedMessage qm(dummy.get(), event);
- ConsumerImpl::deliver(qm);
+ //dummy consumer prevents acknowledgements being handled, which is what we want for events
+ ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr<Consumer>());
}