summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-11-14 16:04:04 +0000
committerAlan Conway <aconway@apache.org>2012-11-14 16:04:04 +0000
commit3b9fdc8e68cb42e6ebfa75f3dc756fd54369f735 (patch)
tree128d1ac54182753c4d8f2d49ce38a5d88bea7909
parentbce4ad2c993a34d240b1166ab6321bc14b78c612 (diff)
downloadqpid-python-3b9fdc8e68cb42e6ebfa75f3dc756fd54369f735.tar.gz
QPID-4428: HA add UUID tag to avoid using an out of date queue/exchange.
Imagine a cluster with primary A and backups B and C. A queue Q is created on A and replicated to B, C. Now A dies and B takes over as primary. Before C can connect to B, a client destroys Q and creates a new queue with the same name. When B connects it sees Q and incorrectly assumes it is the same Q that it has already replicated. Now C has an inconsistent replica of Q. The fix is to tag queues/exchanges with a UUID so a backup can tell if a queue is not the same as the one it has already replicated, even if the names are the same. This all also applies to exchanges. - Minor imrovements to printing UUIDs in a FieldTable. - Fix comparison of void Variants, added operator != git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1409241 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/framing/FieldValue.h13
-rw-r--r--qpid/cpp/include/qpid/types/Variant.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/ConfigurationObserver.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h2
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp47
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.h25
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h6
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp11
-rw-r--r--qpid/cpp/src/qpid/framing/FieldValue.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp36
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp29
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h3
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp11
-rw-r--r--qpid/cpp/src/qpid/ha/types.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/types.h2
-rw-r--r--qpid/cpp/src/qpid/types/Variant.cpp4
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py10
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py36
22 files changed, 238 insertions, 90 deletions
diff --git a/qpid/cpp/include/qpid/framing/FieldValue.h b/qpid/cpp/include/qpid/framing/FieldValue.h
index 458de62fdf..e964da495a 100644
--- a/qpid/cpp/include/qpid/framing/FieldValue.h
+++ b/qpid/cpp/include/qpid/framing/FieldValue.h
@@ -175,11 +175,19 @@ class FixedWidthValue : public FieldValue::Data {
return v;
}
uint8_t* rawOctets() { return octets; }
- uint8_t* rawOctets() const { return octets; }
+ const uint8_t* rawOctets() const { return octets; }
void print(std::ostream& o) const { o << "F" << width << ":"; };
};
+class UuidData : public FixedWidthValue<16> {
+ public:
+ UuidData();
+ UuidData(const unsigned char* bytes);
+ bool convertsToString() const;
+ std::string getString() const;
+};
+
template <class T, int W>
inline T FieldValue::getIntegerValue() const
{
@@ -356,7 +364,7 @@ class Var16Value : public FieldValue {
class Var32Value : public FieldValue {
public:
QPID_COMMON_EXTERN Var32Value(const std::string& v, uint8_t code);
-};
+ };
class Struct32Value : public FieldValue {
public:
@@ -453,6 +461,7 @@ class ListValue : public FieldValue {
class UuidValue : public FieldValue {
public:
+ QPID_COMMON_EXTERN UuidValue();
QPID_COMMON_EXTERN UuidValue(const unsigned char*);
};
diff --git a/qpid/cpp/include/qpid/types/Variant.h b/qpid/cpp/include/qpid/types/Variant.h
index 3493559777..e6bfd6bc0a 100644
--- a/qpid/cpp/include/qpid/types/Variant.h
+++ b/qpid/cpp/include/qpid/types/Variant.h
@@ -177,6 +177,7 @@ QPID_TYPES_EXTERN std::ostream& operator<<(std::ostream& out, const Variant& val
QPID_TYPES_EXTERN std::ostream& operator<<(std::ostream& out, const Variant::Map& map);
QPID_TYPES_EXTERN std::ostream& operator<<(std::ostream& out, const Variant::List& list);
QPID_TYPES_EXTERN bool operator==(const Variant& a, const Variant& b);
+QPID_TYPES_EXTERN bool operator!=(const Variant& a, const Variant& b);
#endif
}} // namespace qpid::types
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 7026dc7aa5..96de6998b0 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -1176,29 +1176,12 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
}
std::pair<Exchange::shared_ptr, bool> result;
- result = exchanges.declare(name, type, durable, arguments);
+ result = exchanges.declare(
+ name, type, durable, arguments, alternate, connectionId, userId);
if (result.second) {
- if (alternate) {
- result.first->setAlternate(alternate);
- alternate->incAlternateUsers();
- }
if (durable) {
store->create(*result.first, arguments);
}
- if (managementAgent.get()) {
- //TODO: debatable whether we should raise an event here for
- //create when this is a 'declare' event; ideally add a create
- //event instead?
- managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
- userId,
- name,
- type,
- alternateExchange,
- durable,
- false,
- ManagementAgent::toMap(arguments),
- "created"));
- }
QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
@@ -1225,10 +1208,7 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
- exchanges.destroy(name);
-
- if (managementAgent.get())
- managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
+ exchanges.destroy(name, connectionId, userId);
QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId);
diff --git a/qpid/cpp/src/qpid/broker/ConfigurationObserver.h b/qpid/cpp/src/qpid/broker/ConfigurationObserver.h
index 701043db40..789490e08c 100644
--- a/qpid/cpp/src/qpid/broker/ConfigurationObserver.h
+++ b/qpid/cpp/src/qpid/broker/ConfigurationObserver.h
@@ -38,6 +38,10 @@ class Exchange;
/**
* Observer for changes to configuration (aka wiring)
+ *
+ * NOTE: create and destroy functions are called with
+ * the registry lock held. This is necessary to ensure
+ * they are called in the correct sequence.
*/
class ConfigurationObserver
{
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 12360df81d..9098c75f0b 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -408,5 +408,10 @@ bool Exchange::routeWithAlternate(Deliverable& msg)
return msg.delivered;
}
+void Exchange::setArgs(const framing::FieldTable& newArgs) {
+ args = newArgs;
+ if (mgmtExchange) mgmtExchange->set_arguments(ManagementAgent::toMap(args));
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index 517b551a83..8197b64d6b 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -173,8 +173,8 @@ public:
const std::string& getName() const { return name; }
bool isDurable() { return durable; }
- qpid::framing::FieldTable& getArgs() { return args; }
const qpid::framing::FieldTable& getArgs() const { return args; }
+ void setArgs(const framing::FieldTable&);
QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
index b31c7bd7b8..bc6a20ff9a 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -29,20 +29,26 @@
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
using namespace qpid::broker;
using namespace qpid::sys;
using std::pair;
using std::string;
using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+namespace _qmf = qmf::org::apache::qpid::broker;
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){
return declare(name, type, false, FieldTable());
}
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
- bool durable, const FieldTable& args){
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(
+ const string& name, const string& type, bool durable, const FieldTable& args,
+ Exchange::shared_ptr alternate, const string& connectionId, const string& userId)
+{
Exchange::shared_ptr exchange;
std::pair<Exchange::shared_ptr, bool> result;
{
@@ -73,31 +79,58 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
}
exchanges[name] = exchange;
result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ if (alternate) {
+ exchange->setAlternate(alternate);
+ alternate->incAlternateUsers();
+ }
+ // Call exchangeCreate inside the lock to ensure correct ordering.
+ if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
} else {
result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
+ if (broker && broker->getManagementAgent()) {
+ // Call raiseEvent inside the lock to ensure correct ordering.
+ broker->getManagementAgent()->raiseEvent(
+ _qmf::EventExchangeDeclare(
+ connectionId,
+ userId,
+ name,
+ type,
+ alternate ? alternate->getName() : string(),
+ durable,
+ false,
+ ManagementAgent::toMap(result.first->getArgs()),
+ "created"));
+ }
}
- if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange);
return result;
}
-void ExchangeRegistry::destroy(const string& name){
+void ExchangeRegistry::destroy(
+ const string& name, const string& connectionId, const string& userId)
+{
if (name.empty() ||
(name.find("amq.") == 0 &&
(name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) ||
name == "qpid.management")
throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'"));
- Exchange::shared_ptr exchange;
{
RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i != exchanges.end()) {
- exchange = i->second;
+ if (broker) {
+ // Call exchangeDestroy and raiseEvent inside the lock to ensure
+ // correct ordering.
+ broker->getConfigurationObservers().exchangeDestroy(i->second);
+ if (broker->getManagementAgent())
+ broker->getManagementAgent()->raiseEvent(
+ _qmf::EventExchangeDelete(connectionId, userId, name));
+ }
i->second->destroy();
exchanges.erase(i);
+
}
}
- if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange);
}
Exchange::shared_ptr ExchangeRegistry::find(const string& name){
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
index 27b705fbe5..8db2c34863 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -46,14 +46,23 @@ class ExchangeRegistry{
bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction;
ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {}
- QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
- (const std::string& name, const std::string& type);
- QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
- (const std::string& name,
- const std::string& type,
- bool durable,
- const qpid::framing::FieldTable& args = framing::FieldTable());
- QPID_BROKER_EXTERN void destroy(const std::string& name);
+ QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+ const std::string& name, const std::string& type);
+
+ QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const qpid::framing::FieldTable& args = framing::FieldTable(),
+ Exchange::shared_ptr alternate = Exchange::shared_ptr(),
+ const std::string& connectionId = std::string(),
+ const std::string& userId = std::string());
+
+ QPID_BROKER_EXTERN void destroy(
+ const std::string& name,
+ const std::string& connectionId = std::string(),
+ const std::string& userId = std::string());
+
QPID_BROKER_EXTERN Exchange::shared_ptr getDefault();
/**
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index f1288ae59e..eb72db3a7b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1169,14 +1169,10 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::strin
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
- QPID_LOG(debug, "Auto-deleting " << queue->getName());
- queue->destroyed();
-
- if (broker.getManagementAgent())
- broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName()));
- QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName()
+ QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName()
<< " user:" << userId
<< " rhost:" << connectionId );
+ queue->destroyed();
}
}
@@ -1598,5 +1594,10 @@ void Queue::UsageBarrier::destroy()
while (count) usageLock.wait();
}
+void Queue::addArgument(const string& key, const types::Variant& value) {
+ settings.original.insert(types::Variant::Map::value_type(key, value));
+ if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap());
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 3fa8391d46..eecc8ce433 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -145,7 +145,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
mutable qpid::sys::Mutex messageLock;
mutable qpid::sys::Mutex ownershipLock;
mutable uint64_t persistenceId;
- const QueueSettings settings;
+ QueueSettings settings;
qpid::framing::FieldTable encodableSettings;
QueueDepth current;
QueueBindings bindings;
@@ -423,6 +423,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
+
+ /** Add an argument to be included in management messages about this queue. */
+ void addArgument(const std::string& key, const types::Variant& value);
+
friend class QueueFactory;
};
}
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index ed9f01c8b2..b59eb530f0 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -69,23 +69,25 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings,
queue->create();
}
queues[name] = queue;
+ // NOTE: raiseEvent and queueCreate must be called with the lock held in
+ // order to ensure events are generated in the correct order.
+ // Call queueCreate before raiseEvents so it can add arguments that
+ // will be included in the management event.
+ if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue);
result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
- // NOTE: raiseEvent must be called with the lock held in order to
- // ensure management events are generated in the correct order.
- if (getBroker() && getBroker()->getManagementAgent() && connectionId.size() && userId.size()) {
+ if (getBroker() && getBroker()->getManagementAgent()) {
getBroker()->getManagementAgent()->raiseEvent(
_qmf::EventQueueDeclare(
connectionId, userId, name,
settings.durable, owner, settings.autodelete,
alternate ? alternate->getName() : string(),
- settings.asMap(),
+ result.first->getSettings().asMap(),
result.second ? "created" : "existing"));
}
}
- if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
return result;
}
@@ -99,17 +101,17 @@ void QueueRegistry::destroy(
if (i != queues.end()) {
q = i->second;
queues.erase(i);
- if (getBroker() && getBroker()->getManagementAgent() &&
- connectionId.size() && userId.size())
- {
- // NOTE: raiseEvent must be called with the lock held in order to
- // ensure management events are generated in the correct order.
- getBroker()->getManagementAgent()->raiseEvent(
- _qmf::EventQueueDelete(connectionId, userId, name));
+ if (getBroker()) {
+ // NOTE: queueDestroy and raiseEvent must be called with the
+ // lock held in order to ensure events are generated
+ // in the correct order.
+ getBroker()->getConfigurationObservers().queueDestroy(q);
+ if (getBroker()->getManagementAgent())
+ getBroker()->getManagementAgent()->raiseEvent(
+ _qmf::EventQueueDelete(connectionId, userId, name));
}
}
}
- if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 0cf55d06e6..0263ff2a58 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -98,17 +98,6 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
//exchange already there, not created
checkType(response.first, type);
checkAlternate(response.first, alternate);
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
- getConnection().getUserId(),
- exchange,
- type,
- alternateExchange,
- durable,
- false,
- ManagementAgent::toMap(args),
- "existing"));
QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange
<< " user:" << getConnection().getUserId()
<< " rhost:" << getConnection().getUrl()
diff --git a/qpid/cpp/src/qpid/framing/FieldValue.cpp b/qpid/cpp/src/qpid/framing/FieldValue.cpp
index ce5a50117c..4abed0f77f 100644
--- a/qpid/cpp/src/qpid/framing/FieldValue.cpp
+++ b/qpid/cpp/src/qpid/framing/FieldValue.cpp
@@ -23,6 +23,7 @@
#include "qpid/framing/Buffer.h"
#include "qpid/framing/Endian.h"
#include "qpid/framing/List.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/Msg.h"
@@ -43,7 +44,9 @@ void FieldValue::setType(uint8_t type)
data.reset(new EncodedValue<List>());
} else if (typeOctet == 0xAA) {
data.reset(new EncodedValue<Array>());
- } else {
+ } else if (typeOctet == 0x48) {
+ data.reset(new UuidData());
+ } else {
uint8_t lenType = typeOctet >> 4;
switch(lenType){
case 0:
@@ -213,9 +216,12 @@ Integer8Value::Integer8Value(int8_t v) :
Integer16Value::Integer16Value(int16_t v) :
FieldValue(0x11, new FixedWidthValue<2>(v))
{}
-UuidValue::UuidValue(const unsigned char* v) :
- FieldValue(0x48, new FixedWidthValue<16>(v))
-{}
+
+UuidData::UuidData() {}
+UuidData::UuidData(const unsigned char* bytes) : FixedWidthValue<16>(bytes) {}
+bool UuidData::convertsToString() const { return true; }
+std::string UuidData::getString() const { return Uuid(rawOctets()).str(); }
+UuidValue::UuidValue(const unsigned char* v) : FieldValue(0x48, new UuidData(v)) {}
void FieldValue::print(std::ostream& out) const {
data->print(out);
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 742251536e..6b88111732 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -29,6 +29,7 @@
#include "qpid/broker/Link.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/SessionHandler.h"
@@ -280,7 +281,9 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
{
broker.getConnectionObservers().add(
boost::shared_ptr<broker::ConnectionObserver>(new ConnectionObserver(*this)));
- getArgs().setString(QPID_REPLICATE, printable(NONE).str());
+ framing::FieldTable args = getArgs();
+ args.setString(QPID_REPLICATE, printable(NONE).str());
+ setArgs(args);
dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare;
dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete;
@@ -458,7 +461,8 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
if (queues.find(name)) {
- QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
+ QPID_LOG(warning, logPrefix << "Declare event, replacing exsiting queue: "
+ << name);
deleteQueue(name);
}
replicateQueue(name, values[DURABLE].asBool(), autoDel, args,
@@ -499,7 +503,8 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
// The exchange was definitely created on the primary.
if (exchanges.find(name)) {
deleteExchange(name);
- QPID_LOG(warning, logPrefix << "Replaced existing exchange: " << name);
+ QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: "
+ << name);
}
CreateExchangeResult result = createExchange(
name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
@@ -591,8 +596,15 @@ string getAltExchange(const types::Variant& var) {
}
else return string();
}
+
+Variant getHaUuid(const Variant::Map& map) {
+ Variant::Map::const_iterator i = map.find(QPID_HA_UUID);
+ return i == map.end() ? Variant() : i->second;
}
+} // namespace
+
+
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.isReplicated(
@@ -606,6 +618,14 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
throw Exception(QPID_MSG("Unexpected queue response: " << values));
if (!queueTracker->response(name)) return; // Response is out-of-date
QPID_LOG(debug, logPrefix << "Queue response: " << name);
+ // If we see a queue with the same name as one we have, but not the same UUID,
+ // then replace the one we have.
+ boost::shared_ptr<Queue> queue = queues.find(name);
+ if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) {
+ QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: "
+ << name);
+ deleteQueue(name);
+ }
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
boost::shared_ptr<QueueReplicator> qr = replicateQueue(
@@ -629,6 +649,16 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
+ // If we see an exchange with the same name as one we have, but not the same UUID,
+ // then replace the one we have.
+ boost::shared_ptr<Exchange> exchange = exchanges.find(name);
+ if (exchange &&
+ exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID))
+ {
+ QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: "
+ << name);
+ deleteExchange(name);
+ }
CreateExchangeResult result = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index bdb1a66a83..6d5d68191b 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -31,6 +31,8 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
@@ -39,6 +41,8 @@ namespace qpid {
namespace ha {
using sys::Mutex;
+using namespace std;
+using namespace framing;
namespace {
@@ -58,6 +62,8 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver
PrimaryConfigurationObserver(Primary& p) : primary(p) {}
void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
+ void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
+ void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
private:
Primary& primary;
};
@@ -178,9 +184,12 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) {
}
}
+// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
- // Throw if there is an invalid replication level in the queue settings.
- haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings);
+ if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *q)) {
+ // Give each queue a unique id to avoid confusion of same-named queues.
+ q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
+ }
Mutex::ScopedLock l(lock);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
i->second->queueCreate(q);
@@ -188,6 +197,7 @@ void Primary::queueCreate(const QueuePtr& q) {
}
}
+// NOTE: Called with queue registry lock held.
void Primary::queueDestroy(const QueuePtr& q) {
Mutex::ScopedLock l(lock);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
@@ -195,6 +205,21 @@ void Primary::queueDestroy(const QueuePtr& q) {
checkReady(l);
}
+// NOTE: Called with exchange registry lock held.
+void Primary::exchangeCreate(const ExchangePtr& ex) {
+ if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *ex)) {
+ // Give each exchange a unique id to avoid confusion of same-named exchanges.
+ FieldTable args = ex->getArgs();
+ args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
+ ex->setArgs(args);
+ }
+}
+
+// NOTE: Called with exchange registry lock held.
+void Primary::exchangeDestroy(const ExchangePtr&) {
+ // Do nothing
+ }
+
void Primary::opened(broker::Connection& connection) {
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 22b231ed72..c713115176 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -60,6 +60,7 @@ class Primary
{
public:
typedef boost::shared_ptr<broker::Queue> QueuePtr;
+ typedef boost::shared_ptr<broker::Exchange> ExchangePtr;
static Primary* get() { return instance; }
@@ -72,6 +73,8 @@ class Primary
// Called via ConfigurationObserver
void queueCreate(const QueuePtr&);
void queueDestroy(const QueuePtr&);
+ void exchangeCreate(const ExchangePtr&);
+ void exchangeDestroy(const ExchangePtr&);
// Called via ConnectionObserver
void opened(broker::Connection& connection);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index ea76763425..1f14ce4669 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -49,9 +49,8 @@ using namespace framing;
using namespace std;
using sys::Mutex;
-const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
-const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
-const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position");
+const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_PREFIX+"dequeue");
+const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_PREFIX+"position");
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
std::string QueueReplicator::replicatorName(const std::string& queueName) {
@@ -63,7 +62,7 @@ bool QueueReplicator::isReplicatorName(const std::string& name) {
}
bool QueueReplicator::isEventKey(const std::string key) {
- const std::string& prefix = QPID_HA_EVENT_PREFIX;
+ const std::string& prefix = QPID_HA_PREFIX;
bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
return ret;
}
@@ -114,7 +113,9 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
- getArgs().setString(QPID_REPLICATE, printable(NONE).str());
+ framing::FieldTable args = getArgs();
+ args.setString(QPID_REPLICATE, printable(NONE).str());
+ setArgs(args);
}
// This must be separate from the constructor so we can call shared_from_this.
diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp
index 53e2056213..57712762ab 100644
--- a/qpid/cpp/src/qpid/ha/types.cpp
+++ b/qpid/cpp/src/qpid/ha/types.cpp
@@ -33,6 +33,8 @@ namespace ha {
using namespace std;
const string QPID_REPLICATE("qpid.replicate");
+const string QPID_HA_PREFIX("qpid.ha-");
+const string QPID_HA_UUID(QPID_HA_PREFIX+"uuid");
string EnumBase::str() const {
assert(value < count);
diff --git a/qpid/cpp/src/qpid/ha/types.h b/qpid/cpp/src/qpid/ha/types.h
index 35faf9f624..d1afbf1190 100644
--- a/qpid/cpp/src/qpid/ha/types.h
+++ b/qpid/cpp/src/qpid/ha/types.h
@@ -99,6 +99,8 @@ inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
// String constants.
extern const std::string QPID_REPLICATE;
+extern const std::string QPID_HA_PREFIX;
+extern const std::string QPID_HA_UUID;
/** Define IdSet type, not a typedef so we can overload operator << */
class IdSet : public std::set<types::Uuid> {};
diff --git a/qpid/cpp/src/qpid/types/Variant.cpp b/qpid/cpp/src/qpid/types/Variant.cpp
index d332fffa5e..9b981c9171 100644
--- a/qpid/cpp/src/qpid/types/Variant.cpp
+++ b/qpid/cpp/src/qpid/types/Variant.cpp
@@ -650,7 +650,7 @@ VariantImpl* VariantImpl::create(const Variant& v)
}
}
-Variant::Variant() : impl(0) {}
+Variant::Variant() : impl(new VariantImpl()) {}
Variant::Variant(bool b) : impl(new VariantImpl(b)) {}
Variant::Variant(uint8_t i) : impl(new VariantImpl(i)) {}
Variant::Variant(uint16_t i) : impl(new VariantImpl(i)) {}
@@ -893,6 +893,8 @@ bool operator==(const Variant& a, const Variant& b)
return a.isEqualTo(b);
}
+bool operator!=(const Variant& a, const Variant& b) { return !(a == b); }
+
bool Variant::isEqualTo(const Variant& other) const
{
return impl && impl->isEqualTo(*other.impl);
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 9eebfa952f..d7885d9622 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -142,7 +142,9 @@ class HaBroker(Broker):
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
assert subprocess.call(
- [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
+ [self.qpid_config_path, "--broker", self.host_port()]+args,
+ stdout=1, stderr=subprocess.STDOUT
+ ) == 0
def config_replicate(self, from_broker, queue):
self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
@@ -160,12 +162,14 @@ class HaBroker(Broker):
else:
return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
- def wait_backup(self, address):
- """Wait for address to become valid on a backup broker."""
+ def wait_address(self, address):
+ """Wait for address to become valid on the broker."""
bs = self.connect_admin().session()
try: wait_address(bs, address)
finally: bs.connection.close()
+ def wait_backup(self, address): self.wait_address(address)
+
def assert_browse(self, queue, expected, **kwargs):
"""Verify queue contents by browsing."""
bs = self.connect().session()
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 60c68730f8..d25e68b29c 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -270,6 +270,7 @@ class ReplicationTests(HaBrokerTest):
def test_qpid_config_replication(self):
"""Set up replication via qpid-config"""
brokers = HaCluster(self,2)
+ brokers[0].wait_status("active")
brokers[0].config_declare("q","all")
brokers[0].connect().session().sender("q").send("foo")
brokers[1].assert_browse_backup("q", ["foo"])
@@ -830,6 +831,41 @@ acl deny all all
verify_qmf_events("q2")
finally: l.restore()
+ def test_missed_recreate(self):
+ """If a queue or exchange is destroyed and one with the same name re-created
+ while a backup is disconnected, the backup should also delete/recreate
+ the object when it re-connects"""
+ cluster = HaCluster(self, 3)
+ sn = cluster[0].connect().session()
+ # Create a queue with messages
+ s = sn.sender("qq;{create:always}")
+ msgs = [str(i) for i in xrange(3)]
+ for m in msgs: s.send(m)
+ cluster[1].assert_browse_backup("qq", msgs)
+ cluster[2].assert_browse_backup("qq", msgs)
+ # Set up an exchange with a binding.
+ sn.sender("xx;{create:always,node:{type:topic}}")
+ sn.sender("xxq;{create:always,node:{x-bindings:[{exchange:'xx',queue:'xxq',key:xxq}]}}")
+ cluster[1].wait_address("xx")
+ self.assertEqual(cluster[1].agent().getExchange("xx").values["bindingCount"], 1)
+ cluster[2].wait_address("xx")
+ self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1)
+
+ # Simulate the race by re-creating the objects before promoting the new primary
+ cluster.kill(0, False)
+ sn = cluster[1].connect_admin().session()
+ sn.sender("qq;{delete:always}").close()
+ s = sn.sender("qq;{create:always}")
+ s.send("foo")
+ sn.sender("xx;{delete:always}").close()
+ sn.sender("xx;{create:always,node:{type:topic}}")
+ cluster[1].promote()
+ cluster[1].wait_status("active")
+ # Verify we are not still using the old objects on cluster[2]
+ cluster[2].assert_browse_backup("qq", ["foo"])
+ cluster[2].wait_address("xx")
+ self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 0)
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit