diff options
author | Alan Conway <aconway@apache.org> | 2012-10-24 14:33:25 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-10-24 14:33:25 +0000 |
commit | 37de730a2ee12b6147f36569fc8d806fb1bbf472 (patch) | |
tree | 0a33c3699458936da78a33ca40682f9d12cf6766 | |
parent | 939a9cda52af80ef2f3ec91412e4443e13cb613c (diff) | |
download | qpid-python-37de730a2ee12b6147f36569fc8d806fb1bbf472.tar.gz |
Bug:868364 - QPID-4391: HA ignore stale responses.
Related issue discovered while fixing this bug:
The BrokerReplicator pulls management events and query responses from different
queues, and there is no co-ordination between them. If a response is processed late,
after the create and delete events for a queue, it will incorrectly re-create the
deleted queue. This patch ignores a response if we have already seen an event for
the same queue or exchange.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1401711 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 169 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.h | 36 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 33 |
3 files changed, 135 insertions, 103 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index f451f7c420..74beb83969 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -214,17 +214,75 @@ class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver BrokerReplicator& brokerReplicator; }; -template <class E> pair<string,string> eventKey() { +/** Keep track of queues or exchanges during the update process to solve 2 + * problems. + * + * 1. Once all responses are processed, remove any queues/exchanges + * that were not mentioned as they no longer exist on the primary. + * + * 2. During the update if we see an event for an object we should + * ignore any subsequent responses for that object as they are out + * of date. + */ +class BrokerReplicator::UpdateTracker { + public: + typedef std::set<std::string> Names; + typedef boost::function<void (const std::string&)> CleanFn; + + UpdateTracker(CleanFn f, const ReplicationTest& rt) : cleanFn(f), repTest(rt) {} + + /** Destructor cleans up remaining initial queues. */ + ~UpdateTracker() { + // Don't throw in a destructor. + try { for_each(initial.begin(), initial.end(), cleanFn); } + catch (const std::exception& e) { + QPID_LOG(error, "Error in cleanup of lost objects: " << e.what()); + } + } + + /** Add an exchange name */ + void addExchange(Exchange::shared_ptr ex) { + if (repTest.isReplicated(CONFIGURATION, *ex)) initial.insert(ex->getName()); + } + + /** Add a queue name. */ + void addQueue(Queue::shared_ptr q) { + if (repTest.isReplicated(CONFIGURATION, *q)) initial.insert(q->getName()); + } + + /** Received an event for name */ + void event(const std::string& name) { + initial.erase(name); // no longer a candidate for deleting + events.insert(name); // we have seen an event for this name + } + + /** Received a response for name. + *@return true if this response should be processed, false if we have + *already seen an event for this object. 
+ */ + bool response(const std::string& name) { + initial.erase(name); // no longer a candidate for deleting + return events.find(name) == events.end(); // true if no event seen yet. + } + + private: + Names initial, events; + CleanFn cleanFn; + ReplicationTest repTest; +}; + +template <class E> BrokerReplicator::EventKey eventKey() { return make_pair(E::PACKAGE_NAME, E::EVENT_NAME); } BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), - haBroker(hb), broker(hb.getBroker()), link(l), + haBroker(hb), broker(hb.getBroker()), + exchanges(broker.getExchanges()), queues(broker.getQueues()), + link(l), initialized(false), alternates(hb.getBroker().getExchanges()), - cleaner(*this), connection(0) { broker.getConnectionObservers().add( @@ -298,9 +356,16 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH << " status:" << printable(haBroker.getStatus())); initialized = true; - // Scan for existing replicated queues and exchanges. Any that have not been seen by - // the time all reponses are received will be cleaned up. 
- cleaner.start(); + exchangeTracker.reset( + new UpdateTracker(boost::bind(&BrokerReplicator::deleteExchange, this, _1), + replicationTest)); + exchanges.eachExchange( + boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1)); + + queueTracker.reset( + new UpdateTracker(boost::bind(&BrokerReplicator::deleteQueue, this, _1, true), + replicationTest)); + queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1)); framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); @@ -369,12 +434,12 @@ void BrokerReplicator::route(Deliverable& msg) { } if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { QPID_LOG(debug, logPrefix << "All exchange responses received.") - cleaner.cleanExchanges(); // Clean up exchanges that no longer exist in the primary + exchangeTracker.reset(); // Clean up exchanges that no longer exist in the primary alternates.clear(); } if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) { - QPID_LOG(debug, logPrefix << "All queue responses received.") - cleaner.cleanQueues(); // Clean up queues that no longer exist in the primary + QPID_LOG(debug, logPrefix << "All queue responses received."); + queueTracker.reset(); // Clean up queues that no longer exist in the primary } } } catch (const std::exception& e) { @@ -394,12 +459,12 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { string name = values[QNAME].asString(); QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool()); QPID_LOG(debug, logPrefix << "Queue declare event: " << name); - cleaner.forgetQueue(name); + if (queueTracker.get()) queueTracker->event(name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); // If we already have a queue with this name, replace it. // The queue was definitely created on the primary. 
- if (broker.getQueues().find(name)) { + if (queues.find(name)) { QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); deleteQueue(name); } @@ -412,7 +477,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::findQueueReplicator( const std::string& qname) { string rname = QueueReplicator::replicatorName(qname); - boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); + boost::shared_ptr<broker::Exchange> ex = exchanges.find(rname); return boost::dynamic_pointer_cast<QueueReplicator>(ex); } @@ -420,10 +485,10 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // The remote queue has already been deleted so replicator // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); - boost::shared_ptr<Queue> queue = broker.getQueues().find(name); + boost::shared_ptr<Queue> queue = queues.find(name); if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) { QPID_LOG(debug, logPrefix << "Queue delete event: " << name); - cleaner.forgetQueue(name); + if (queueTracker.get()) queueTracker->event(name); deleteQueue(name); } } @@ -434,12 +499,12 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { string name = values[EXNAME].asString(); QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); - cleaner.forgetExchange(name); + if (exchangeTracker.get()) exchangeTracker->event(name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); // If we already have a exchange with this name, replace it. // The exchange was definitely created on the primary. 
- if (broker.getExchanges().find(name)) { + if (exchanges.find(name)) { deleteExchange(name); QPID_LOG(warning, logPrefix << "Replaced existing exchange: " << name); } @@ -453,14 +518,14 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); - boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); + boost::shared_ptr<Exchange> exchange = exchanges.find(name); if (!exchange) { QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name); } else if (!replicationTest.replicateLevel(exchange->getArgs())) { QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); - cleaner.forgetExchange(name); + if (exchangeTracker.get()) exchangeTracker->event(name); deleteExchange(name); replicatedExchanges.erase(name); } @@ -468,9 +533,9 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { void BrokerReplicator::doEventBind(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = - broker.getExchanges().find(values[EXNAME].asString()); + exchanges.find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = - broker.getQueues().find(values[QNAME].asString()); + queues.find(values[QNAME].asString()); // We only replicate binds for a replicated queue to replicated // exchange that both exist locally. 
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && @@ -488,9 +553,9 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { void BrokerReplicator::doEventUnbind(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = - broker.getExchanges().find(values[EXNAME].asString()); + exchanges.find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = - broker.getQueues().find(values[QNAME].asString()); + queues.find(values[QNAME].asString()); // We only replicate unbinds for a replicated queue to replicated // exchange that both exist locally. if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && @@ -544,7 +609,9 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { values[EXCLUSIVE].asBool())) return; string name(values[NAME].asString()); - cleaner.forgetQueue(name); + if (!queueTracker.get()) + throw Exception(QPID_MSG("Unexpected queue response: " << values)); + if (!queueTracker->response(name)) return; // Response is out-of-date QPID_LOG(debug, logPrefix << "Queue response: " << name); if (broker.getQueues().find(name)) { // Already exists if (findQueueReplicator(name)) @@ -571,7 +638,9 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicationTest.replicateLevel(argsMap)) return; string name = values[NAME].asString(); - cleaner.forgetExchange(name); + if (!exchangeTracker.get()) + throw Exception(QPID_MSG("Unexpected exchange response: " << values)); + if (!exchangeTracker->response(name)) return; // Response is out of date. 
QPID_LOG(debug, logPrefix << "Exchange response: " << name); if (broker.getExchanges().find(name)) { if (replicatedExchanges.find(name) != replicatedExchanges.end()) @@ -615,8 +684,8 @@ const std::string QUEUE_REF("queueRef"); void BrokerReplicator::doResponseBind(Variant::Map& values) { std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); - boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); - boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); + boost::shared_ptr<Exchange> exchange = exchanges.find(exName); + boost::shared_ptr<Queue> queue = queues.find(qName); // Automatically replicate binding if queue and exchange exist and are replicated if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && @@ -661,7 +730,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator( if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) { boost::shared_ptr<QueueReplicator> qr( new QueueReplicator(haBroker, queue, link)); - if (!broker.getExchanges().registerExchange(qr)) + if (!exchanges.registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); return qr; @@ -670,7 +739,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator( } void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { - Queue::shared_ptr queue = broker.getQueues().find(name); + Queue::shared_ptr queue = queues.find(name); if (queue) { // Purge before deleting to ensure that we don't reroute any // messages. 
Any reroutes will be done at the primary and @@ -749,48 +818,6 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } - -// BrokerReplicator::Cleaner - -BrokerReplicator::Cleaner::Cleaner(BrokerReplicator& br) : brokerReplicator(br) {} - -void BrokerReplicator::Cleaner::start() { - queues.clear(); - exchanges.clear(); - brokerReplicator.broker.getExchanges().eachExchange(boost::bind(&Cleaner::addExchange, this, _1)); - brokerReplicator.broker.getQueues().eachQueue(boost::bind(&Cleaner::addQueue, this, _1)); -} - -void BrokerReplicator::Cleaner::addExchange(boost::shared_ptr<broker::Exchange> ex) { - if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *ex)) - exchanges.insert(ex->getName()); -} - -void BrokerReplicator::Cleaner::addQueue(boost::shared_ptr<broker::Queue> q) { - if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *q)) - queues.insert(q->getName()); -} - -void BrokerReplicator::Cleaner::forgetExchange(const std::string& name) { - exchanges.erase(name); -} - -void BrokerReplicator::Cleaner::forgetQueue(const std::string& name) { - queues.erase(name); -} - -void BrokerReplicator::Cleaner::cleanExchanges() { - for_each(exchanges.begin(), exchanges.end(), - boost::bind(&BrokerReplicator::deleteExchange, &brokerReplicator, _1)); - exchanges.clear(); -} - -void BrokerReplicator::Cleaner::cleanQueues() { - for_each(queues.begin(), queues.end(), - boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1, true)); - queues.clear(); -} - void BrokerReplicator::autoDeleteCheck( boost::shared_ptr<Exchange> ex, set<string>& result) { @@ -814,7 +841,7 @@ void BrokerReplicator::disconnected() { connection = 0; // Clean up auto-delete queues set<string> deleteQueues; - broker.getExchanges().eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck, + exchanges.eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck, 
this, _1, boost::ref(deleteQueues))); // Don't purge before deleting, the primary is gone so we need to // reroute the deleted messages. diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index 4845360631..1a86be63c3 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -41,6 +41,8 @@ class Link; class Bridge; class SessionHandler; class Connection; +class QueueRegistry; +class ExchangeRegistry; } namespace framing { @@ -90,34 +92,7 @@ class BrokerReplicator : public broker::Exchange, typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap; - - /** Keep track of queues and exchanges that need to be cleaned up. */ - class Cleaner { - public: - Cleaner(BrokerReplicator&); - - /** Scan for existing queues and exchanges. */ - void start(); - - // Forget a queue/exchange that does not need cleaning - void forgetExchange(const std::string& name); - void forgetQueue(const std::string& name); - // Clean up queues/exchange that are no longer on primary - void cleanExchanges(); - void cleanQueues(); - - private: - typedef std::set<std::string> Names; - - // add a queue/exchange that may need cleaning. - void addExchange(boost::shared_ptr<broker::Exchange>); - void addQueue(boost::shared_ptr<broker::Queue>); - - BrokerReplicator& brokerReplicator; - Names queues, exchanges; - }; - friend class Cleaner; - + class UpdateTracker; class ErrorListener; class ConnectionObserver; @@ -166,15 +141,18 @@ class BrokerReplicator : public broker::Exchange, ReplicationTest replicationTest; HaBroker& haBroker; broker::Broker& broker; + broker::ExchangeRegistry& exchanges; + broker::QueueRegistry& queues; boost::shared_ptr<broker::Link> link; bool initialized; AlternateExchangeSetter alternates; qpid::Address primary; typedef std::set<std::string> StringSet; StringSet replicatedExchanges; // exchanges that have been replicated. 
- Cleaner cleaner; broker::Connection* connection; EventDispatchMap dispatch; + std::auto_ptr<UpdateTracker> queueTracker; + std::auto_ptr<UpdateTracker> exchangeTracker; }; }} // namespace qpid::broker diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index edaced25c1..ba7b4cb638 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -29,7 +29,21 @@ from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent from uuid import UUID -class ReplicationTests(BrokerTest): +log = getLogger(__name__) + +def grep(filename, regexp): + for line in open(filename).readlines(): + if (regexp.search(line)): return True + return False + +class HaBrokerTest(BrokerTest): + """Base class for HA broker tests""" + def assert_log_no_errors(self, broker): + log = broker.get_log() + if grep(log, re.compile("] error|] critical")): + self.fail("Errors in log file %s"%(log)) + +class ReplicationTests(HaBrokerTest): """Correctness tests for HA replication.""" def test_replication(self): @@ -774,6 +788,19 @@ acl deny all all cluster.start() send_ttl_messages() + def test_stale_response(self): + """Check for race condition where a stale response is processed after an + event for the same queue/exchange """ + cluster = HaCluster(self, 2) + s = cluster[0].connect().session() + s.sender("keep;{create:always}") # Leave this queue in place. + for i in xrange(100): # FIXME aconway 2012-10-23: ??? IS this an issue? + s.sender("deleteme%s;{create:always,delete:always}"%(i)).close() + # It is possible for the backup to attempt to subscribe after the queue + # is deleted. This is not an error, but is logged as an error on the primary. + # The backup does not log this as an error so we only check the backup log for errors. 
+ self.assert_log_no_errors(cluster[1]) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -808,7 +835,7 @@ def priority_level(value, levels): offset = 5-math.ceil(levels/2.0) return min(max(value - offset, 0), levels-1) -class LongTests(BrokerTest): +class LongTests(HaBrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -891,7 +918,7 @@ class LongTests(BrokerTest): if unexpected_dead: raise Exception("Brokers not running: %s"%unexpected_dead) -class RecoveryTests(BrokerTest): +class RecoveryTests(HaBrokerTest): """Tests for recovery after a failure.""" def test_queue_hold(self): |