diff options
author | Alan Conway <aconway@apache.org> | 2013-03-07 17:25:47 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-03-07 17:25:47 +0000 |
commit | 6129886499678fbab1111b6e7422b77b6f62e143 (patch) | |
tree | 95cec06ad2139b0beb9dd7146950408a1b824c88 | |
parent | 74f4b51ba0cc95529b054e450cd6327f91520b6b (diff) | |
download | qpid-python-6129886499678fbab1111b6e7422b77b6f62e143.tar.gz |
QPID-4630: HA Fix starting from persistent store.
This was implemented in r1390123 but broken by subsequent changes.
When re-starting a persistent HA cluster, the broker that becomes primary keeps
its recovered queues while backup brokers discard their recovered queues and
catch up from the primary.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1453971 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/include/qpid/amqp_0_10/Codecs.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 1 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_store_tests.py | 24 |
7 files changed, 42 insertions, 32 deletions
diff --git a/qpid/cpp/include/qpid/amqp_0_10/Codecs.h b/qpid/cpp/include/qpid/amqp_0_10/Codecs.h index d632a9f20a..99a06e0a32 100644 --- a/qpid/cpp/include/qpid/amqp_0_10/Codecs.h +++ b/qpid/cpp/include/qpid/amqp_0_10/Codecs.h @@ -24,10 +24,12 @@ #include "qpid/CommonImportExport.h" #include "qpid/types/Variant.h" +#include "boost/shared_ptr.hpp" namespace qpid { namespace framing { class FieldTable; +class FieldValue; } namespace amqp_0_10 { /** @@ -74,6 +76,8 @@ QPID_COMMON_EXTERN void translate(const qpid::types::Variant::Map& from, const s QPID_COMMON_EXTERN void translate(const qpid::framing::FieldTable& from, qpid::types::Variant::Map& to); +QPID_COMMON_EXTERN boost::shared_ptr<framing::FieldValue> fieldValue(const types::Variant&); + }} // namespace qpid::amqp_0_10 #endif /*!QPID_AMQP_0_10_CODECS_H*/ diff --git a/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp b/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp index 1288652ee1..e7637b16ad 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp @@ -527,6 +527,15 @@ void translate(const FieldTable& from, Variant::Map& to) convert(from, to, &toVariantMapEntry); } +boost::shared_ptr<framing::FieldValue> fieldValue(const types::Variant& value) { + // TODO aconway 2013-03-05: Nasty implementation + Variant::Map m; + m[std::string()] = value; + framing::FieldTable f; + translate(m, f); + return f.get(std::string()); +} + const std::string ListCodec::contentType("amqp/list"); const std::string MapCodec::contentType("amqp/map"); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 81e26d0b48..91b497c57b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1587,7 +1587,9 @@ void Queue::UsageBarrier::destroy() } void Queue::addArgument(const string& key, const types::Variant& value) { - settings.original.insert(types::Variant::Map::value_type(key, value)); + settings.original[key] = value; + qpid::amqp_0_10::translate(settings.asMap(), encodableSettings); + settings.storeSettings.set(key, qpid::amqp_0_10::fieldValue(value)); if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap()); } diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 576d0f198b..13f9bbe23c 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -59,6 +59,8 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, QueueMap::iterator i = queues.find(name); if (i == queues.end()) { Queue::shared_ptr queue = create(name, settings); + // Allow ConfigurationObserver to modify settings before storing the message. + if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue); //Move this to factory also? if (alternate) queue->setAlternateExchange(alternate);//need to do this *before* create @@ -67,16 +69,11 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, queue->create(); } queues[name] = queue; - // NOTE: raiseEvent and queueCreate must be called with the lock held in - // order to ensure events are generated in the correct order. - // Call queueCreate before raiseEvents so it can add arguments that - // will be included in the management event. - if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue); result = std::pair<Queue::shared_ptr, bool>(queue, true); } else { result = std::pair<Queue::shared_ptr, bool>(i->second, false); } - if (getBroker() && getBroker()->getManagementAgent()) { + if (getBroker() && getBroker()->getManagementAgent()) { getBroker()->getManagementAgent()->raiseEvent( _qmf::EventQueueDeclare( connectionId, userId, name, diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 7c0fad9b60..42e54a0125 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -234,7 +234,10 @@ class BrokerReplicator::UpdateTracker { /** Destructor cleans up remaining initial queues. */ ~UpdateTracker() { // Don't throw in a destructor. - try { for_each(initial.begin(), initial.end(), cleanFn); } + try { + for_each(initial.begin(), initial.end(), + boost::bind(&UpdateTracker::clean, this, _1)); + } catch (const std::exception& e) { QPID_LOG(error, "Error in cleanup of lost objects: " << e.what()); } @@ -269,7 +272,8 @@ class BrokerReplicator::UpdateTracker { private: void clean(const std::string& name) { - QPID_LOG(info, "Backup updated, deleting " << type << " " << name); + QPID_LOG(info, "Backup: Deleted " << type << " " << name << + ": no longer exists on primary"); cleanFn(name); } @@ -531,7 +535,6 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { CreateExchangeResult result = createExchange( name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString()); - replicatedExchanges.insert(name); assert(result.second); } } @@ -547,7 +550,6 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); if (exchangeTracker.get()) exchangeTracker->event(name); deleteExchange(name); - replicatedExchanges.erase(name); } } @@ -638,12 +640,12 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { throw Exception(QPID_MSG("Unexpected queue response: " << values)); if (!queueTracker->response(name)) return; // Response is out-of-date QPID_LOG(debug, logPrefix << "Queue response: " << name); - // If we see a queue with the same name as one we have, but not the same UUID, - // then replace the one we have. boost::shared_ptr<Queue> queue = queues.find(name); - if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) { - QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: " - << name); + if (queue) { // Already exists + bool uuidOk = (getHaUuid(queue->getSettings().original) == getHaUuid(argsMap)); + if (!uuidOk) QPID_LOG(debug, logPrefix << "UUID mismatch for queue: " << name); + if (uuidOk && findQueueReplicator(name)) return; // already replicated, UUID OK. + QPID_LOG(debug, logPrefix << "Queue response replacing queue: " << name); deleteQueue(name); } framing::FieldTable args; @@ -669,7 +671,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Exchange response: " << name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - // If we see an exchange with the same name as one we have, but not the same UUID, + // If we see an exchange with the same name as one we have, but a different UUID, // then replace the one we have. boost::shared_ptr<Exchange> exchange = exchanges.find(name); if (exchange && @@ -682,7 +684,6 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { CreateExchangeResult result = createExchange( name, values[TYPE].asString(), values[DURABLE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - replicatedExchanges.insert(name); } namespace { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index f64c54abe8..03df5fec6b 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -153,7 +153,6 @@ class BrokerReplicator : public broker::Exchange, AlternateExchangeSetter alternates; qpid::Address primary; typedef std::set<std::string> StringSet; - StringSet replicatedExchanges; // exchanges that have been replicated. broker::Connection* connection; EventDispatchMap dispatch; std::auto_ptr<UpdateTracker> queueTracker; diff --git a/qpid/cpp/src/tests/ha_store_tests.py b/qpid/cpp/src/tests/ha_store_tests.py index 16bf6e2964..21d403a46e 100755 --- a/qpid/cpp/src/tests/ha_store_tests.py +++ b/qpid/cpp/src/tests/ha_store_tests.py @@ -40,13 +40,12 @@ class StoreTests(BrokerTest): def test_store_recovery(self): """Verify basic store and recover functionality""" - cluster = HaCluster(self, 2) + cluster = HaCluster(self, 1) sn = cluster[0].connect().session() + # Create queue qq, exchange exx and binding between them s = sn.sender("qq;{create:always,node:{durable:true}}") - sk = sn.sender("xx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:xx,key:k,queue:qq}]}}") - s.send(Message("foo", durable=True)) - s.send(Message("bar", durable=True)) - sk.send(Message("baz", durable=True)) + sk = sn.sender("exx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:exx,key:k,queue:qq}]}}") + for m in ["foo", "bar", "baz"]: s.send(Message(m, durable=True)) r = cluster[0].connect().session().receiver("qq") self.assertEqual(r.fetch().content, "foo") r.session.acknowledge() @@ -57,16 +56,16 @@ class StoreTests(BrokerTest): def verify(broker, x_count): sn = broker.connect().session() assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"]) - sn.sender("xx/k").send(Message("x", durable=True)) + sn.sender("exx/k").send(Message("x", durable=True)) assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"]) - verify(cluster[0], 0) - cluster.bounce(0, promote_next=False) - cluster[0].promote() + verify(cluster[0], 0) # Sanity check + cluster.bounce(0) cluster[0].wait_status("active") - verify(cluster[0], 1) - cluster.kill(0, promote_next=False) - cluster[1].promote() + verify(cluster[0], 1) # Loaded from store + cluster.start() + cluster[1].wait_status("ready") + cluster.kill(0) cluster[1].wait_status("active") verify(cluster[1], 2) cluster.bounce(1, promote_next=False) @@ -90,7 +89,6 @@ class StoreTests(BrokerTest): # Make changes that the backup doesn't see cluster.kill(1, promote_next=False) - time.sleep(1) # FIXME aconway 2012-09-25: r1 = cluster[0].connect().session().receiver("q1") for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m) r1.session.acknowledge() |