summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-17 14:07:08 +0000
committerAlan Conway <aconway@apache.org>2012-02-17 14:07:08 +0000
commit022fdd390704f097f7c14ce090235066834b7f8b (patch)
treecf7ce15c1c6b81bf2a812e06fcb8668b8f7a2dd8
parentce96fd6d592d1fb48f233eee9badc8fee3b861a0 (diff)
downloadqpid-python-022fdd390704f097f7c14ce090235066834b7f8b.tar.gz
QPID-3603: Use position events to synchronize queue positions between primary and backup
Previous code used dequeues to synchronize queue, but dequeue events are generated in a different thread to message delivery which led to race conditions. Position events are generated in-line with message delivery. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245494 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp79
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h6
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp87
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h13
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py15
5 files changed, 135 insertions, 65 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 515c3f4185..86712b4bdc 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -42,12 +42,14 @@ const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
namespace qpid {
namespace ha {
using namespace broker;
+using namespace framing;
const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
+const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event");
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
: Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
- queue(q), link(l), current(queue->getPosition())
+ queue(q), link(l)
{
QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings());
// Declare the replicator bridge.
@@ -96,49 +98,54 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest);
}
-void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
+namespace {
+template <class T> T decodeContent(Message& m) {
+ std::string content;
+ m.getFrames().getContent(content);
+ Buffer buffer(const_cast<char*>(content.c_str()), content.size());
+ T result;
+ result.decode(buffer);
+ return result;
+}
+}
+
+void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) {
+ // Thread safe: only calls thread safe Queue functions.
+ if (queue->getPosition() >= n) { // Ignore dequeus we haven't reached yet
+ QueuedMessage message;
+ if (queue->acquireMessageAt(n, message)) {
+ queue->dequeue(0, message);
+ QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message));
+ }
+ }
+}
+
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable* /*args*/)
{
+ sys::Mutex::ScopedLock l(lock);
if (key == DEQUEUE_EVENT_KEY) {
- std::string content;
- msg.getMessage().getFrames().getContent(content);
- qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size());
- qpid::framing::SequenceSet latest;
- latest.decode(buffer);
-
- QPID_LOG(trace, "HA: Backup received dequeues: " << latest);
+ SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
+ QPID_LOG(trace, "HA: Backup received dequeues: " << dequeues);
//TODO: should be able to optimise the following
- for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
- if (current < *i) {
- //haven't got that far yet, record the dequeue
- dequeued.add(*i);
- QPID_LOG(trace, "HA: Recording dequeue of " << QueuePos(queue.get(), *i));
- } else {
- QueuedMessage message;
- if (queue->acquireMessageAt(*i, message)) {
- queue->dequeue(0, message);
- QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message));
- } else {
- // This can happen if we're replicating a queue that has initial dequeues.
- QPID_LOG(trace, "HA: Backup message already dequeued: "<< QueuePos(queue.get(), *i));
- }
- }
- }
+ for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
+ dequeue(*i, l);
+ } else if (key == POSITION_EVENT_KEY) {
+ SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
+ assert(queue->getPosition() <= position);
+ //TODO aconway 2011-12-14: Optimize this?
+ for (SequenceNumber i = queue->getPosition(); i < position; ++i)
+ dequeue(i,l);
+ queue->setPosition(position);
+ QPID_LOG(trace, "HA: Backup advanced to: " << QueuePos(queue.get(), queue->getPosition()));
} else {
- //take account of any gaps in sequence created by messages
- //dequeued before our subscription reached them
- while (dequeued.contains(++current)) {
- dequeued.remove(current);
- QPID_LOG(trace, "HA: Backup skipping dequeued message: " << QueuePos(queue.get(), current));
- queue->setPosition(current);
- }
- QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), current));
+ QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), queue->getPosition()+1));
msg.deliverTo(queue);
}
}
-bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
+bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
std::string QueueReplicator::getType() const { return TYPE_NAME; }
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 02acf34886..e864d6b130 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -44,12 +44,13 @@ namespace ha {
* Creates a ReplicatingSubscription on the primary by passing special
* arguments to the consume command.
*
- * THREAD SAFE.
+ * THREAD SAFE: Called in arbitrary connection threads.
*/
class QueueReplicator : public broker::Exchange
{
public:
static const std::string DEQUEUE_EVENT_KEY;
+ static const std::string POSITION_EVENT_KEY;
QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
~QueueReplicator();
@@ -61,12 +62,11 @@ class QueueReplicator : public broker::Exchange
private:
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+ void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
- framing::SequenceNumber current;
- framing::SequenceSet dequeued;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 50d5fc55c7..00be587fe4 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -94,19 +94,43 @@ ReplicatingSubscription::ReplicatingSubscription(
// r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
+
+ // Note that broker::Queue::getPosition() returns the sequence
+ // number that will be assigned to the next message *minus 1*.
+
+ // this->position is inherited from ConsumerImpl. It tracks the
+ // position of the last message browsed on the local (primary)
+ // queue, or more exactly the next sequence number to browse
+ // *minus 1*
qpid::framing::SequenceNumber oldest;
- if (queue->getOldest(oldest))
- dequeues.add(0, --oldest);
- else //local queue (i.e. master) is empty
- dequeues.add(0, queue->getPosition());
-
- QPID_LOG(debug, "HA: Initial dequeues for " << queue->getName() << ": " << dequeues);
- // Set 'cursor' on backup queue. Will be updated by dequeue event sent above.
- position = 0;
+ position = queue->getOldest(oldest) ? --oldest : queue->getPosition();
+
+ // this->backupPosition tracks the position of the remote backup
+ // queue, i.e. the sequence number for the next delivered message
+ // *minus one*
+ backupPosition = 0;
}
-bool ReplicatingSubscription::deliver(QueuedMessage& m)
-{
+// Message is delivered in the subscription's connection thread.
+bool ReplicatingSubscription::deliver(QueuedMessage& m) {
+ // Add position events for the subscribed queue, not for the internal event queue.
+ if (m.queue && m.queue->getName() == getQueue()->getName()) {
+ QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m));
+ assert(position == m.position);
+ {
+ sys::Mutex::ScopedLock l(lock);
+ // this->position is the new position after enqueueing m locally.
+ // this->backupPosition is the backup position before enqueueing m.
+ assert(position > backupPosition);
+ if (position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ SequenceNumber send(position);
+ // Send the position before m was enqueued.
+ sendPositionEvent(--send, l);
+ }
+ backupPosition = position;
+ }
+ }
return ConsumerImpl::deliver(m);
}
@@ -121,20 +145,38 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
//under the message lock in the queue
void ReplicatingSubscription::enqueued(const QueuedMessage& m)
{
- QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m) << " on " << getName());
//delay completion
m.payload->getIngressCompletion().startCompleter();
}
// Called with lock held.
-void ReplicatingSubscription::generateDequeueEvent()
+void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
{
- QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << dequeues << " on " << getName());
+ QPID_LOG(trace, "HA: Sending dequeues " << getQueue()->getName() << " " << dequeues << " on " << getName());
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
dequeues.clear();
buffer.reset();
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+}
+
+// Called with lock held.
+void ReplicatingSubscription::sendPositionEvent(
+ SequenceNumber position, const sys::Mutex::ScopedLock&l )
+{
+ QPID_LOG(trace, "HA: Sending position " << QueuePos(getQueue().get(), position)
+ << " on " << getName());
+ string buf(backupPosition.encodedSize(),'\0');
+ framing::Buffer buffer(&buf[0], buf.size());
+ position.encode(buffer);
+ buffer.reset();
+ sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
+}
+
+void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer,
+ const sys::Mutex::ScopedLock&)
+{
//generate event message
boost::intrusive_ptr<Message> event = new Message();
AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
@@ -154,8 +196,14 @@ void ReplicatingSubscription::generateDequeueEvent()
event->getFrames().append(content);
DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
- props->setRoutingKey(QueueReplicator::DEQUEUE_EVENT_KEY);
+ props->setRoutingKey(key);
+ // Send the event using the events queue. Consumer is a
+ // DelegatingConsumer that delegates to *this for everything but
+ // has an independnet position. We put an event on events and
+ // dispatch it through ourselves to send it in line with the
+ // normal browsing messages.
events->deliver(event);
+ events->dispatch(consumer);
}
// Called after the message has been removed from the deque and under
@@ -165,8 +213,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(m.position);
- QPID_LOG(trace, "HA: Added " << QueuePos(m)
- << " to dequeue event; subscription at " << position);
+ QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << getName());
}
notify(); // Ensure a call to doDispatch
if (m.position > position) {
@@ -179,13 +226,9 @@ bool ReplicatingSubscription::doDispatch()
{
{
sys::Mutex::ScopedLock l(lock);
- if (!dequeues.empty()) {
- generateDequeueEvent();
- }
+ if (!dequeues.empty()) sendDequeueEvent(l);
}
- bool r1 = events->dispatch(consumer);
- bool r2 = ConsumerImpl::doDispatch();
- return r1 || r2;
+ return ConsumerImpl::doDispatch();
}
ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 6d75d6fb73..8c2f1c4e86 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -36,6 +36,10 @@ class QueuedMessage;
class OwnershipToken;
}
+namespace framing {
+class Buffer;
+}
+
namespace ha {
/**
@@ -44,7 +48,8 @@ namespace ha {
* Runs on the primary. Delays completion of messages till the backup
* has acknowledged, informs backup of locally dequeued messages.
*
- * THREAD UNSAFE: used only in broker connection thread.
+ * THREAD SAFE: Used as a consume in subscription's connection
+ * thread, and as a QueueObserver in arbitrary connection threads.
*/
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
public broker::QueueObserver
@@ -85,8 +90,12 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
boost::shared_ptr<broker::Queue> events;
boost::shared_ptr<broker::Consumer> consumer;
qpid::framing::SequenceSet dequeues;
+ framing::SequenceNumber backupPosition;
- void generateDequeueEvent();
+ void sendDequeueEvent(const sys::Mutex::ScopedLock&);
+ void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+ void sendEvent(const std::string& key, framing::Buffer&,
+ const sys::Mutex::ScopedLock&);
class DelegatingConsumer : public Consumer
{
public:
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f0668d43f1..e9b84050b9 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -106,18 +106,28 @@ class ShortTests(BrokerTest):
verify(b, "1", p)
verify(b, "2", p)
- # Test a series of messages, enqueue and dequeue.
+ # Test a series of messages, enqueue all then dequeue all.
s = p.sender(queue("foo","all"))
msgs = [str(i) for i in range(10)]
for m in msgs: s.send(Message(m))
- self.assert_browse_retry(b, "foo", msgs)
self.assert_browse_retry(p, "foo", msgs)
+ self.assert_browse_retry(b, "foo", msgs)
r = p.receiver("foo")
for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
p.acknowledge()
self.assert_browse_retry(p, "foo", [])
self.assert_browse_retry(b, "foo", [])
+ # Another series, this time verify each dequeue individually.
+ for m in msgs: s.send(Message(m))
+ self.assert_browse_retry(p, "foo", msgs)
+ self.assert_browse_retry(b, "foo", msgs)
+ for i in range(len(msgs)):
+ self.assertEqual(msgs[i], r.fetch(timeout=0).content)
+ p.acknowledge()
+ self.assert_browse_retry(p, "foo", msgs[i+1:])
+ self.assert_browse_retry(b, "foo", msgs[i+1:])
+
def qpid_replicate(self, value="all"):
return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
@@ -172,6 +182,7 @@ class ShortTests(BrokerTest):
except:
print self.browse(primary.connect().session(), "q", transform=sn)
print self.browse(backup1.connect().session(), "q", transform=sn)
+ print self.browse(backup2.connect().session(), "q", transform=sn)
raise
if __name__ == "__main__":