diff options
author | Alan Conway <aconway@apache.org> | 2012-02-17 14:07:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-17 14:07:08 +0000 |
commit | 022fdd390704f097f7c14ce090235066834b7f8b (patch) | |
tree | cf7ce15c1c6b81bf2a812e06fcb8668b8f7a2dd8 | |
parent | ce96fd6d592d1fb48f233eee9badc8fee3b861a0 (diff) | |
download | qpid-python-022fdd390704f097f7c14ce090235066834b7f8b.tar.gz |
QPID-3603: Use position events to synchronize queue positions between primary and backup
Previous code used dequeues to synchronize queue, but dequeue events
are generated in a different thread to message delivery which led to
race conditions. Position events are generated in-line with message
delivery.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245494 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 79 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 87 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 13 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 15 |
5 files changed, 135 insertions, 65 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 515c3f4185..86712b4bdc 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -42,12 +42,14 @@ const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); namespace qpid { namespace ha { using namespace broker; +using namespace framing; const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event"); +const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event"); QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management? - queue(q), link(l), current(queue->getPosition()) + queue(q), link(l) { QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings()); // Declare the replicator bridge. @@ -96,49 +98,54 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest); } -void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/) +namespace { +template <class T> T decodeContent(Message& m) { + std::string content; + m.getFrames().getContent(content); + Buffer buffer(const_cast<char*>(content.c_str()), content.size()); + T result; + result.decode(buffer); + return result; +} +} + +void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) { + // Thread safe: only calls thread safe Queue functions. + if (queue->getPosition() >= n) { // Ignore dequeus we haven't reached yet + QueuedMessage message; + if (queue->acquireMessageAt(n, message)) { + queue->dequeue(0, message); + QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message)); + } + } +} + +void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable* /*args*/) { + sys::Mutex::ScopedLock l(lock); if (key == DEQUEUE_EVENT_KEY) { - std::string content; - msg.getMessage().getFrames().getContent(content); - qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size()); - qpid::framing::SequenceSet latest; - latest.decode(buffer); - - QPID_LOG(trace, "HA: Backup received dequeues: " << latest); + SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); + QPID_LOG(trace, "HA: Backup received dequeues: " << dequeues); //TODO: should be able to optimise the following - for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) { - if (current < *i) { - //haven't got that far yet, record the dequeue - dequeued.add(*i); - QPID_LOG(trace, "HA: Recording dequeue of " << QueuePos(queue.get(), *i)); - } else { - QueuedMessage message; - if (queue->acquireMessageAt(*i, message)) { - queue->dequeue(0, message); - QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message)); - } else { - // This can happen if we're replicating a queue that has initial dequeues. - QPID_LOG(trace, "HA: Backup message already dequeued: "<< QueuePos(queue.get(), *i)); - } - } - } + for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) + dequeue(*i, l); + } else if (key == POSITION_EVENT_KEY) { + SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); + assert(queue->getPosition() <= position); + //TODO aconway 2011-12-14: Optimize this? + for (SequenceNumber i = queue->getPosition(); i < position; ++i) + dequeue(i,l); + queue->setPosition(position); + QPID_LOG(trace, "HA: Backup advanced to: " << QueuePos(queue.get(), queue->getPosition())); } else { - //take account of any gaps in sequence created by messages - //dequeued before our subscription reached them - while (dequeued.contains(++current)) { - dequeued.remove(current); - QPID_LOG(trace, "HA: Backup skipping dequeued message: " << QueuePos(queue.get(), current)); - queue->setPosition(current); - } - QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), current)); + QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), queue->getPosition()+1)); msg.deliverTo(queue); } } -bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; } -bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; } -bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; } +bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } +bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } +bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; } std::string QueueReplicator::getType() const { return TYPE_NAME; } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 02acf34886..e864d6b130 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -44,12 +44,13 @@ namespace ha { * Creates a ReplicatingSubscription on the primary by passing special * arguments to the consume command. * - * THREAD SAFE. + * THREAD SAFE: Called in arbitrary connection threads. */ class QueueReplicator : public broker::Exchange { public: static const std::string DEQUEUE_EVENT_KEY; + static const std::string POSITION_EVENT_KEY; QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); ~QueueReplicator(); @@ -61,12 +62,11 @@ class QueueReplicator : public broker::Exchange private: void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); + void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&); sys::Mutex lock; boost::shared_ptr<broker::Queue> queue; boost::shared_ptr<broker::Link> link; - framing::SequenceNumber current; - framing::SequenceSet dequeued; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 50d5fc55c7..00be587fe4 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -94,19 +94,43 @@ ReplicatingSubscription::ReplicatingSubscription( // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName()); + + // Note that broker::Queue::getPosition() returns the sequence + // number that will be assigned to the next message *minus 1*. + + // this->position is inherited from ConsumerImpl. It tracks the + // position of the last message browsed on the local (primary) + // queue, or more exactly the next sequence number to browse + // *minus 1* qpid::framing::SequenceNumber oldest; - if (queue->getOldest(oldest)) - dequeues.add(0, --oldest); - else //local queue (i.e. master) is empty - dequeues.add(0, queue->getPosition()); - - QPID_LOG(debug, "HA: Initial dequeues for " << queue->getName() << ": " << dequeues); - // Set 'cursor' on backup queue. Will be updated by dequeue event sent above. - position = 0; + position = queue->getOldest(oldest) ? --oldest : queue->getPosition(); + + // this->backupPosition tracks the position of the remote backup + // queue, i.e. the sequence number for the next delivered message + // *minus one* + backupPosition = 0; } -bool ReplicatingSubscription::deliver(QueuedMessage& m) -{ +// Message is delivered in the subscription's connection thread. +bool ReplicatingSubscription::deliver(QueuedMessage& m) { + // Add position events for the subscribed queue, not for the internal event queue. + if (m.queue && m.queue->getName() == getQueue()->getName()) { + QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m)); + assert(position == m.position); + { + sys::Mutex::ScopedLock l(lock); + // this->position is the new position after enqueueing m locally. + // this->backupPosition is the backup position before enqueueing m. + assert(position > backupPosition); + if (position - backupPosition > 1) { + // Position has advanced because of messages dequeued ahead of us. + SequenceNumber send(position); + // Send the position before m was enqueued. + sendPositionEvent(--send, l); + } + backupPosition = position; + } + } return ConsumerImpl::deliver(m); } @@ -121,20 +145,38 @@ ReplicatingSubscription::~ReplicatingSubscription() {} //under the message lock in the queue void ReplicatingSubscription::enqueued(const QueuedMessage& m) { - QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m) << " on " << getName()); //delay completion m.payload->getIngressCompletion().startCompleter(); } // Called with lock held. -void ReplicatingSubscription::generateDequeueEvent() +void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) { - QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << dequeues << " on " << getName()); + QPID_LOG(trace, "HA: Sending dequeues " << getQueue()->getName() << " " << dequeues << " on " << getName()); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); dequeues.clear(); buffer.reset(); + sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); +} + +// Called with lock held. +void ReplicatingSubscription::sendPositionEvent( + SequenceNumber position, const sys::Mutex::ScopedLock&l ) +{ + QPID_LOG(trace, "HA: Sending position " << QueuePos(getQueue().get(), position) + << " on " << getName()); + string buf(backupPosition.encodedSize(),'\0'); + framing::Buffer buffer(&buf[0], buf.size()); + position.encode(buffer); + buffer.reset(); + sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l); +} + +void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer, + const sys::Mutex::ScopedLock&) +{ //generate event message boost::intrusive_ptr<Message> event = new Message(); AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0))); @@ -154,8 +196,14 @@ void ReplicatingSubscription::generateDequeueEvent() event->getFrames().append(content); DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true); - props->setRoutingKey(QueueReplicator::DEQUEUE_EVENT_KEY); + props->setRoutingKey(key); + // Send the event using the events queue. Consumer is a + // DelegatingConsumer that delegates to *this for everything but + // has an independnet position. We put an event on events and + // dispatch it through ourselves to send it in line with the + // normal browsing messages. events->deliver(event); + events->dispatch(consumer); } // Called after the message has been removed from the deque and under @@ -165,8 +213,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m) { sys::Mutex::ScopedLock l(lock); dequeues.add(m.position); - QPID_LOG(trace, "HA: Added " << QueuePos(m) - << " to dequeue event; subscription at " << position); + QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << getName()); } notify(); // Ensure a call to doDispatch if (m.position > position) { @@ -179,13 +226,9 @@ bool ReplicatingSubscription::doDispatch() { { sys::Mutex::ScopedLock l(lock); - if (!dequeues.empty()) { - generateDequeueEvent(); - } + if (!dequeues.empty()) sendDequeueEvent(l); } - bool r1 = events->dispatch(consumer); - bool r2 = ConsumerImpl::doDispatch(); - return r1 || r2; + return ConsumerImpl::doDispatch(); } ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 6d75d6fb73..8c2f1c4e86 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -36,6 +36,10 @@ class QueuedMessage; class OwnershipToken; } +namespace framing { +class Buffer; +} + namespace ha { /** @@ -44,7 +48,8 @@ namespace ha { * Runs on the primary. Delays completion of messages till the backup * has acknowledged, informs backup of locally dequeued messages. * - * THREAD UNSAFE: used only in broker connection thread. + * THREAD SAFE: Used as a consume in subscription's connection + * thread, and as a QueueObserver in arbitrary connection threads. */ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, public broker::QueueObserver @@ -85,8 +90,12 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, boost::shared_ptr<broker::Queue> events; boost::shared_ptr<broker::Consumer> consumer; qpid::framing::SequenceSet dequeues; + framing::SequenceNumber backupPosition; - void generateDequeueEvent(); + void sendDequeueEvent(const sys::Mutex::ScopedLock&); + void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&); + void sendEvent(const std::string& key, framing::Buffer&, + const sys::Mutex::ScopedLock&); class DelegatingConsumer : public Consumer { public: diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f0668d43f1..e9b84050b9 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -106,18 +106,28 @@ class ShortTests(BrokerTest): verify(b, "1", p) verify(b, "2", p) - # Test a series of messages, enqueue and dequeue. + # Test a series of messages, enqueue all then dequeue all. s = p.sender(queue("foo","all")) msgs = [str(i) for i in range(10)] for m in msgs: s.send(Message(m)) - self.assert_browse_retry(b, "foo", msgs) self.assert_browse_retry(p, "foo", msgs) + self.assert_browse_retry(b, "foo", msgs) r = p.receiver("foo") for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content) p.acknowledge() self.assert_browse_retry(p, "foo", []) self.assert_browse_retry(b, "foo", []) + # Another series, this time verify each dequeue individually. + for m in msgs: s.send(Message(m)) + self.assert_browse_retry(p, "foo", msgs) + self.assert_browse_retry(b, "foo", msgs) + for i in range(len(msgs)): + self.assertEqual(msgs[i], r.fetch(timeout=0).content) + p.acknowledge() + self.assert_browse_retry(p, "foo", msgs[i+1:]) + self.assert_browse_retry(b, "foo", msgs[i+1:]) + def qpid_replicate(self, value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value @@ -172,6 +182,7 @@ class ShortTests(BrokerTest): except: print self.browse(primary.connect().session(), "q", transform=sn) print self.browse(backup1.connect().session(), "q", transform=sn) + print self.browse(backup2.connect().session(), "q", transform=sn) raise if __name__ == "__main__": |