diff options
author | Alan Conway <aconway@apache.org> | 2012-11-14 16:04:04 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-11-14 16:04:04 +0000 |
commit | 3b9fdc8e68cb42e6ebfa75f3dc756fd54369f735 (patch) | |
tree | 128d1ac54182753c4d8f2d49ce38a5d88bea7909 | |
parent | bce4ad2c993a34d240b1166ab6321bc14b78c612 (diff) | |
download | qpid-python-3b9fdc8e68cb42e6ebfa75f3dc756fd54369f735.tar.gz |
QPID-4428: HA add UUID tag to avoid using an out of date queue/exchange.
Imagine a cluster with primary A and backups B and C. A queue Q is created on A
and replicated to B, C. Now A dies and B takes over as primary. Before C can
connect to B, a client destroys Q and creates a new queue with the same name.
When B connects it sees Q and incorrectly assumes it is the same Q that it has
already replicated. Now C has an inconsistent replica of Q.
The fix is to tag queues/exchanges with a UUID so a backup can tell if a queue
is not the same as the one it has already replicated, even if the names are the
same. This all also applies to exchanges.
- Minor imrovements to printing UUIDs in a FieldTable.
- Fix comparison of void Variants, added operator !=
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1409241 13f79535-47bb-0310-9956-ffa450edef68
22 files changed, 238 insertions, 90 deletions
diff --git a/qpid/cpp/include/qpid/framing/FieldValue.h b/qpid/cpp/include/qpid/framing/FieldValue.h index 458de62fdf..e964da495a 100644 --- a/qpid/cpp/include/qpid/framing/FieldValue.h +++ b/qpid/cpp/include/qpid/framing/FieldValue.h @@ -175,11 +175,19 @@ class FixedWidthValue : public FieldValue::Data { return v; } uint8_t* rawOctets() { return octets; } - uint8_t* rawOctets() const { return octets; } + const uint8_t* rawOctets() const { return octets; } void print(std::ostream& o) const { o << "F" << width << ":"; }; }; +class UuidData : public FixedWidthValue<16> { + public: + UuidData(); + UuidData(const unsigned char* bytes); + bool convertsToString() const; + std::string getString() const; +}; + template <class T, int W> inline T FieldValue::getIntegerValue() const { @@ -356,7 +364,7 @@ class Var16Value : public FieldValue { class Var32Value : public FieldValue { public: QPID_COMMON_EXTERN Var32Value(const std::string& v, uint8_t code); -}; + }; class Struct32Value : public FieldValue { public: @@ -453,6 +461,7 @@ class ListValue : public FieldValue { class UuidValue : public FieldValue { public: + QPID_COMMON_EXTERN UuidValue(); QPID_COMMON_EXTERN UuidValue(const unsigned char*); }; diff --git a/qpid/cpp/include/qpid/types/Variant.h b/qpid/cpp/include/qpid/types/Variant.h index 3493559777..e6bfd6bc0a 100644 --- a/qpid/cpp/include/qpid/types/Variant.h +++ b/qpid/cpp/include/qpid/types/Variant.h @@ -177,6 +177,7 @@ QPID_TYPES_EXTERN std::ostream& operator<<(std::ostream& out, const Variant& val QPID_TYPES_EXTERN std::ostream& operator<<(std::ostream& out, const Variant::Map& map); QPID_TYPES_EXTERN std::ostream& operator<<(std::ostream& out, const Variant::List& list); QPID_TYPES_EXTERN bool operator==(const Variant& a, const Variant& b); +QPID_TYPES_EXTERN bool operator!=(const Variant& a, const Variant& b); #endif }} // namespace qpid::types diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 7026dc7aa5..96de6998b0 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1176,29 +1176,12 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( } std::pair<Exchange::shared_ptr, bool> result; - result = exchanges.declare(name, type, durable, arguments); + result = exchanges.declare( + name, type, durable, arguments, alternate, connectionId, userId); if (result.second) { - if (alternate) { - result.first->setAlternate(alternate); - alternate->incAlternateUsers(); - } if (durable) { store->create(*result.first, arguments); } - if (managementAgent.get()) { - //TODO: debatable whether we should raise an event here for - //create when this is a 'declare' event; ideally add a create - //event instead? - managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId, - userId, - name, - type, - alternateExchange, - durable, - false, - ManagementAgent::toMap(arguments), - "created")); - } QPID_LOG_CAT(debug, model, "Create exchange. name:" << name << " user:" << userId << " rhost:" << connectionId @@ -1225,10 +1208,7 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); if (exchange->isDurable()) store->destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); - exchanges.destroy(name); - - if (managementAgent.get()) - managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name)); + exchanges.destroy(name, connectionId, userId); QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name << " user:" << userId << " rhost:" << connectionId); diff --git a/qpid/cpp/src/qpid/broker/ConfigurationObserver.h b/qpid/cpp/src/qpid/broker/ConfigurationObserver.h index 701043db40..789490e08c 100644 --- a/qpid/cpp/src/qpid/broker/ConfigurationObserver.h +++ b/qpid/cpp/src/qpid/broker/ConfigurationObserver.h @@ -38,6 +38,10 @@ class Exchange; /** * Observer for changes to configuration (aka wiring) + * + * NOTE: create and destroy functions are called with + * the registry lock held. This is necessary to ensure + * they are called in the correct sequence. */ class ConfigurationObserver { diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 12360df81d..9098c75f0b 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -408,5 +408,10 @@ bool Exchange::routeWithAlternate(Deliverable& msg) return msg.delivered; } +void Exchange::setArgs(const framing::FieldTable& newArgs) { + args = newArgs; + if (mgmtExchange) mgmtExchange->set_arguments(ManagementAgent::toMap(args)); +} + }} diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 517b551a83..8197b64d6b 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -173,8 +173,8 @@ public: const std::string& getName() const { return name; } bool isDurable() { return durable; } - qpid::framing::FieldTable& getArgs() { return args; } const qpid::framing::FieldTable& getArgs() const { return args; } + void setArgs(const framing::FieldTable&); QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; } QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate); diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index b31c7bd7b8..bc6a20ff9a 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -29,20 +29,26 @@ #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" #include "qpid/framing/reply_exceptions.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" using namespace qpid::broker; using namespace qpid::sys; using std::pair; using std::string; using qpid::framing::FieldTable; +using qpid::management::ManagementAgent; +namespace _qmf = qmf::org::apache::qpid::broker; pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){ return declare(name, type, false, FieldTable()); } -pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, - bool durable, const FieldTable& args){ +pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare( + const string& name, const string& type, bool durable, const FieldTable& args, + Exchange::shared_ptr alternate, const string& connectionId, const string& userId) +{ Exchange::shared_ptr exchange; std::pair<Exchange::shared_ptr, bool> result; { @@ -73,31 +79,58 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c } exchanges[name] = exchange; result = std::pair<Exchange::shared_ptr, bool>(exchange, true); + if (alternate) { + exchange->setAlternate(alternate); + alternate->incAlternateUsers(); + } + // Call exchangeCreate inside the lock to ensure correct ordering. + if (broker) broker->getConfigurationObservers().exchangeCreate(exchange); } else { result = std::pair<Exchange::shared_ptr, bool>(i->second, false); } + if (broker && broker->getManagementAgent()) { + // Call raiseEvent inside the lock to ensure correct ordering. + broker->getManagementAgent()->raiseEvent( + _qmf::EventExchangeDeclare( + connectionId, + userId, + name, + type, + alternate ? alternate->getName() : string(), + durable, + false, + ManagementAgent::toMap(result.first->getArgs()), + "created")); + } } - if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange); return result; } -void ExchangeRegistry::destroy(const string& name){ +void ExchangeRegistry::destroy( + const string& name, const string& connectionId, const string& userId) +{ if (name.empty() || (name.find("amq.") == 0 && (name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) || name == "qpid.management") throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'")); - Exchange::shared_ptr exchange; { RWlock::ScopedWlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i != exchanges.end()) { - exchange = i->second; + if (broker) { + // Call exchangeDestroy and raiseEvent inside the lock to ensure + // correct ordering. + broker->getConfigurationObservers().exchangeDestroy(i->second); + if (broker->getManagementAgent()) + broker->getManagementAgent()->raiseEvent( + _qmf::EventExchangeDelete(connectionId, userId, name)); + } i->second->destroy(); exchanges.erase(i); + } } - if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange); } Exchange::shared_ptr ExchangeRegistry::find(const string& name){ diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h index 27b705fbe5..8db2c34863 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h @@ -46,14 +46,23 @@ class ExchangeRegistry{ bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction; ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {} - QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare - (const std::string& name, const std::string& type); - QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare - (const std::string& name, - const std::string& type, - bool durable, - const qpid::framing::FieldTable& args = framing::FieldTable()); - QPID_BROKER_EXTERN void destroy(const std::string& name); + QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare( + const std::string& name, const std::string& type); + + QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare( + const std::string& name, + const std::string& type, + bool durable, + const qpid::framing::FieldTable& args = framing::FieldTable(), + Exchange::shared_ptr alternate = Exchange::shared_ptr(), + const std::string& connectionId = std::string(), + const std::string& userId = std::string()); + + QPID_BROKER_EXTERN void destroy( + const std::string& name, + const std::string& connectionId = std::string(), + const std::string& userId = std::string()); + QPID_BROKER_EXTERN Exchange::shared_ptr getDefault(); /** diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index f1288ae59e..eb72db3a7b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1169,14 +1169,10 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::strin { if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { - QPID_LOG(debug, "Auto-deleting " << queue->getName()); - queue->destroyed(); - - if (broker.getManagementAgent()) - broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName())); - QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName() + QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName() << " user:" << userId << " rhost:" << connectionId ); + queue->destroyed(); } } @@ -1598,5 +1594,10 @@ void Queue::UsageBarrier::destroy() while (count) usageLock.wait(); } +void Queue::addArgument(const string& key, const types::Variant& value) { + settings.original.insert(types::Variant::Map::value_type(key, value)); + if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap()); +} + }} diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 3fa8391d46..eecc8ce433 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -145,7 +145,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, mutable qpid::sys::Mutex messageLock; mutable qpid::sys::Mutex ownershipLock; mutable uint64_t persistenceId; - const QueueSettings settings; + QueueSettings settings; qpid::framing::FieldTable encodableSettings; QueueDepth current; QueueBindings bindings; @@ -423,6 +423,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value); + + /** Add an argument to be included in management messages about this queue. */ + void addArgument(const std::string& key, const types::Variant& value); + friend class QueueFactory; }; } diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index ed9f01c8b2..b59eb530f0 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -69,23 +69,25 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, queue->create(); } queues[name] = queue; + // NOTE: raiseEvent and queueCreate must be called with the lock held in + // order to ensure events are generated in the correct order. + // Call queueCreate before raiseEvents so it can add arguments that + // will be included in the management event. + if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue); result = std::pair<Queue::shared_ptr, bool>(queue, true); } else { result = std::pair<Queue::shared_ptr, bool>(i->second, false); } - // NOTE: raiseEvent must be called with the lock held in order to - // ensure management events are generated in the correct order. - if (getBroker() && getBroker()->getManagementAgent() && connectionId.size() && userId.size()) { + if (getBroker() && getBroker()->getManagementAgent()) { getBroker()->getManagementAgent()->raiseEvent( _qmf::EventQueueDeclare( connectionId, userId, name, settings.durable, owner, settings.autodelete, alternate ? alternate->getName() : string(), - settings.asMap(), + result.first->getSettings().asMap(), result.second ? "created" : "existing")); } } - if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first); return result; } @@ -99,17 +101,17 @@ void QueueRegistry::destroy( if (i != queues.end()) { q = i->second; queues.erase(i); - if (getBroker() && getBroker()->getManagementAgent() && - connectionId.size() && userId.size()) - { - // NOTE: raiseEvent must be called with the lock held in order to - // ensure management events are generated in the correct order. - getBroker()->getManagementAgent()->raiseEvent( - _qmf::EventQueueDelete(connectionId, userId, name)); + if (getBroker()) { + // NOTE: queueDestroy and raiseEvent must be called with the + // lock held in order to ensure events are generated + // in the correct order. + getBroker()->getConfigurationObservers().queueDestroy(q); + if (getBroker()->getManagementAgent()) + getBroker()->getManagementAgent()->raiseEvent( + _qmf::EventQueueDelete(connectionId, userId, name)); } } } - if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q); } Queue::shared_ptr QueueRegistry::find(const string& name){ diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 0cf55d06e6..0263ff2a58 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -98,17 +98,6 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const //exchange already there, not created checkType(response.first, type); checkAlternate(response.first, alternate); - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), - getConnection().getUserId(), - exchange, - type, - alternateExchange, - durable, - false, - ManagementAgent::toMap(args), - "existing")); QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange << " user:" << getConnection().getUserId() << " rhost:" << getConnection().getUrl() diff --git a/qpid/cpp/src/qpid/framing/FieldValue.cpp b/qpid/cpp/src/qpid/framing/FieldValue.cpp index ce5a50117c..4abed0f77f 100644 --- a/qpid/cpp/src/qpid/framing/FieldValue.cpp +++ b/qpid/cpp/src/qpid/framing/FieldValue.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/Buffer.h" #include "qpid/framing/Endian.h" #include "qpid/framing/List.h" +#include "qpid/framing/Uuid.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/Msg.h" @@ -43,7 +44,9 @@ void FieldValue::setType(uint8_t type) data.reset(new EncodedValue<List>()); } else if (typeOctet == 0xAA) { data.reset(new EncodedValue<Array>()); - } else { + } else if (typeOctet == 0x48) { + data.reset(new UuidData()); + } else { uint8_t lenType = typeOctet >> 4; switch(lenType){ case 0: @@ -213,9 +216,12 @@ Integer8Value::Integer8Value(int8_t v) : Integer16Value::Integer16Value(int16_t v) : FieldValue(0x11, new FixedWidthValue<2>(v)) {} -UuidValue::UuidValue(const unsigned char* v) : - FieldValue(0x48, new FixedWidthValue<16>(v)) -{} + +UuidData::UuidData() {} +UuidData::UuidData(const unsigned char* bytes) : FixedWidthValue<16>(bytes) {} +bool UuidData::convertsToString() const { return true; } +std::string UuidData::getString() const { return Uuid(rawOctets()).str(); } +UuidValue::UuidValue(const unsigned char* v) : FieldValue(0x48, new UuidData(v)) {} void FieldValue::print(std::ostream& out) const { data->print(out); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 742251536e..6b88111732 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -29,6 +29,7 @@ #include "qpid/broker/Link.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/SessionHandler.h" @@ -280,7 +281,9 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& { broker.getConnectionObservers().add( boost::shared_ptr<broker::ConnectionObserver>(new ConnectionObserver(*this))); - getArgs().setString(QPID_REPLICATE, printable(NONE).str()); + framing::FieldTable args = getArgs(); + args.setString(QPID_REPLICATE, printable(NONE).str()); + setArgs(args); dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare; dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete; @@ -458,7 +461,8 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { // If we already have a queue with this name, replace it. // The queue was definitely created on the primary. if (queues.find(name)) { - QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); + QPID_LOG(warning, logPrefix << "Declare event, replacing exsiting queue: " + << name); deleteQueue(name); } replicateQueue(name, values[DURABLE].asBool(), autoDel, args, @@ -499,7 +503,8 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { // The exchange was definitely created on the primary. if (exchanges.find(name)) { deleteExchange(name); - QPID_LOG(warning, logPrefix << "Replaced existing exchange: " << name); + QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: " + << name); } CreateExchangeResult result = createExchange( name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, @@ -591,8 +596,15 @@ string getAltExchange(const types::Variant& var) { } else return string(); } + +Variant getHaUuid(const Variant::Map& map) { + Variant::Map::const_iterator i = map.find(QPID_HA_UUID); + return i == map.end() ? Variant() : i->second; } +} // namespace + + void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicationTest.isReplicated( @@ -606,6 +618,14 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { throw Exception(QPID_MSG("Unexpected queue response: " << values)); if (!queueTracker->response(name)) return; // Response is out-of-date QPID_LOG(debug, logPrefix << "Queue response: " << name); + // If we see a queue with the same name as one we have, but not the same UUID, + // then replace the one we have. + boost::shared_ptr<Queue> queue = queues.find(name); + if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) { + QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: " + << name); + deleteQueue(name); + } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); boost::shared_ptr<QueueReplicator> qr = replicateQueue( @@ -629,6 +649,16 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Exchange response: " << name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); + // If we see an exchange with the same name as one we have, but not the same UUID, + // then replace the one we have. + boost::shared_ptr<Exchange> exchange = exchanges.find(name); + if (exchange && + exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID)) + { + QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: " + << name); + deleteExchange(name); + } CreateExchangeResult result = createExchange( name, values[TYPE].asString(), values[DURABLE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index bdb1a66a83..6d5d68191b 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -31,6 +31,8 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include "qpid/sys/Timer.h" #include <boost/bind.hpp> @@ -39,6 +41,8 @@ namespace qpid { namespace ha { using sys::Mutex; +using namespace std; +using namespace framing; namespace { @@ -58,6 +62,8 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver PrimaryConfigurationObserver(Primary& p) : primary(p) {} void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); } void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } + void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); } + void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); } private: Primary& primary; }; @@ -178,9 +184,12 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) { } } +// NOTE: Called with queue registry lock held. void Primary::queueCreate(const QueuePtr& q) { - // Throw if there is an invalid replication level in the queue settings. - haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings); + if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *q)) { + // Give each queue a unique id to avoid confusion of same-named queues. + q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true))); + } Mutex::ScopedLock l(lock); for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { i->second->queueCreate(q); @@ -188,6 +197,7 @@ void Primary::queueCreate(const QueuePtr& q) { } } +// NOTE: Called with queue registry lock held. void Primary::queueDestroy(const QueuePtr& q) { Mutex::ScopedLock l(lock); for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) @@ -195,6 +205,21 @@ void Primary::queueDestroy(const QueuePtr& q) { checkReady(l); } +// NOTE: Called with exchange registry lock held. +void Primary::exchangeCreate(const ExchangePtr& ex) { + if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *ex)) { + // Give each exchange a unique id to avoid confusion of same-named exchanges. + FieldTable args = ex->getArgs(); + args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0]))); + ex->setArgs(args); + } +} + +// NOTE: Called with exchange registry lock held. +void Primary::exchangeDestroy(const ExchangePtr&) { + // Do nothing + } + void Primary::opened(broker::Connection& connection) { BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 22b231ed72..c713115176 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -60,6 +60,7 @@ class Primary { public: typedef boost::shared_ptr<broker::Queue> QueuePtr; + typedef boost::shared_ptr<broker::Exchange> ExchangePtr; static Primary* get() { return instance; } @@ -72,6 +73,8 @@ class Primary // Called via ConfigurationObserver void queueCreate(const QueuePtr&); void queueDestroy(const QueuePtr&); + void exchangeCreate(const ExchangePtr&); + void exchangeDestroy(const ExchangePtr&); // Called via ConnectionObserver void opened(broker::Connection& connection); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index ea76763425..1f14ce4669 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -49,9 +49,8 @@ using namespace framing; using namespace std; using sys::Mutex; -const std::string QPID_HA_EVENT_PREFIX("qpid.ha-"); -const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); -const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position"); +const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_PREFIX+"dequeue"); +const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_PREFIX+"position"); const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); std::string QueueReplicator::replicatorName(const std::string& queueName) { @@ -63,7 +62,7 @@ bool QueueReplicator::isReplicatorName(const std::string& name) { } bool QueueReplicator::isEventKey(const std::string key) { - const std::string& prefix = QPID_HA_EVENT_PREFIX; + const std::string& prefix = QPID_HA_PREFIX; bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0; return ret; } @@ -114,7 +113,9 @@ QueueReplicator::QueueReplicator(HaBroker& hb, args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); - getArgs().setString(QPID_REPLICATE, printable(NONE).str()); + framing::FieldTable args = getArgs(); + args.setString(QPID_REPLICATE, printable(NONE).str()); + setArgs(args); } // This must be separate from the constructor so we can call shared_from_this. diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp index 53e2056213..57712762ab 100644 --- a/qpid/cpp/src/qpid/ha/types.cpp +++ b/qpid/cpp/src/qpid/ha/types.cpp @@ -33,6 +33,8 @@ namespace ha { using namespace std; const string QPID_REPLICATE("qpid.replicate"); +const string QPID_HA_PREFIX("qpid.ha-"); +const string QPID_HA_UUID(QPID_HA_PREFIX+"uuid"); string EnumBase::str() const { assert(value < count); diff --git a/qpid/cpp/src/qpid/ha/types.h b/qpid/cpp/src/qpid/ha/types.h index 35faf9f624..d1afbf1190 100644 --- a/qpid/cpp/src/qpid/ha/types.h +++ b/qpid/cpp/src/qpid/ha/types.h @@ -99,6 +99,8 @@ inline bool isBackup(BrokerStatus s) { return !isPrimary(s); } // String constants. extern const std::string QPID_REPLICATE; +extern const std::string QPID_HA_PREFIX; +extern const std::string QPID_HA_UUID; /** Define IdSet type, not a typedef so we can overload operator << */ class IdSet : public std::set<types::Uuid> {}; diff --git a/qpid/cpp/src/qpid/types/Variant.cpp b/qpid/cpp/src/qpid/types/Variant.cpp index d332fffa5e..9b981c9171 100644 --- a/qpid/cpp/src/qpid/types/Variant.cpp +++ b/qpid/cpp/src/qpid/types/Variant.cpp @@ -650,7 +650,7 @@ VariantImpl* VariantImpl::create(const Variant& v) } } -Variant::Variant() : impl(0) {} +Variant::Variant() : impl(new VariantImpl()) {} Variant::Variant(bool b) : impl(new VariantImpl(b)) {} Variant::Variant(uint8_t i) : impl(new VariantImpl(i)) {} Variant::Variant(uint16_t i) : impl(new VariantImpl(i)) {} @@ -893,6 +893,8 @@ bool operator==(const Variant& a, const Variant& b) return a.isEqualTo(b); } +bool operator!=(const Variant& a, const Variant& b) { return !(a == b); } + bool Variant::isEqualTo(const Variant& other) const { return impl && impl->isEqualTo(*other.impl); diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 9eebfa952f..d7885d9622 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -142,7 +142,9 @@ class HaBroker(Broker): # FIXME aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): assert subprocess.call( - [self.qpid_config_path, "--broker", self.host_port()]+args) == 0 + [self.qpid_config_path, "--broker", self.host_port()]+args, + stdout=1, stderr=subprocess.STDOUT + ) == 0 def config_replicate(self, from_broker, queue): self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) @@ -160,12 +162,14 @@ class HaBroker(Broker): else: return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) - def wait_backup(self, address): - """Wait for address to become valid on a backup broker.""" + def wait_address(self, address): + """Wait for address to become valid on the broker.""" bs = self.connect_admin().session() try: wait_address(bs, address) finally: bs.connection.close() + def wait_backup(self, address): self.wait_address(address) + def assert_browse(self, queue, expected, **kwargs): """Verify queue contents by browsing.""" bs = self.connect().session() diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 60c68730f8..d25e68b29c 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -270,6 +270,7 @@ class ReplicationTests(HaBrokerTest): def test_qpid_config_replication(self): """Set up replication via qpid-config""" brokers = HaCluster(self,2) + brokers[0].wait_status("active") brokers[0].config_declare("q","all") brokers[0].connect().session().sender("q").send("foo") brokers[1].assert_browse_backup("q", ["foo"]) @@ -830,6 +831,41 @@ acl deny all all verify_qmf_events("q2") finally: l.restore() + def test_missed_recreate(self): + """If a queue or exchange is destroyed and one with the same name re-created + while a backup is disconnected, the backup should also delete/recreate + the object when it re-connects""" + cluster = HaCluster(self, 3) + sn = cluster[0].connect().session() + # Create a queue with messages + s = sn.sender("qq;{create:always}") + msgs = [str(i) for i in xrange(3)] + for m in msgs: s.send(m) + cluster[1].assert_browse_backup("qq", msgs) + cluster[2].assert_browse_backup("qq", msgs) + # Set up an exchange with a binding. + sn.sender("xx;{create:always,node:{type:topic}}") + sn.sender("xxq;{create:always,node:{x-bindings:[{exchange:'xx',queue:'xxq',key:xxq}]}}") + cluster[1].wait_address("xx") + self.assertEqual(cluster[1].agent().getExchange("xx").values["bindingCount"], 1) + cluster[2].wait_address("xx") + self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1) + + # Simulate the race by re-creating the objects before promoting the new primary + cluster.kill(0, False) + sn = cluster[1].connect_admin().session() + sn.sender("qq;{delete:always}").close() + s = sn.sender("qq;{create:always}") + s.send("foo") + sn.sender("xx;{delete:always}").close() + sn.sender("xx;{create:always,node:{type:topic}}") + cluster[1].promote() + cluster[1].wait_status("active") + # Verify we are not still using the old objects on cluster[2] + cluster[2].assert_browse_backup("qq", ["foo"]) + cluster[2].wait_address("xx") + self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 0) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |