diff options
author | Alan Conway <aconway@apache.org> | 2012-02-14 16:02:15 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-14 16:02:15 +0000 |
commit | 0c135c2ab924740e45be870cd22288049fbbecad (patch) | |
tree | a6b9f0bb7398ee8be510394125ac99cdcad2e4a4 | |
parent | fb6bbc399e8a1911d4ec3693509cf7ca571a582a (diff) | |
download | qpid-python-0c135c2ab924740e45be870cd22288049fbbecad.tar.gz |
QPID-3603: In progress - integrate ReplicatingSubscription.
The code to use ReplicatingSubscription is there but it is disabled by
commenting out getConsumerFactories().add in Backup.cpp because it
hangs the test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244037 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaPlugin.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 35 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 3 |
7 files changed, 54 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 55c52cc508..d3a2e71ff6 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -21,6 +21,7 @@ #include "Backup.h" #include "Settings.h" #include "WiringReplicator.h" +#include "ReplicatingSubscription.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" @@ -58,6 +59,12 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { link = result.first; boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link)); broker.getExchanges().registerExchange(wr); + + // FIXME aconway 2011-11-25: using ReplicatingSubscription hangs the tests + // The tests pass with a plain subscription if we dont add the factory. +// broker.getConsumerFactories().add( +// boost::shared_ptr<ReplicatingSubscription::Factory>( +// new ReplicatingSubscription::Factory())); } } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index d4edd273dc..38ae19a11e 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -58,7 +58,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) mgmtObject->set_status("solo"); ma->addObject(mgmtObject); } - QPID_LOG(notice, "HA broker initialized, client-url=" << clientUrl + QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl << ", broker-url=" << brokerUrl); backup.reset(new Backup(broker, s)); } diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index 23f501328d..798dbc2bfd 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -56,10 +56,10 @@ struct HaPlugin : public Plugin { void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); if (broker && settings.enabled) { - QPID_LOG(info, "HA plugin enabled"); + QPID_LOG(info, "HA: Enabled"); haBroker.reset(new ha::HaBroker(*broker, settings)); } else - QPID_LOG(info, "HA plugin disabled"); + QPID_LOG(info, "HA: Disabled"); } }; diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 873d3784a7..1f9a8730b3 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -66,6 +66,23 @@ std::string mask(const std::string& in) return DOLLAR + in + INTERNAL; } +boost::shared_ptr<broker::SemanticState::ConsumerImpl> +ReplicatingSubscription::Factory::create( + SemanticState* _parent, + const std::string& _name, + Queue::shared_ptr _queue, + bool ack, + bool _acquire, + bool _exclusive, + const std::string& _tag, + const std::string& _resumeId, + uint64_t _resumeTtl, + const framing::FieldTable& _arguments +) { + return boost::shared_ptr<broker::SemanticState::ConsumerImpl>( + new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments)); +} + ReplicatingSubscription::ReplicatingSubscription( SemanticState* _parent, const std::string& _name, @@ -81,7 +98,8 @@ ReplicatingSubscription::ReplicatingSubscription( events(new Queue(mask(_name))), consumer(new DelegatingConsumer(*this)) { - + // FIXME aconway 2011-11-25: string constants. + QPID_LOG(debug, "HA: replicating subscription " << _name << " to " << _queue->getName()); if (_arguments.isSet("qpid.high_sequence_number")) { qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number"); qpid::framing::SequenceNumber lwm; diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index a1c10a7641..0ba2b2f0de 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -24,6 +24,7 @@ #include "qpid/broker/SemanticState.h" #include "qpid/broker/QueueObserver.h" +#include "qpid/broker/ConsumerFactory.h" namespace qpid { @@ -43,11 +44,21 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, public broker::QueueObserver { public: + struct Factory : public broker::ConsumerFactory { + boost::shared_ptr<broker::SemanticState::ConsumerImpl> create( + broker::SemanticState* parent, + const std::string& name, boost::shared_ptr<broker::Queue> , + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); + }; + ReplicatingSubscription(broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + ~ReplicatingSubscription(); void init(); diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index 8b10765492..04d0f9d9ee 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -117,22 +117,18 @@ const string S_ALL="all"; ReplicateLevel replicateLevel(const string& str) { // FIXME aconway 2011-11-24: case insenstive comparison. - QPID_LOG(critical, "FIXME replicateLevel " << str); ReplicateLevel rl = RL_NONE; if (str == S_WIRING) rl = RL_WIRING; else if (str == S_ALL) rl = RL_ALL; - QPID_LOG(critical, "FIXME replicateLevel " << str << " = " << rl); return rl; } ReplicateLevel replicateLevel(const framing::FieldTable& f) { - QPID_LOG(critical, "FIXME replicateLevel " << f); if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); else return RL_NONE; } ReplicateLevel replicateLevel(const Variant::Map& m) { - QPID_LOG(critical, "FIXME replicateLevel " << m); Variant::Map::const_iterator i = m.find(QPID_REPLICATE); if (i != m.end()) return replicateLevel(i->second.asString()); else return RL_NONE; @@ -234,6 +230,7 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram Variant::Map& map = list.front().asMap(); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); + QPID_LOG(trace, "HA: Configuration event from primary: " << values); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); @@ -249,29 +246,30 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram Variant::Map& values = i->asMap()[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + QPID_LOG(trace, "HA: Configuration response from primary: " << values); if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); - else throw Exception(QPID_MSG("Ignoring unexpected class: " << type)); + else throw Exception(QPID_MSG("HA: Unexpected response type: " << type)); } } else { - QPID_LOG(warning, QPID_MSG("Replicator: Ignoring QMFv2 message with headers: " << *headers)); + QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration message, got: " << *headers)); } } catch (const std::exception& e) { - QPID_LOG(warning, "Replicator: Error replicating configuration: " << e.what()); - QPID_LOG(debug, "Replicator: Error processing: " << list); + QPID_LOG(warning, "HA: Error replicating configuration: " << e.what()); + QPID_LOG(debug, "HA: Error processing configuration message: " << list); } } void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { - QPID_LOG(critical, "FIXME doEventQueueDeclare " << values); string name = values[QNAME].asString(); Variant::Map argsMap = values[ARGS].asMap(); if (values[DISP] == CREATED && replicateLevel(argsMap)) { - QPID_LOG(debug, "HA: Creating replicated queue " << name); - framing::FieldTable args; + framing::FieldTable args; amqp_0_10::translate(argsMap, args); - std::pair<boost::shared_ptr<Queue>, bool> result = + + QPID_LOG(debug, "HA: Creating queue from event " << name); + std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( name, values[DURABLE].asBool(), @@ -288,7 +286,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { // out of date. startQueueReplicator(result.first); } else { - QPID_LOG(warning, "Replicated queue " << name << " already exists"); + QPID_LOG(warning, "HA: Replicated queue " << name << " already exists"); } } } @@ -297,7 +295,7 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) { string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (queue && replicateLevel(queue->getSettings())) { - QPID_LOG(debug, "Deleting replicated queue " << name); + QPID_LOG(debug, "HA: Deleting queue from event: " << name); broker.deleteQueue( name, values[USER].asString(), @@ -311,6 +309,7 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { string name = values[EXNAME].asString(); framing::FieldTable args; amqp_0_10::translate(argsMap, args); + QPID_LOG(debug, "HA: Creating exchange from event " << name); if (!broker.createExchange( name, values[EXTYPE].asString(), @@ -331,7 +330,7 @@ void WiringReplicator::doEventExchangeDelete(Variant::Map& values) { try { boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name); if (exchange && replicateLevel(exchange->getArgs())) { - QPID_LOG(debug, "Deleting replicated exchange " << name); + QPID_LOG(debug, "HA: Deleting exchange:" << name); broker.deleteExchange( name, values[USER].asString(), @@ -358,10 +357,8 @@ void WiringReplicator::doEventBind(Variant::Map& values) { } void WiringReplicator::doResponseQueue(Variant::Map& values) { - QPID_LOG(critical, "FIXME doResponseQueue " << values); // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication Variant::Map argsMap(values[ARGUMENTS].asMap()); - QPID_LOG(critical, "FIXME doResponseQueue replevel " << replicateLevel(argsMap)); if (!replicateLevel(argsMap)) return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); @@ -429,10 +426,8 @@ void WiringReplicator::doResponseBind(Variant::Map& values) { try { std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); - QPID_LOG(critical, "FIXME doResponseBind " << qName << " to " << exName); boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName); boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); - QPID_LOG(critical, "FIXME doResponseBind ptrs " << queue.get() << " to " << exchange.get()); // FIXME aconway 2011-11-24: more flexible configuration for binding replication. // Automatically replicate exchange if queue and exchange are replicated @@ -451,9 +446,7 @@ void WiringReplicator::doResponseBind(Variant::Map& values) { } void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { - QPID_LOG(critical, "FIXME startQueueReplicator " << queue->getName() << " " << queue->getSettings()); if (replicateLevel(queue->getSettings()) == RL_ALL) { - QPID_LOG(critical, "FIXME startQueueReplicator starting"); boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); broker.getExchanges().registerExchange(qr); } diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 2b52d202ca..79123058b7 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -43,9 +43,6 @@ class ShortTests(BrokerTest): cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary) self.assertEqual(0, os.system(cmd)) - def setup_replication(self, primary, backup, queue): - self.assertEqual(0,os.system("qpid-route --ack 1 queue add %s %s qpid.replicator-%s %s"%(backup, primary, queue, queue))) - # FIXME aconway 2011-11-15: work around async replication. def wait(self, session, address): def check(): |