summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-08-08 09:24:03 +0000
committerAlan Conway <aconway@apache.org>2014-08-08 09:24:03 +0000
commita833f714a4de983bce8fb1c2f6b87070bd3b4309 (patch)
treeb480782830390484038c9b62e45c71958f58571c
parentd06ee666b50104bcd7cc42f656a68cce8636f79c (diff)
downloadqpid-python-a833f714a4de983bce8fb1c2f6b87070bd3b4309.tar.gz
QPID-5974: HA qpid-txtest2 can bring down a cluster (JERR_MAP_LOCKED))
Problem: transactional dequeues can be sent via two paths as part of the transaction and via the normal queue replication. If journal is involved this can result result in store errors if the normal replication path attempts to dequeue before the transaction. Solution: this is also the case for enqueues, and we already have code in place to skip replication of tx enqueues via the normal route. Copied the same logic for dequeues. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616703 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h11
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp35
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h3
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp20
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h7
6 files changed, 61 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index af4ae12177..dd41f74790 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -265,14 +265,24 @@ void Primary::addReplica(ReplicatingSubscription& rs) {
replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
}
-void Primary::skip(
+void Primary::skipEnqueues(
const types::Uuid& backup,
const boost::shared_ptr<broker::Queue>& queue,
const ReplicationIdSet& ids)
{
sys::Mutex::ScopedLock l(lock);
ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
- if (i != replicas.end()) i->second->addSkip(ids);
+ if (i != replicas.end()) i->second->skipEnqueues(ids);
+}
+
+void Primary::skipDequeues(
+ const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids)
+{
+ sys::Mutex::ScopedLock l(lock);
+ ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
+ if (i != replicas.end()) i->second->skipDequeues(ids);
}
// Called from ReplicatingSubscription::cancel
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index af368bca0f..46cf990834 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -90,9 +90,14 @@ class Primary : public Role
void removeReplica(const ReplicatingSubscription&);
/** Skip replication of ids to queue on backup. */
- void skip(const types::Uuid& backup,
- const boost::shared_ptr<broker::Queue>& queue,
- const ReplicationIdSet& ids);
+ void skipEnqueues(const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids);
+
+ /** Skip replication of dequeue of ids to queue on backup. */
+ void skipDequeues(const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids);
// Called via BrokerObserver
void queueCreate(const QueuePtr&);
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index c94ced7024..be3dc25653 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -165,6 +165,7 @@ void PrimaryTxObserver::dequeue(
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
empty = false;
+ dequeues[q] += id;
txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
}
}
@@ -180,25 +181,30 @@ struct Skip {
const ReplicationIdSet& ids_) :
backup(backup_), queue(queue_), ids(ids_) {}
- void skip(Primary& p) const { p.skip(backup, queue, ids); }
+ void skipEnqueues(Primary& p) const { p.skipEnqueues(backup, queue, ids); }
+ void skipDequeues(Primary& p) const { p.skipDequeues(backup, queue, ids); }
};
} // namespace
+void PrimaryTxObserver::skip(Mutex::ScopedLock&) {
+ // Tell replicating subscriptions to skip IDs in the transaction.
+ vector<Skip> skipEnq, skipDeq;
+ for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) {
+ for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
+ skipEnq.push_back(Skip(*b, q->first, q->second));
+ for (QueueIdsMap::iterator q = dequeues.begin(); q != dequeues.end(); ++q)
+ skipDeq.push_back(Skip(*b, q->first, q->second));
+ }
+ Mutex::ScopedUnlock u(lock); // Outside lock
+ for_each(skipEnq.begin(), skipEnq.end(), boost::bind(&Skip::skipEnqueues, _1, boost::ref(primary)));
+ for_each(skipDeq.begin(), skipDeq.end(), boost::bind(&Skip::skipDequeues, _1, boost::ref(primary)));
+}
+
bool PrimaryTxObserver::prepare() {
QPID_LOG(debug, logPrefix << "Prepare " << backups);
- vector<Skip> skips;
- {
- Mutex::ScopedLock l(lock);
- checkState(SENDING, "Too late for prepare");
- state = PREPARING;
- // Tell replicating subscriptions to skip IDs in the transaction.
- for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b)
- for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
- skips.push_back(Skip(*b, q->first, q->second));
- }
- // Outside lock
- for_each(skips.begin(), skips.end(),
- boost::bind(&Skip::skip, _1, boost::ref(primary)));
+ Mutex::ScopedLock l(lock);
+ checkState(SENDING, "Too late for prepare");
+ state = PREPARING;
txQueue->deliver(TxPrepareEvent().message());
return true;
}
@@ -208,6 +214,7 @@ void PrimaryTxObserver::commit() {
Mutex::ScopedLock l(lock);
checkState(PREPARING, "Cannot commit, not preparing");
if (incomplete.size() == 0) {
+ skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue.
txQueue->deliver(TxCommitEvent().message());
end(l);
} else {
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 5b7c2e3e93..6ea1ba185b 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -99,6 +99,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&);
void initialize();
+ void skip(sys::Mutex::ScopedLock&);
void checkState(State expect, const std::string& msg);
void end(sys::Mutex::ScopedLock&);
void txPrepareOkEvent(const std::string& data);
@@ -120,7 +121,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
types::Uuid id;
std::string exchangeName;
QueuePtr txQueue;
- QueueIdsMap enqueues;
+ QueueIdsMap enqueues, dequeues;
UuidSet backups; // All backups of transaction.
UuidSet incomplete; // Incomplete backups (not yet responded to prepare)
bool empty; // True if the transaction is empty - no enqueues/dequeues.
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index a0cfa393aa..908458fad3 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -161,7 +161,7 @@ void ReplicatingSubscription::initialize() {
{
sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
dequeues += initDequeues; // Messages on backup that are not on primary.
- skip = backupIds - initDequeues; // Messages already on the backup.
+ skipEnqueue = backupIds - initDequeues; // Messages already on the backup.
// Queue front is moving but we know this subscriptions will start at a
// position >= front so if front is safe then position must be.
position = front;
@@ -169,7 +169,7 @@ void ReplicatingSubscription::initialize() {
QPID_LOG(debug, logPrefix << "Subscribed: front " << front
<< ", back " << back
<< ", guarded " << guard->getFirst()
- << ", on backup " << skip);
+ << ", on backup " << skipEnqueue);
checkReady(l);
}
@@ -215,9 +215,9 @@ bool ReplicatingSubscription::deliver(
position = m.getSequence();
try {
bool result = false;
- if (skip.contains(id)) {
+ if (skipEnqueue.contains(id)) {
QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m));
- skip -= id;
+ skipEnqueue -= id;
guard->complete(id); // This will never be acknowledged.
notify();
result = true;
@@ -281,6 +281,9 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
+ ReplicationIdSet oldDequeues = dequeues;
+ dequeues -= skipDequeue; // Don't send skipped dequeues
+ skipDequeue -= oldDequeues; // Forget dequeues that would have been sent.
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
sendEvent(DequeueEvent(dequeues), l);
@@ -332,9 +335,14 @@ bool ReplicatingSubscription::doDispatch()
}
}
-void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) {
+void ReplicatingSubscription::skipEnqueues(const ReplicationIdSet& ids) {
Mutex::ScopedLock l(lock);
- skip += ids;
+ skipEnqueue += ids;
+}
+
+void ReplicatingSubscription::skipDequeues(const ReplicationIdSet& ids) {
+ Mutex::ScopedLock l(lock);
+ skipDequeue += ids;
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 868442da7e..0e3f544d44 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -137,8 +137,8 @@ class ReplicatingSubscription :
BrokerInfo getBrokerInfo() const { return info; }
- /** Skip replicating enqueue of of ids. */
- void addSkip(const ReplicationIdSet& ids);
+void skipEnqueues(const ReplicationIdSet& ids);
+void skipDequeues(const ReplicationIdSet& ids);
protected:
bool doDispatch();
@@ -147,7 +147,8 @@ class ReplicatingSubscription :
std::string logPrefix;
QueuePosition position;
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
- ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
+ ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues.
+ ReplicationIdSet skipDequeue; // Dequeues to skip: tx dequeues.
ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
bool wasStopped;
bool ready;