diff options
| author | Alan Conway <aconway@apache.org> | 2013-12-10 14:11:36 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-12-10 14:11:36 +0000 |
| commit | 38000643066ef75bf0ab10e42989a25fa2ccbccd (patch) | |
| tree | e0517a1a2ca6599e25a214b8a68ed68ece31caae /cpp/src/qpid/ha/QueueReplicator.cpp | |
| parent | cae29125a3eafa5c3caf809dd58339a4145a57dc (diff) | |
| download | qpid-python-38000643066ef75bf0ab10e42989a25fa2ccbccd.tar.gz | |
QPID-5404: HA broker message duplication when deleting a queue with an alt-exchange
The old code ran auto-delete on the backup on disconnect. This reroutes
messages onto the alt queue with incorrect replication IDs from the original
queue, and then replicates duplicate rerouted messages from the primary. The
solution is to process auto deletes on the new primary and let them replicate to
the backups.
- Move all auto-delete logic into QueueReplicator
- Primary process auto-delete on QueueReplicator as part of promotion.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1549844 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index cc6c8a3f30..50f2ececdb 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -50,6 +50,7 @@ using namespace framing::execution; using namespace std; using std::exception; using sys::Mutex; +using boost::shared_ptr; const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); @@ -61,6 +62,17 @@ bool QueueReplicator::isReplicatorName(const std::string& name) { return startsWith(name, QUEUE_REPLICATOR_PREFIX); } +namespace { +void pushIfQr(QueueReplicator::Vector& v, const shared_ptr<Exchange>& ex) { + shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); + if (qr) v.push_back(qr); +} +} + +void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) { + registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1)); +} + class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { public: ErrorListener(const boost::shared_ptr<QueueReplicator>& qr) @@ -116,6 +128,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, framing::FieldTable args = getArgs(); args.setString(QPID_REPLICATE, printable(NONE).str()); setArgs(args); + // Don't allow backup queues to auto-delete, primary decides when to delete. if (q->isAutoDelete()) q->markInUse(); dispatch[DequeueEvent::KEY] = @@ -306,5 +319,16 @@ bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const bool QueueReplicator::hasBindings() { return false; } std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; } +void QueueReplicator::promoted() { + // Promoted to primary, deal with auto-delete now. + if (queue && queue->isAutoDelete() && subscribed) { + // Make a temporary shared_ptr to prevent premature deletion of queue. + // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue + // which could delete the queue while it's still running it's destroyed logic. + boost::shared_ptr<Queue> q(queue); + q->releaseFromUse(); + q->scheduleAutoDelete(); + } +} }} // namespace qpid::broker |
