diff options
| author | Alan Conway <aconway@apache.org> | 2013-12-10 14:11:36 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-12-10 14:11:36 +0000 |
| commit | 38000643066ef75bf0ab10e42989a25fa2ccbccd (patch) | |
| tree | e0517a1a2ca6599e25a214b8a68ed68ece31caae /cpp/src/qpid | |
| parent | cae29125a3eafa5c3caf809dd58339a4145a57dc (diff) | |
| download | qpid-python-38000643066ef75bf0ab10e42989a25fa2ccbccd.tar.gz | |
QPID-5404: HA broker message duplication when deleting a queue with an alt-exchange
The old code ran auto-delete on the backup on disconnect. This reroutes
messages onto the alt queue with incorrect replication IDs from the original
queue, and then replicates duplicate rerouted messages from the primary. The
solution is to process auto deletes on the new primary and let them replicate to
the backups.
- Move all auto-delete logic into QueueReplicator
- Primary process auto-delete on QueueReplicator as part of promotion.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1549844 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/ha/Backup.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Primary.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/PrimaryTxObserver.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 24 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 1 |
8 files changed, 49 insertions, 25 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp index 503de3e351..93ad5ec381 100644 --- a/cpp/src/qpid/ha/Backup.cpp +++ b/cpp/src/qpid/ha/Backup.cpp @@ -100,8 +100,8 @@ Role* Backup::recover(Mutex::ScopedLock&) { // Reset membership before allowing backups to connect. backups = membership.otherBackups(); membership.clear(); - return new Primary(haBroker, backups); } + return new Primary(haBroker, backups); } Role* Backup::promote() { diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 5e8da17a1b..1587b5b33f 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -865,27 +865,14 @@ bool BrokerReplicator::hasBindings() { return false; } string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } -void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) { +void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) { boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); - // FIXME aconway 2013-11-01: move logic with releaseFromUse to QueueReplicator if (qr) { qr->disconnect(); if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { // Transactions are aborted on failover so clean up tx-queues deleteQueue(qr->getQueue()->getName()); } - else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { - if (qr->getQueue()->getSettings().autoDeleteDelay) { - // Start the auto-delete timer - qr->getQueue()->releaseFromUse(); - qr->getQueue()->scheduleAutoDelete(); - } - else { - // Delete immediately. Don't purge, the primary is gone so we need - // to reroute the deleted messages. - deleteQueue(qr->getQueue()->getName(), false); - } - } } } @@ -893,9 +880,9 @@ typedef vector<boost::shared_ptr<Exchange> > ExchangeVector; // Callback function for accumulating exchange candidates namespace { - void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) { - ev.push_back(i); - } +void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) { + ev.push_back(i); +} } // Called by ConnectionObserver::disconnected, disconnected from the network side. @@ -907,7 +894,7 @@ void BrokerReplicator::disconnected() { ExchangeVector exs; exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1)); for_each(exs.begin(), exs.end(), - boost::bind(&BrokerReplicator::disconnectedExchange, this, _1)); + boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1)); } void BrokerReplicator::setMembership(const Variant::List& brokers) { diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index e319ab1219..b3e3fe3223 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -148,7 +148,7 @@ class BrokerReplicator : public broker::Exchange, void deleteQueue(const std::string& name, bool purge=true); void deleteExchange(const std::string& name); - void disconnectedExchange(boost::shared_ptr<broker::Exchange>); + void disconnectedQueueReplicator(boost::shared_ptr<broker::Exchange>); void disconnected(); void setMembership(const types::Variant::List&); // Set membership from list. diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index 0c1858ceb1..0c0fe983bb 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -94,7 +94,16 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : logPrefix("Primary: "), active(false), replicationTest(hb.getSettings().replicateDefault.get()) { + // Note that at this point, we are still rejecting client connections. + // So we are safe from client interference while we set up the primary. + hb.getMembership().setStatus(RECOVERING); + + // Process all QueueReplicators, handles auto-delete queues. + QueueReplicator::Vector qrs; + QueueReplicator::copy(hb.getBroker().getExchanges(), qrs); + std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1)); + broker::QueueRegistry& queues = hb.getBroker().getQueues(); queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1)); if (expect.empty()) { diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp index a32334bcf9..eeb3312aec 100644 --- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -26,7 +26,6 @@ #include "QueueGuard.h" #include "RemoteBackup.h" #include "ReplicatingSubscription.h" -#include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" @@ -121,7 +120,7 @@ void PrimaryTxObserver::initialize() { throw InvalidArgumentException( QPID_MSG(logPrefix << "TX replication queue already exists.")); txQueue = result.first; - txQueue->markInUse(true); // Prevent auto-delete till we are done. + txQueue->markInUse(); // Prevent auto-delete till we are done. txQueue->deliver(TxBackupsEvent(backups).message()); } @@ -228,7 +227,8 @@ void PrimaryTxObserver::end(Mutex::ScopedLock&) { // If there are no outstanding completions, break pointer cycle here. // Otherwise break it in cancel() when the remaining completions are done. if (incomplete.empty()) txBuffer = 0; - txQueue->releaseFromUse(true); // txQueue will auto-delete + txQueue->releaseFromUse(); // txQueue will auto-delete + txQueue->scheduleAutoDelete(); txQueue.reset(); try { broker.getExchanges().destroy(getExchangeName()); diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index cc6c8a3f30..50f2ececdb 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -50,6 +50,7 @@ using namespace framing::execution; using namespace std; using std::exception; using sys::Mutex; +using boost::shared_ptr; const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); @@ -61,6 +62,17 @@ bool QueueReplicator::isReplicatorName(const std::string& name) { return startsWith(name, QUEUE_REPLICATOR_PREFIX); } +namespace { +void pushIfQr(QueueReplicator::Vector& v, const shared_ptr<Exchange>& ex) { + shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); + if (qr) v.push_back(qr); +} +} + +void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) { + registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1)); +} + class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { public: ErrorListener(const boost::shared_ptr<QueueReplicator>& qr) @@ -116,6 +128,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, framing::FieldTable args = getArgs(); args.setString(QPID_REPLICATE, printable(NONE).str()); setArgs(args); + // Don't allow backup queues to auto-delete, primary decides when to delete. if (q->isAutoDelete()) q->markInUse(); dispatch[DequeueEvent::KEY] = @@ -306,5 +319,16 @@ bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const bool QueueReplicator::hasBindings() { return false; } std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; } +void QueueReplicator::promoted() { + // Promoted to primary, deal with auto-delete now. + if (queue && queue->isAutoDelete() && subscribed) { + // Make a temporary shared_ptr to prevent premature deletion of queue. + // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue + // which could delete the queue while it's still running it's destroyed logic. + boost::shared_ptr<Queue> q(queue); + q->releaseFromUse(); + q->scheduleAutoDelete(); + } +} }} // namespace qpid::broker diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h index 6fd140fde3..8938285fe3 100644 --- a/cpp/src/qpid/ha/QueueReplicator.h +++ b/cpp/src/qpid/ha/QueueReplicator.h @@ -38,6 +38,7 @@ class Queue; class QueueRegistry; class SessionHandler; class Deliverable; +class ExchangeRegistry; } namespace ha { @@ -59,9 +60,12 @@ class QueueReplicator : public broker::Exchange, public: static const std::string QPID_SYNC_FREQUENCY; static const std::string REPLICATOR_PREFIX; + typedef std::vector<boost::shared_ptr<QueueReplicator> > Vector; static std::string replicatorName(const std::string& queueName); static bool isReplicatorName(const std::string&); + /*** Copy QueueReplicators from the registry */ + static void copy(broker::ExchangeRegistry&, Vector& result); QueueReplicator(HaBroker&, boost::shared_ptr<broker::Queue> q, @@ -78,7 +82,6 @@ class QueueReplicator : public broker::Exchange, // Set if the queue has ever been subscribed to, used for auto-delete cleanup. void setSubscribed() { subscribed = true; } - bool isSubscribed() { return subscribed; } boost::shared_ptr<broker::Queue> getQueue() const { return queue; } @@ -90,6 +93,8 @@ class QueueReplicator : public broker::Exchange, bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); bool hasBindings(); + void promoted(); + protected: typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn; typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap; diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index d0b93da85f..95215e1e59 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -22,7 +22,6 @@ #include "Event.h" #include "IdSetter.h" #include "QueueGuard.h" -#include "QueueReplicator.h" #include "QueueSnapshots.h" #include "ReplicatingSubscription.h" #include "TxReplicatingSubscription.h" |
