diff options
author | Alan Conway <aconway@apache.org> | 2012-02-14 16:07:36 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-14 16:07:36 +0000 |
commit | ac149d9a7f5df376e7054eeea2728edcd061f5eb (patch) | |
tree | b93c913e446b7970a56bad5c139e92ea444117f8 | |
parent | 1693fee9a3a229a6859d5eecccbee13350f68cde (diff) | |
download | qpid-python-ac149d9a7f5df376e7054eeea2728edcd061f5eb.tar.gz |
QPID-3603: Replicate unbind events.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244074 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 49 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 20 |
3 files changed, 59 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index a496ab7583..58602b5511 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" #include "qmf/org/apache/qpid/broker/EventBind.h" +#include "qmf/org/apache/qpid/broker/EventUnbind.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" @@ -41,6 +42,7 @@ namespace qpid { namespace ha { using qmf::org::apache::qpid::broker::EventBind; +using qmf::org::apache::qpid::broker::EventUnbind; using qmf::org::apache::qpid::broker::EventExchangeDeclare; using qmf::org::apache::qpid::broker::EventExchangeDelete; using qmf::org::apache::qpid::broker::EventQueueDeclare; @@ -70,6 +72,7 @@ const string ARGUMENTS("arguments"); const string AUTODEL("autoDel"); const string AUTODELETE("autoDelete"); const string BIND("bind"); +const string UNBIND("unbind"); const string BINDING("binding"); const string CREATED("created"); const string DISP("disp"); @@ -171,6 +174,12 @@ void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler.out->handle(header); sessionHandler.out->handle(content); } + +void translate(const Variant& value, framing::FieldTable& outArgs) { + if (!value.isVoid()) + amqp_0_10::translate(value.asMap(), outArgs); +} + } // namespace BrokerReplicator::~BrokerReplicator() {} @@ -237,21 +246,19 @@ void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const fram else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); else if (match<EventBind>(schema)) doEventBind(values); - // FIXME aconway 2011-11-21: handle unbind & all other relevant events. + else if (match<EventUnbind>(schema)) doEventUnbind(values); } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; Variant::Map& values = i->asMap()[VALUES].asMap(); framing::FieldTable args; - amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + translate(values[ARGUMENTS].asMap(), args); if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); - else QPID_LOG(error, "HA: Backup received unknown response: type=" << type + else QPID_LOG(error, "HA: Backup received unknown response type=" << type << " values=" << values); - - // FIXME aconway 2011-12-06: handle all relevant response types. } } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers); } catch (const std::exception& e) { @@ -264,7 +271,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { Variant::Map argsMap = values[ARGS].asMap(); if (values[DISP] == CREATED && replicateLevel(argsMap)) { framing::FieldTable args; - amqp_0_10::translate(argsMap, args); + translate(argsMap, args); std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( name, @@ -312,7 +319,7 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { if (values[DISP] == CREATED && replicateLevel(argsMap)) { string name = values[EXNAME].asString(); framing::FieldTable args; - amqp_0_10::translate(argsMap, args); + translate(argsMap, args); if (broker.createExchange( name, values[EXTYPE].asString(), @@ -356,7 +363,7 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { queue && replicateLevel(queue->getSettings())) { framing::FieldTable args; - amqp_0_10::translate(values[ARGS].asMap(), args); + translate(values[ARGS].asMap(), args); string key = values[KEY].asString(); QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName() << " queue=" << queue->getName() @@ -365,12 +372,32 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { } } +void BrokerReplicator::doEventUnbind(Variant::Map& values) { + boost::shared_ptr<Exchange> exchange = + broker.getExchanges().find(values[EXNAME].asString()); + boost::shared_ptr<Queue> queue = + broker.getQueues().find(values[QNAME].asString()); + // We only replicate unbinds for a replicated queue to replicated + // exchange that both exist locally. + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + translate(values[ARGS].asMap(), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->unbind(queue, key, &args); + } +} + void BrokerReplicator::doResponseQueue(Variant::Map& values) { // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication Variant::Map argsMap(values[ARGUMENTS].asMap()); if (!replicateLevel(argsMap)) return; framing::FieldTable args; - amqp_0_10::translate(argsMap, args); + translate(argsMap, args); string name(values[NAME].asString()); std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( @@ -396,7 +423,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(values[ARGUMENTS].asMap()); if (!replicateLevel(argsMap)) return; framing::FieldTable args; - amqp_0_10::translate(argsMap, args); + translate(argsMap, args); if (broker.createExchange( values[NAME].asString(), values[TYPE].asString(), @@ -445,7 +472,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { queue && replicateLevel(queue->getSettings())) { framing::FieldTable args; - amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + translate(values[ARGUMENTS].asMap(), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName() diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 154dd340ac..f0b9e0b599 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -63,14 +63,18 @@ class BrokerReplicator : public broker::Exchange private: void initializeBridge(broker::Bridge&, broker::SessionHandler&); + void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); void doEventExchangeDeclare(types::Variant::Map& values); void doEventExchangeDelete(types::Variant::Map& values); void doEventBind(types::Variant::Map&); + void doEventUnbind(types::Variant::Map&); + void doResponseQueue(types::Variant::Map& values); void doResponseExchange(types::Variant::Map& values); void doResponseBind(types::Variant::Map& values); + void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); broker::Broker& broker; diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index ed9786674a..38f243a5c1 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -55,12 +55,15 @@ class ShortTests(BrokerTest): except NotFound: pass def test_replication(self): + """Test basic replication of wiring and messages before and + after backup has connected""" + def queue(name, replicate): return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate) def exchange(name, replicate, bindq): return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq) - def setup(p, prefix): + def setup(p, prefix, primary): """Create config, send messages on the primary p""" s = p.sender(queue(prefix+"q1", "all")) for m in ["a", "b", "1"]: s.send(Message(m)) @@ -71,6 +74,14 @@ class ShortTests(BrokerTest): p.sender(queue(prefix+"q3", "none")).send(Message("3")) p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4")) p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5")) + # Test unbind + p.sender(queue(prefix+"q4", "all")).send(Message("6")) + s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4")) + s3.send(Message("7")) + # Use old connection to unbind + us = primary.connect_old().session(str(qpid.datatypes.uuid4())) + us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4") + p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done. p.sender(queue(prefix+"x", "wiring")) @@ -93,13 +104,16 @@ class ShortTests(BrokerTest): b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) + b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind. + self.assert_browse_retry(b, prefix+"q4", ["6","7"]) + primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary p = primary.connect().session() # Create config, send messages before starting the backup, to test catch-up replication. - setup(p, "1") + setup(p, "1", primary) backup = self.ha_broker(name="backup", broker_url=primary.host_port()) # Create config, send messages after starting the backup, to test steady-state replication. - setup(p, "2") + setup(p, "2", primary) # Verify the data on the backup b = backup.connect().session() |