summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-03-07 17:25:47 +0000
committerAlan Conway <aconway@apache.org>2013-03-07 17:25:47 +0000
commit6129886499678fbab1111b6e7422b77b6f62e143 (patch)
tree95cec06ad2139b0beb9dd7146950408a1b824c88
parent74f4b51ba0cc95529b054e450cd6327f91520b6b (diff)
downloadqpid-python-6129886499678fbab1111b6e7422b77b6f62e143.tar.gz
QPID-4630: HA Fix starting from persistent store.
This was implemented in r1390123 but broken by subsequent changes. When re-starting a persistent HA cluster, the broker that becomes primary keeps its recovered queues while backup brokers discard their recovered queues and catch up from the primary. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1453971 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/amqp_0_10/Codecs.h4
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp9
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp23
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h1
-rwxr-xr-xqpid/cpp/src/tests/ha_store_tests.py24
7 files changed, 42 insertions, 32 deletions
diff --git a/qpid/cpp/include/qpid/amqp_0_10/Codecs.h b/qpid/cpp/include/qpid/amqp_0_10/Codecs.h
index d632a9f20a..99a06e0a32 100644
--- a/qpid/cpp/include/qpid/amqp_0_10/Codecs.h
+++ b/qpid/cpp/include/qpid/amqp_0_10/Codecs.h
@@ -24,10 +24,12 @@
#include "qpid/CommonImportExport.h"
#include "qpid/types/Variant.h"
+#include "boost/shared_ptr.hpp"
namespace qpid {
namespace framing {
class FieldTable;
+class FieldValue;
}
namespace amqp_0_10 {
/**
@@ -74,6 +76,8 @@ QPID_COMMON_EXTERN void translate(const qpid::types::Variant::Map& from, const s
QPID_COMMON_EXTERN void translate(const qpid::framing::FieldTable& from,
qpid::types::Variant::Map& to);
+QPID_COMMON_EXTERN boost::shared_ptr<framing::FieldValue> fieldValue(const types::Variant&);
+
}} // namespace qpid::amqp_0_10
#endif /*!QPID_AMQP_0_10_CODECS_H*/
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp b/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
index 1288652ee1..e7637b16ad 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
@@ -527,6 +527,15 @@ void translate(const FieldTable& from, Variant::Map& to)
convert(from, to, &toVariantMapEntry);
}
+boost::shared_ptr<framing::FieldValue> fieldValue(const types::Variant& value) {
+ // TODO aconway 2013-03-05: Nasty implementation
+ Variant::Map m;
+ m[std::string()] = value;
+ framing::FieldTable f;
+ translate(m, f);
+ return f.get(std::string());
+}
+
const std::string ListCodec::contentType("amqp/list");
const std::string MapCodec::contentType("amqp/map");
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 81e26d0b48..91b497c57b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1587,7 +1587,9 @@ void Queue::UsageBarrier::destroy()
}
void Queue::addArgument(const string& key, const types::Variant& value) {
- settings.original.insert(types::Variant::Map::value_type(key, value));
+ settings.original[key] = value;
+ qpid::amqp_0_10::translate(settings.asMap(), encodableSettings);
+ settings.storeSettings.set(key, qpid::amqp_0_10::fieldValue(value));
if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap());
}
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 576d0f198b..13f9bbe23c 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -59,6 +59,8 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings,
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
Queue::shared_ptr queue = create(name, settings);
+ // Allow ConfigurationObserver to modify settings before storing the message.
+ if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue);
//Move this to factory also?
if (alternate)
queue->setAlternateExchange(alternate);//need to do this *before* create
@@ -67,16 +69,11 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings,
queue->create();
}
queues[name] = queue;
- // NOTE: raiseEvent and queueCreate must be called with the lock held in
- // order to ensure events are generated in the correct order.
- // Call queueCreate before raiseEvents so it can add arguments that
- // will be included in the management event.
- if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue);
result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
- if (getBroker() && getBroker()->getManagementAgent()) {
+ if (getBroker() && getBroker()->getManagementAgent()) {
getBroker()->getManagementAgent()->raiseEvent(
_qmf::EventQueueDeclare(
connectionId, userId, name,
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 7c0fad9b60..42e54a0125 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -234,7 +234,10 @@ class BrokerReplicator::UpdateTracker {
/** Destructor cleans up remaining initial queues. */
~UpdateTracker() {
// Don't throw in a destructor.
- try { for_each(initial.begin(), initial.end(), cleanFn); }
+ try {
+ for_each(initial.begin(), initial.end(),
+ boost::bind(&UpdateTracker::clean, this, _1));
+ }
catch (const std::exception& e) {
QPID_LOG(error, "Error in cleanup of lost objects: " << e.what());
}
@@ -269,7 +272,8 @@ class BrokerReplicator::UpdateTracker {
private:
void clean(const std::string& name) {
- QPID_LOG(info, "Backup updated, deleting " << type << " " << name);
+ QPID_LOG(info, "Backup: Deleted " << type << " " << name <<
+ ": no longer exists on primary");
cleanFn(name);
}
@@ -531,7 +535,6 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
CreateExchangeResult result = createExchange(
name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
values[ALTEX].asString());
- replicatedExchanges.insert(name);
assert(result.second);
}
}
@@ -547,7 +550,6 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
if (exchangeTracker.get()) exchangeTracker->event(name);
deleteExchange(name);
- replicatedExchanges.erase(name);
}
}
@@ -638,12 +640,12 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
throw Exception(QPID_MSG("Unexpected queue response: " << values));
if (!queueTracker->response(name)) return; // Response is out-of-date
QPID_LOG(debug, logPrefix << "Queue response: " << name);
- // If we see a queue with the same name as one we have, but not the same UUID,
- // then replace the one we have.
boost::shared_ptr<Queue> queue = queues.find(name);
- if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) {
- QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: "
- << name);
+ if (queue) { // Already exists
+ bool uuidOk = (getHaUuid(queue->getSettings().original) == getHaUuid(argsMap));
+ if (!uuidOk) QPID_LOG(debug, logPrefix << "UUID mismatch for queue: " << name);
+ if (uuidOk && findQueueReplicator(name)) return; // already replicated, UUID OK.
+ QPID_LOG(debug, logPrefix << "Queue response replacing queue: " << name);
deleteQueue(name);
}
framing::FieldTable args;
@@ -669,7 +671,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
- // If we see an exchange with the same name as one we have, but not the same UUID,
+ // If we see an exchange with the same name as one we have, but a different UUID,
// then replace the one we have.
boost::shared_ptr<Exchange> exchange = exchanges.find(name);
if (exchange &&
@@ -682,7 +684,6 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
CreateExchangeResult result = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- replicatedExchanges.insert(name);
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index f64c54abe8..03df5fec6b 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -153,7 +153,6 @@ class BrokerReplicator : public broker::Exchange,
AlternateExchangeSetter alternates;
qpid::Address primary;
typedef std::set<std::string> StringSet;
- StringSet replicatedExchanges; // exchanges that have been replicated.
broker::Connection* connection;
EventDispatchMap dispatch;
std::auto_ptr<UpdateTracker> queueTracker;
diff --git a/qpid/cpp/src/tests/ha_store_tests.py b/qpid/cpp/src/tests/ha_store_tests.py
index 16bf6e2964..21d403a46e 100755
--- a/qpid/cpp/src/tests/ha_store_tests.py
+++ b/qpid/cpp/src/tests/ha_store_tests.py
@@ -40,13 +40,12 @@ class StoreTests(BrokerTest):
def test_store_recovery(self):
"""Verify basic store and recover functionality"""
- cluster = HaCluster(self, 2)
+ cluster = HaCluster(self, 1)
sn = cluster[0].connect().session()
+ # Create queue qq, exchange exx and binding between them
s = sn.sender("qq;{create:always,node:{durable:true}}")
- sk = sn.sender("xx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:xx,key:k,queue:qq}]}}")
- s.send(Message("foo", durable=True))
- s.send(Message("bar", durable=True))
- sk.send(Message("baz", durable=True))
+ sk = sn.sender("exx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:exx,key:k,queue:qq}]}}")
+ for m in ["foo", "bar", "baz"]: s.send(Message(m, durable=True))
r = cluster[0].connect().session().receiver("qq")
self.assertEqual(r.fetch().content, "foo")
r.session.acknowledge()
@@ -57,16 +56,16 @@ class StoreTests(BrokerTest):
def verify(broker, x_count):
sn = broker.connect().session()
assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"])
- sn.sender("xx/k").send(Message("x", durable=True))
+ sn.sender("exx/k").send(Message("x", durable=True))
assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"])
- verify(cluster[0], 0)
- cluster.bounce(0, promote_next=False)
- cluster[0].promote()
+ verify(cluster[0], 0) # Sanity check
+ cluster.bounce(0)
cluster[0].wait_status("active")
- verify(cluster[0], 1)
- cluster.kill(0, promote_next=False)
- cluster[1].promote()
+ verify(cluster[0], 1) # Loaded from store
+ cluster.start()
+ cluster[1].wait_status("ready")
+ cluster.kill(0)
cluster[1].wait_status("active")
verify(cluster[1], 2)
cluster.bounce(1, promote_next=False)
@@ -90,7 +89,6 @@ class StoreTests(BrokerTest):
# Make changes that the backup doesn't see
cluster.kill(1, promote_next=False)
- time.sleep(1) # FIXME aconway 2012-09-25:
r1 = cluster[0].connect().session().receiver("q1")
for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
r1.session.acknowledge()