summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-10-24 14:33:25 +0000
committerAlan Conway <aconway@apache.org>2012-10-24 14:33:25 +0000
commit37de730a2ee12b6147f36569fc8d806fb1bbf472 (patch)
tree0a33c3699458936da78a33ca40682f9d12cf6766
parent939a9cda52af80ef2f3ec91412e4443e13cb613c (diff)
downloadqpid-python-37de730a2ee12b6147f36569fc8d806fb1bbf472.tar.gz
Bug:868364 - QPID-4391: HA ignore stale responses.
Related issue discovered while fixing this bug: the BrokerReplicator pulls management events and query responses from different queues, and there is no coordination between them. If a response is processed late, after create and delete events, it will incorrectly re-create the deleted queue. This patch ignores responses if we have already seen an event for the queue or exchange. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1401711 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp169
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h36
-rwxr-xr-xcpp/src/tests/ha_tests.py33
3 files changed, 135 insertions, 103 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index f451f7c420..74beb83969 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -214,17 +214,75 @@ class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver
BrokerReplicator& brokerReplicator;
};
-template <class E> pair<string,string> eventKey() {
+/** Keep track of queues or exchanges during the update process to solve 2
+ * problems.
+ *
+ * 1. Once all responses are processed, remove any queues/exchanges
+ * that were not mentioned, as they no longer exist on the primary.
+ * The removal is done by the destructor, which calls the CleanFn.
+ * 2. During the update, if we see an event for an object we should
+ * ignore any subsequent responses for that object as they are out
+ * of date.
+ */
+class BrokerReplicator::UpdateTracker {
+ public:
+ typedef std::set<std::string> Names;
+ typedef boost::function<void (const std::string&)> CleanFn;
+ /** @param f called by the destructor for each leftover name. @param rt selects the objects to track. */
+ UpdateTracker(CleanFn f, const ReplicationTest& rt) : cleanFn(f), repTest(rt) {}
+
+ /** Destructor calls cleanFn for every name (queue or exchange) still in the initial set. */
+ ~UpdateTracker() {
+ // Don't throw in a destructor. NOTE(review): only std::exception is caught here.
+ try { for_each(initial.begin(), initial.end(), cleanFn); }
+ catch (const std::exception& e) {
+ QPID_LOG(error, "Error in cleanup of lost objects: " << e.what());
+ }
+ }
+
+ /** Add an exchange's name to the initial set if repTest reports it as replicated. */
+ void addExchange(Exchange::shared_ptr ex) {
+ if (repTest.isReplicated(CONFIGURATION, *ex)) initial.insert(ex->getName());
+ }
+
+ /** Add a queue's name to the initial set if repTest reports it as replicated. */
+ void addQueue(Queue::shared_ptr q) {
+ if (repTest.isReplicated(CONFIGURATION, *q)) initial.insert(q->getName());
+ }
+
+ /** Received an event for name: later responses for name will be reported as out of date. */
+ void event(const std::string& name) {
+ initial.erase(name); // no longer a candidate for deleting
+ events.insert(name); // we have seen an event for this name
+ }
+
+ /** Received a response for name.
+ * @return true if this response should be processed, false if we have
+ * already seen an event for this object.
+ */
+ bool response(const std::string& name) {
+ initial.erase(name); // no longer a candidate for deleting
+ return events.find(name) == events.end(); // true if no event seen yet.
+ }
+
+ private:
+ Names initial, events; // initial: candidates for cleanup; events: names seen in an event.
+ CleanFn cleanFn; // Invoked by the destructor on each name left in initial.
+ ReplicationTest repTest; // Decides whether a queue/exchange is added to initial.
+};
+
+template <class E> BrokerReplicator::EventKey eventKey() {
return make_pair(E::PACKAGE_NAME, E::EVENT_NAME);
}
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
- haBroker(hb), broker(hb.getBroker()), link(l),
+ haBroker(hb), broker(hb.getBroker()),
+ exchanges(broker.getExchanges()), queues(broker.getQueues()),
+ link(l),
initialized(false),
alternates(hb.getBroker().getExchanges()),
- cleaner(*this),
connection(0)
{
broker.getConnectionObservers().add(
@@ -298,9 +356,16 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
<< " status:" << printable(haBroker.getStatus()));
initialized = true;
- // Scan for existing replicated queues and exchanges. Any that have not been seen by
- // the time all reponses are received will be cleaned up.
- cleaner.start();
+ exchangeTracker.reset(
+ new UpdateTracker(boost::bind(&BrokerReplicator::deleteExchange, this, _1),
+ replicationTest));
+ exchanges.eachExchange(
+ boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
+
+ queueTracker.reset(
+ new UpdateTracker(boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
+ replicationTest));
+ queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -369,12 +434,12 @@ void BrokerReplicator::route(Deliverable& msg) {
}
if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
QPID_LOG(debug, logPrefix << "All exchange responses received.")
- cleaner.cleanExchanges(); // Clean up exchanges that no longer exist in the primary
+ exchangeTracker.reset(); // Clean up exchanges that no longer exist in the primary
alternates.clear();
}
if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
- QPID_LOG(debug, logPrefix << "All queue responses received.")
- cleaner.cleanQueues(); // Clean up queues that no longer exist in the primary
+ QPID_LOG(debug, logPrefix << "All queue responses received.");
+ queueTracker.reset(); // Clean up queues that no longer exist in the primary
}
}
} catch (const std::exception& e) {
@@ -394,12 +459,12 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
string name = values[QNAME].asString();
QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
- cleaner.forgetQueue(name);
+ if (queueTracker.get()) queueTracker->event(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
- if (broker.getQueues().find(name)) {
+ if (queues.find(name)) {
QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
deleteQueue(name);
}
@@ -412,7 +477,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::findQueueReplicator(
const std::string& qname)
{
string rname = QueueReplicator::replicatorName(qname);
- boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+ boost::shared_ptr<broker::Exchange> ex = exchanges.find(rname);
return boost::dynamic_pointer_cast<QueueReplicator>(ex);
}
@@ -420,10 +485,10 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// The remote queue has already been deleted so replicator
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
- boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+ boost::shared_ptr<Queue> queue = queues.find(name);
if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
- cleaner.forgetQueue(name);
+ if (queueTracker.get()) queueTracker->event(name);
deleteQueue(name);
}
}
@@ -434,12 +499,12 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
- cleaner.forgetExchange(name);
+ if (exchangeTracker.get()) exchangeTracker->event(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a exchange with this name, replace it.
// The exchange was definitely created on the primary.
- if (broker.getExchanges().find(name)) {
+ if (exchanges.find(name)) {
deleteExchange(name);
QPID_LOG(warning, logPrefix << "Replaced existing exchange: " << name);
}
@@ -453,14 +518,14 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+ boost::shared_ptr<Exchange> exchange = exchanges.find(name);
if (!exchange) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
} else if (!replicationTest.replicateLevel(exchange->getArgs())) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
- cleaner.forgetExchange(name);
+ if (exchangeTracker.get()) exchangeTracker->event(name);
deleteExchange(name);
replicatedExchanges.erase(name);
}
@@ -468,9 +533,9 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
void BrokerReplicator::doEventBind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange =
- broker.getExchanges().find(values[EXNAME].asString());
+ exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
- broker.getQueues().find(values[QNAME].asString());
+ queues.find(values[QNAME].asString());
// We only replicate binds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
@@ -488,9 +553,9 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
void BrokerReplicator::doEventUnbind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange =
- broker.getExchanges().find(values[EXNAME].asString());
+ exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
- broker.getQueues().find(values[QNAME].asString());
+ queues.find(values[QNAME].asString());
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
@@ -544,7 +609,9 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
values[EXCLUSIVE].asBool()))
return;
string name(values[NAME].asString());
- cleaner.forgetQueue(name);
+ if (!queueTracker.get())
+ throw Exception(QPID_MSG("Unexpected queue response: " << values));
+ if (!queueTracker->response(name)) return; // Response is out-of-date
QPID_LOG(debug, logPrefix << "Queue response: " << name);
if (broker.getQueues().find(name)) { // Already exists
if (findQueueReplicator(name))
@@ -571,7 +638,9 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.replicateLevel(argsMap)) return;
string name = values[NAME].asString();
- cleaner.forgetExchange(name);
+ if (!exchangeTracker.get())
+ throw Exception(QPID_MSG("Unexpected exchange response: " << values));
+ if (!exchangeTracker->response(name)) return; // Response is out of date.
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
if (broker.getExchanges().find(name)) {
if (replicatedExchanges.find(name) != replicatedExchanges.end())
@@ -615,8 +684,8 @@ const std::string QUEUE_REF("queueRef");
void BrokerReplicator::doResponseBind(Variant::Map& values) {
std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
- boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+ boost::shared_ptr<Exchange> exchange = exchanges.find(exName);
+ boost::shared_ptr<Queue> queue = queues.find(qName);
// Automatically replicate binding if queue and exchange exist and are replicated
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
@@ -661,7 +730,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
new QueueReplicator(haBroker, queue, link));
- if (!broker.getExchanges().registerExchange(qr))
+ if (!exchanges.registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
return qr;
@@ -670,7 +739,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
}
void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
- Queue::shared_ptr queue = broker.getQueues().find(name);
+ Queue::shared_ptr queue = queues.find(name);
if (queue) {
// Purge before deleting to ensure that we don't reroute any
// messages. Any reroutes will be done at the primary and
@@ -749,48 +818,6 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
-
-// BrokerReplicator::Cleaner
-
-BrokerReplicator::Cleaner::Cleaner(BrokerReplicator& br) : brokerReplicator(br) {}
-
-void BrokerReplicator::Cleaner::start() {
- queues.clear();
- exchanges.clear();
- brokerReplicator.broker.getExchanges().eachExchange(boost::bind(&Cleaner::addExchange, this, _1));
- brokerReplicator.broker.getQueues().eachQueue(boost::bind(&Cleaner::addQueue, this, _1));
-}
-
-void BrokerReplicator::Cleaner::addExchange(boost::shared_ptr<broker::Exchange> ex) {
- if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *ex))
- exchanges.insert(ex->getName());
-}
-
-void BrokerReplicator::Cleaner::addQueue(boost::shared_ptr<broker::Queue> q) {
- if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *q))
- queues.insert(q->getName());
-}
-
-void BrokerReplicator::Cleaner::forgetExchange(const std::string& name) {
- exchanges.erase(name);
-}
-
-void BrokerReplicator::Cleaner::forgetQueue(const std::string& name) {
- queues.erase(name);
-}
-
-void BrokerReplicator::Cleaner::cleanExchanges() {
- for_each(exchanges.begin(), exchanges.end(),
- boost::bind(&BrokerReplicator::deleteExchange, &brokerReplicator, _1));
- exchanges.clear();
-}
-
-void BrokerReplicator::Cleaner::cleanQueues() {
- for_each(queues.begin(), queues.end(),
- boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1, true));
- queues.clear();
-}
-
void BrokerReplicator::autoDeleteCheck(
boost::shared_ptr<Exchange> ex, set<string>& result)
{
@@ -814,7 +841,7 @@ void BrokerReplicator::disconnected() {
connection = 0;
// Clean up auto-delete queues
set<string> deleteQueues;
- broker.getExchanges().eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck,
+ exchanges.eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck,
this, _1, boost::ref(deleteQueues)));
// Don't purge before deleting, the primary is gone so we need to
// reroute the deleted messages.
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index 4845360631..1a86be63c3 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -41,6 +41,8 @@ class Link;
class Bridge;
class SessionHandler;
class Connection;
+class QueueRegistry;
+class ExchangeRegistry;
}
namespace framing {
@@ -90,34 +92,7 @@ class BrokerReplicator : public broker::Exchange,
typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
-
- /** Keep track of queues and exchanges that need to be cleaned up. */
- class Cleaner {
- public:
- Cleaner(BrokerReplicator&);
-
- /** Scan for existing queues and exchanges. */
- void start();
-
- // Forget a queue/exchange that does not need cleaning
- void forgetExchange(const std::string& name);
- void forgetQueue(const std::string& name);
- // Clean up queues/exchange that are no longer on primary
- void cleanExchanges();
- void cleanQueues();
-
- private:
- typedef std::set<std::string> Names;
-
- // add a queue/exchange that may need cleaning.
- void addExchange(boost::shared_ptr<broker::Exchange>);
- void addQueue(boost::shared_ptr<broker::Queue>);
-
- BrokerReplicator& brokerReplicator;
- Names queues, exchanges;
- };
- friend class Cleaner;
-
+ class UpdateTracker;
class ErrorListener;
class ConnectionObserver;
@@ -166,15 +141,18 @@ class BrokerReplicator : public broker::Exchange,
ReplicationTest replicationTest;
HaBroker& haBroker;
broker::Broker& broker;
+ broker::ExchangeRegistry& exchanges;
+ broker::QueueRegistry& queues;
boost::shared_ptr<broker::Link> link;
bool initialized;
AlternateExchangeSetter alternates;
qpid::Address primary;
typedef std::set<std::string> StringSet;
StringSet replicatedExchanges; // exchanges that have been replicated.
- Cleaner cleaner;
broker::Connection* connection;
EventDispatchMap dispatch;
+ std::auto_ptr<UpdateTracker> queueTracker;
+ std::auto_ptr<UpdateTracker> exchangeTracker;
};
}} // namespace qpid::broker
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index edaced25c1..ba7b4cb638 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -29,7 +29,21 @@ from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent
from uuid import UUID
-class ReplicationTests(BrokerTest):
+log = getLogger(__name__)
+
+def grep(filename, regexp):
+ for line in open(filename).readlines():
+ if (regexp.search(line)): return True
+ return False
+
+class HaBrokerTest(BrokerTest):
+ """Base class for HA broker tests"""
+ def assert_log_no_errors(self, broker):
+ log = broker.get_log()
+ if grep(log, re.compile("] error|] critical")):
+ self.fail("Errors in log file %s"%(log))
+
+class ReplicationTests(HaBrokerTest):
"""Correctness tests for HA replication."""
def test_replication(self):
@@ -774,6 +788,19 @@ acl deny all all
cluster.start()
send_ttl_messages()
+ def test_stale_response(self):
+ """Check for race condition where a stale response is processed after an
+ event for the same queue/exchange """
+ cluster = HaCluster(self, 2)
+ s = cluster[0].connect().session()
+ s.sender("keep;{create:always}") # Leave this queue in place.
+ for i in xrange(100): # FIXME aconway 2012-10-23: ??? IS this an issue?
+ s.sender("deleteme%s;{create:always,delete:always}"%(i)).close()
+ # It is possible for the backup to attempt to subscribe after the queue
+ # is deleted. This is not an error, but is logged as an error on the primary.
+ # The backup does not log this as an error so we only check the backup log for errors.
+ self.assert_log_no_errors(cluster[1])
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -808,7 +835,7 @@ def priority_level(value, levels):
offset = 5-math.ceil(levels/2.0)
return min(max(value - offset, 0), levels-1)
-class LongTests(BrokerTest):
+class LongTests(HaBrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -891,7 +918,7 @@ class LongTests(BrokerTest):
if unexpected_dead:
raise Exception("Brokers not running: %s"%unexpected_dead)
-class RecoveryTests(BrokerTest):
+class RecoveryTests(HaBrokerTest):
"""Tests for recovery after a failure."""
def test_queue_hold(self):