diff options
author | Alan Conway <aconway@apache.org> | 2014-08-08 09:24:03 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-08-08 09:24:03 +0000 |
commit | a833f714a4de983bce8fb1c2f6b87070bd3b4309 (patch) | |
tree | b480782830390484038c9b62e45c71958f58571c | |
parent | d06ee666b50104bcd7cc42f656a68cce8636f79c (diff) | |
download | qpid-python-a833f714a4de983bce8fb1c2f6b87070bd3b4309.tar.gz |
QPID-5974: HA qpid-txtest2 can bring down a cluster (JERR_MAP_LOCKED)
Problem: transactional dequeues can be sent via two paths: as part of the transaction and
via the normal queue replication. If a journal is involved this can result in store errors
if the normal replication path attempts to dequeue before the transaction.
Solution: this is also the case for enqueues, and we already have code in place to skip replication
of tx enqueues via the normal route. Copied the same logic for dequeues.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616703 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 35 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 7 |
6 files changed, 61 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index af4ae12177..dd41f74790 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -265,14 +265,24 @@ void Primary::addReplica(ReplicatingSubscription& rs) { replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs; } -void Primary::skip( +void Primary::skipEnqueues( const types::Uuid& backup, const boost::shared_ptr<broker::Queue>& queue, const ReplicationIdSet& ids) { sys::Mutex::ScopedLock l(lock); ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue)); - if (i != replicas.end()) i->second->addSkip(ids); + if (i != replicas.end()) i->second->skipEnqueues(ids); +} + +void Primary::skipDequeues( + const types::Uuid& backup, + const boost::shared_ptr<broker::Queue>& queue, + const ReplicationIdSet& ids) +{ + sys::Mutex::ScopedLock l(lock); + ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue)); + if (i != replicas.end()) i->second->skipDequeues(ids); } // Called from ReplicatingSubscription::cancel diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index af368bca0f..46cf990834 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -90,9 +90,14 @@ class Primary : public Role void removeReplica(const ReplicatingSubscription&); /** Skip replication of ids to queue on backup. */ - void skip(const types::Uuid& backup, - const boost::shared_ptr<broker::Queue>& queue, - const ReplicationIdSet& ids); + void skipEnqueues(const types::Uuid& backup, + const boost::shared_ptr<broker::Queue>& queue, + const ReplicationIdSet& ids); + + /** Skip replication of dequeue of ids to queue on backup. 
*/ + void skipDequeues(const types::Uuid& backup, + const boost::shared_ptr<broker::Queue>& queue, + const ReplicationIdSet& ids); // Called via BrokerObserver void queueCreate(const QueuePtr&); diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index c94ced7024..be3dc25653 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -165,6 +165,7 @@ void PrimaryTxObserver::dequeue( if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id)); empty = false; + dequeues[q] += id; txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); } } @@ -180,25 +181,30 @@ struct Skip { const ReplicationIdSet& ids_) : backup(backup_), queue(queue_), ids(ids_) {} - void skip(Primary& p) const { p.skip(backup, queue, ids); } + void skipEnqueues(Primary& p) const { p.skipEnqueues(backup, queue, ids); } + void skipDequeues(Primary& p) const { p.skipDequeues(backup, queue, ids); } }; } // namespace +void PrimaryTxObserver::skip(Mutex::ScopedLock&) { + // Tell replicating subscriptions to skip IDs in the transaction. 
+ vector<Skip> skipEnq, skipDeq; + for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) { + for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) + skipEnq.push_back(Skip(*b, q->first, q->second)); + for (QueueIdsMap::iterator q = dequeues.begin(); q != dequeues.end(); ++q) + skipDeq.push_back(Skip(*b, q->first, q->second)); + } + Mutex::ScopedUnlock u(lock); // Outside lock + for_each(skipEnq.begin(), skipEnq.end(), boost::bind(&Skip::skipEnqueues, _1, boost::ref(primary))); + for_each(skipDeq.begin(), skipDeq.end(), boost::bind(&Skip::skipDequeues, _1, boost::ref(primary))); +} + bool PrimaryTxObserver::prepare() { QPID_LOG(debug, logPrefix << "Prepare " << backups); - vector<Skip> skips; - { - Mutex::ScopedLock l(lock); - checkState(SENDING, "Too late for prepare"); - state = PREPARING; - // Tell replicating subscriptions to skip IDs in the transaction. - for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) - for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) - skips.push_back(Skip(*b, q->first, q->second)); - } - // Outside lock - for_each(skips.begin(), skips.end(), - boost::bind(&Skip::skip, _1, boost::ref(primary))); + Mutex::ScopedLock l(lock); + checkState(SENDING, "Too late for prepare"); + state = PREPARING; txQueue->deliver(TxPrepareEvent().message()); return true; } @@ -208,6 +214,7 @@ void PrimaryTxObserver::commit() { Mutex::ScopedLock l(lock); checkState(PREPARING, "Cannot commit, not preparing"); if (incomplete.size() == 0) { + skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue. 
txQueue->deliver(TxCommitEvent().message()); end(l); } else { diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index 5b7c2e3e93..6ea1ba185b 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -99,6 +99,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&); void initialize(); + void skip(sys::Mutex::ScopedLock&); void checkState(State expect, const std::string& msg); void end(sys::Mutex::ScopedLock&); void txPrepareOkEvent(const std::string& data); @@ -120,7 +121,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, types::Uuid id; std::string exchangeName; QueuePtr txQueue; - QueueIdsMap enqueues; + QueueIdsMap enqueues, dequeues; UuidSet backups; // All backups of transaction. UuidSet incomplete; // Incomplete backups (not yet responded to prepare) bool empty; // True if the transaction is empty - no enqueues/dequeues. diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index a0cfa393aa..908458fad3 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -161,7 +161,7 @@ void ReplicatingSubscription::initialize() { { sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued() dequeues += initDequeues; // Messages on backup that are not on primary. - skip = backupIds - initDequeues; // Messages already on the backup. + skipEnqueue = backupIds - initDequeues; // Messages already on the backup. // Queue front is moving but we know this subscriptions will start at a // position >= front so if front is safe then position must be. 
position = front; @@ -169,7 +169,7 @@ void ReplicatingSubscription::initialize() { QPID_LOG(debug, logPrefix << "Subscribed: front " << front << ", back " << back << ", guarded " << guard->getFirst() - << ", on backup " << skip); + << ", on backup " << skipEnqueue); checkReady(l); } @@ -215,9 +215,9 @@ bool ReplicatingSubscription::deliver( position = m.getSequence(); try { bool result = false; - if (skip.contains(id)) { + if (skipEnqueue.contains(id)) { QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m)); - skip -= id; + skipEnqueue -= id; guard->complete(id); // This will never be acknowledged. notify(); result = true; @@ -281,6 +281,9 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l) { + ReplicationIdSet oldDequeues = dequeues; + dequeues -= skipDequeue; // Don't send skipped dequeues + skipDequeue -= oldDequeues; // Forget dequeues that would have been sent. 
if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); sendEvent(DequeueEvent(dequeues), l); @@ -332,9 +335,14 @@ bool ReplicatingSubscription::doDispatch() } } -void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) { +void ReplicatingSubscription::skipEnqueues(const ReplicationIdSet& ids) { Mutex::ScopedLock l(lock); - skip += ids; + skipEnqueue += ids; +} + +void ReplicatingSubscription::skipDequeues(const ReplicationIdSet& ids) { + Mutex::ScopedLock l(lock); + skipDequeue += ids; } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 868442da7e..0e3f544d44 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -137,8 +137,8 @@ class ReplicatingSubscription : BrokerInfo getBrokerInfo() const { return info; } - /** Skip replicating enqueue of of ids. */ - void addSkip(const ReplicationIdSet& ids); +void skipEnqueues(const ReplicationIdSet& ids); +void skipDequeues(const ReplicationIdSet& ids); protected: bool doDispatch(); @@ -147,7 +147,8 @@ class ReplicatingSubscription : std::string logPrefix; QueuePosition position; ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. - ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. + ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues. + ReplicationIdSet skipDequeue; // Dequeues to skip: tx dequeues. ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. bool wasStopped; bool ready; |