diff options
author | Alan Conway <aconway@apache.org> | 2012-01-19 23:03:52 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-19 23:03:52 +0000 |
commit | 7305cf39aff572bca77d89c12cc6d403e20ab20d (patch) | |
tree | 0ad560b8a400e0f5d7a6059f6f307f4cdc16a3bc | |
parent | 82dccc664ada89b73288c06eb7dc425d8359defc (diff) | |
download | qpid-python-7305cf39aff572bca77d89c12cc6d403e20ab20d.tar.gz |
QPID-3603: Automatic wiring and message replication.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233646 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/ha.mk | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp (renamed from qpid/cpp/src/qpid/broker/QueueReplicator.cpp) | 67 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h (renamed from qpid/cpp/src/qpid/broker/QueueReplicator.h) | 42 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 101 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 6 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 74 |
14 files changed, 201 insertions, 118 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 50d2c1eb86..ec8ff98a54 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -627,8 +627,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueuedMessage.h \ qpid/broker/QueueFlowLimit.h \ qpid/broker/QueueFlowLimit.cpp \ - qpid/broker/QueueReplicator.h \ - qpid/broker/QueueReplicator.cpp \ qpid/broker/ReplicatingSubscription.h \ qpid/broker/ReplicatingSubscription.cpp \ qpid/broker/RateFlowcontrol.h \ diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index ca6415d8dd..3d465d235e 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -29,6 +29,8 @@ ha_la_SOURCES = \ qpid/ha/HaBroker.h \ qpid/ha/HaPlugin.cpp \ qpid/ha/Settings.h \ + qpid/ha/QueueReplicator.h \ + qpid/ha/QueueReplicator.cpp \ qpid/ha/WiringReplicator.cpp \ qpid/ha/WiringReplicator.h diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index b3a742c170..02034811bf 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -25,7 +25,6 @@ #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" #include "qpid/ha/WiringReplicator.h" -#include "qpid/broker/QueueReplicator.h" #include "qpid/broker/SessionState.h" #include "qpid/management/ManagementAgent.h" @@ -112,10 +111,7 @@ void Bridge::create(Connection& c) if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); if (initialize) initialize(*this, sessionHandler); else if (args.i_srcIsQueue) { - //TODO: something other than this which is nasty... - bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options); - - peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, isReplicatingLink ? 1 : 0, false, "", 0, options); + peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index b067da3702..acaa9d1cbd 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -105,6 +105,8 @@ namespace qpid { std::string getHost() { return host; } uint16_t getPort() { return port; } + std::string getTransport() { return transport; } + bool isDurable() { return durable; } void maintenanceVisit (); uint nextChannel(); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index b34bc65ec5..eb6ab8ba15 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1434,7 +1434,7 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, } -const Broker* Queue::getBroker() +Broker* Queue::getBroker() { return broker; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index b66600ef43..a53916ffbc 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -403,7 +403,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, void flush(); - const Broker* getBroker(); + Broker* getBroker(); uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } void setDequeueSincePurge(uint32_t value); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index fbf9cd228e..333b707308 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -27,7 +27,6 @@ #include "qpid/broker/Message.h" #include "qpid/ha/WiringReplicator.h" #include "qpid/broker/Queue.h" -#include "qpid/broker/QueueReplicator.h" #include "qpid/broker/ReplicatingSubscription.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SessionOutputException.h" @@ -478,9 +477,10 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->computeExpiration(getSession().getBroker().getExpiryPolicy()); std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) { - cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues()); - if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName); + if (!cacheExchange || cacheExchange->getName() != exchangeName + || cacheExchange->isDestroyed()) + { + cacheExchange = session.getBroker().getExchanges().get(exchangeName); } cacheExchange->setProperties(msg); diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 300621860b..55c52cc508 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -42,9 +42,10 @@ using types::Variant; using std::string; Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { + // FIXME aconway 2011-11-24: identifying the primary. Only has 1 address. if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack to identify primary. Url url(s.brokerUrl); - QPID_LOG(info, "HA backup broker connecting to: " << url); + QPID_LOG(info, "HA: Acting as backup to " << url); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over. diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 01c0c8e272..f09b2acaaf 100644 --- a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -18,18 +18,62 @@ * under the License. * */ -#include "qpid/broker/QueueReplicator.h" + +#include "QueueReplicator.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Link.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/SessionHandler.h" #include "qpid/framing/SequenceSet.h" +#include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> + +namespace { +const std::string QPID_REPLICATOR_("qpid.replicator-"); +} namespace qpid { -namespace broker { +namespace ha { +using namespace broker; + +QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) + : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management? + queue(q), link(l), current(queue->getPosition()) +{ + // FIXME aconway 2011-11-24: consistent logging. + QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings()); + queue->getBroker()->getLinks().declare( + link->getHost(), link->getPort(), + false, // durable + queue->getName(), // src + getName(), // dest + "", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0, // sync? + boost::bind(&QueueReplicator::initializeBridge, this, _1, _2) + ); +} -QueueReplicator::QueueReplicator(const std::string& name, boost::shared_ptr<Queue> q) : Exchange(name, 0, 0), queue(q), current(queue->getPosition()) {} QueueReplicator::~QueueReplicator() {} +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, framing::FieldTable()); + peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); + peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); + QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); + +} + + namespace { const std::string DEQUEUE_EVENT("dequeue-event"); const std::string REPLICATOR("qpid.replicator-"); @@ -78,23 +122,6 @@ bool QueueReplicator::isReplicatingLink(const std::string& name) return name.find(REPLICATOR) == 0; } -boost::shared_ptr<Exchange> QueueReplicator::create(const std::string& target, QueueRegistry& queues) -{ - boost::shared_ptr<Exchange> exchange; - if (isReplicatingLink(target)) { - std::string queueName = target.substr(REPLICATOR.size()); - boost::shared_ptr<Queue> queue = queues.find(queueName); - if (!queue) { - QPID_LOG(warning, "Unable to create replicator, can't find " << queueName); - } else { - //TODO: need to cache the replicator - QPID_LOG(info, "Creating replicator for " << queueName); - exchange.reset(new QueueReplicator(target, queue)); - } - } - return exchange; -} - bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry& queues, qpid::framing::FieldTable& settings) { if (isReplicatingLink(target)) { diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 679aa9240d..13fbc6e86c 100644 --- a/qpid/cpp/src/qpid/broker/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -1,5 +1,5 @@ -#ifndef QPID_BROKER_QUEUEREPLICATOR_H -#define QPID_BROKER_QUEUEREPLICATOR_H +#ifndef QPID_HA_QUEUEREPLICATOR_H +#define QPID_HA_QUEUEREPLICATOR_H /* * @@ -25,33 +25,43 @@ #include "qpid/framing/SequenceSet.h" namespace qpid { -namespace broker { +namespace broker { +class Bridge; +class Link; +class Queue; class QueueRegistry; +class SessionHandler; +class Deliverable; +} + +namespace ha { /** * Dummy exchange for processing replication messages */ -class QueueReplicator : public Exchange +class QueueReplicator : public broker::Exchange { public: - QueueReplicator(const std::string& name, boost::shared_ptr<Queue>); + QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); ~QueueReplicator(); std::string getType() const; - bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*); - bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*); - void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*); - bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const); + bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); + bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); static bool isReplicatingLink(const std::string&); - static boost::shared_ptr<Exchange> create(const std::string&, QueueRegistry&); - static bool initReplicationSettings(const std::string&, QueueRegistry&, qpid::framing::FieldTable&); + static bool initReplicationSettings(const std::string&, broker::QueueRegistry&, framing::FieldTable&); static const std::string typeName; private: - boost::shared_ptr<Queue> queue; - qpid::framing::SequenceNumber current; - qpid::framing::SequenceSet dequeued; + void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); + + boost::shared_ptr<broker::Queue> queue; + boost::shared_ptr<broker::Link> link; + framing::SequenceNumber current; + framing::SequenceSet dequeued; }; -}} // namespace qpid::broker +}} // namespace qpid::ha -#endif /*!QPID_BROKER_QUEUEREPLICATOR_H*/ +#endif /*!QPID_HA_QUEUEREPLICATOR_H*/ diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index c89df85503..8b10765492 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -19,6 +19,7 @@ * */ #include "WiringReplicator.h" +#include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Link.h" @@ -62,7 +63,6 @@ const string QUERY_RESPONSE("_query_response"); const string SCHEMA_ID("_schema_id"); const string VALUES("_values"); -const string ALL("all"); const string ALTEX("altEx"); const string ARGS("args"); const string ARGUMENTS("arguments"); @@ -83,7 +83,6 @@ const string QUEUE("queue"); const string RHOST("rhost"); const string TYPE("type"); const string USER("user"); -const string WIRING("wiring"); const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string QMF2("qmf2"); @@ -110,15 +109,33 @@ template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -bool isReplicated(const string& value) { - return value == ALL || value == WIRING; +// FIXME aconway 2011-11-24: this should be a class. +enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL }; +const string S_NONE="none"; +const string S_WIRING="wiring"; +const string S_ALL="all"; + +ReplicateLevel replicateLevel(const string& str) { + // FIXME aconway 2011-11-24: case insenstive comparison. + QPID_LOG(critical, "FIXME replicateLevel " << str); + ReplicateLevel rl = RL_NONE; + if (str == S_WIRING) rl = RL_WIRING; + else if (str == S_ALL) rl = RL_ALL; + QPID_LOG(critical, "FIXME replicateLevel " << str << " = " << rl); + return rl; } -bool isReplicated(const framing::FieldTable& f) { - return f.isSet(QPID_REPLICATE) && isReplicated(f.getAsString(QPID_REPLICATE)); + +ReplicateLevel replicateLevel(const framing::FieldTable& f) { + QPID_LOG(critical, "FIXME replicateLevel " << f); + if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); + else return RL_NONE; } -bool isReplicated(const Variant::Map& m) { + +ReplicateLevel replicateLevel(const Variant::Map& m) { + QPID_LOG(critical, "FIXME replicateLevel " << m); Variant::Map::const_iterator i = m.find(QPID_REPLICATE); - return i != m.end() && isReplicated(i->second.asString()); + if (i != m.end()) return replicateLevel(i->second.asString()); + else return RL_NONE; } void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { @@ -164,6 +181,8 @@ WiringReplicator::~WiringReplicator() {} WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l) : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l) { + QPID_LOG(debug, "HA: Starting replication from " << + link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); broker.getLinks().declare( link->getHost(), link->getPort(), false, // durable @@ -198,6 +217,7 @@ void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(QUEUE, queueName, sessionHandler); sendQuery(EXCHANGE, queueName, sessionHandler); sendQuery(BINDING, queueName, sessionHandler); + QPID_LOG(debug, "Activated wiring replicator") } void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { @@ -244,13 +264,15 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram } void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { + QPID_LOG(critical, "FIXME doEventQueueDeclare " << values); string name = values[QNAME].asString(); Variant::Map argsMap = values[ARGS].asMap(); - if (values[DISP] == CREATED && isReplicated(argsMap)) { - QPID_LOG(debug, "Creating replicated queue " << name); + if (values[DISP] == CREATED && replicateLevel(argsMap)) { + QPID_LOG(debug, "HA: Creating replicated queue " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - if (!broker.createQueue( + std::pair<boost::shared_ptr<Queue>, bool> result = + broker.createQueue( name, values[DURABLE].asBool(), values[AUTODEL].asBool(), @@ -258,11 +280,14 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { values[ALTEX].asString(), args, values[USER].asString(), - values[RHOST].asString()).second) { + values[RHOST].asString()); + if (result.second) { // FIXME aconway 2011-11-22: should delete old queue and // re-create from event. // Events are always up to date, whereas responses may be // out of date. + startQueueReplicator(result.first); + } else { QPID_LOG(warning, "Replicated queue " << name << " already exists"); } } @@ -271,7 +296,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { void WiringReplicator::doEventQueueDelete(Variant::Map& values) { string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); - if (queue && isReplicated(queue->getSettings())) { + if (queue && replicateLevel(queue->getSettings())) { QPID_LOG(debug, "Deleting replicated queue " << name); broker.deleteQueue( name, @@ -282,7 +307,7 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) { void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { Variant::Map argsMap(values[ARGS].asMap()); - if (values[DISP] == CREATED && isReplicated(argsMap)) { + if (values[DISP] == CREATED && replicateLevel(argsMap)) { string name = values[EXNAME].asString(); framing::FieldTable args; amqp_0_10::translate(argsMap, args); @@ -305,7 +330,7 @@ void WiringReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); try { boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name); - if (exchange && isReplicated(exchange->getArgs())) { + if (exchange && replicateLevel(exchange->getArgs())) { QPID_LOG(debug, "Deleting replicated exchange " << name); broker.deleteExchange( name, @@ -320,7 +345,7 @@ void WiringReplicator::doEventBind(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString()); // We only replicated a binds for a replicated queue to replicated exchange. - if (isReplicated(exchange->getArgs()) && isReplicated(queue->getSettings())) { + if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings())) { framing::FieldTable args; amqp_0_10::translate(values[ARGS].asMap(), args); string key = values[KEY].asString(); @@ -333,21 +358,28 @@ void WiringReplicator::doEventBind(Variant::Map& values) { } void WiringReplicator::doResponseQueue(Variant::Map& values) { + QPID_LOG(critical, "FIXME doResponseQueue " << values); // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication Variant::Map argsMap(values[ARGUMENTS].asMap()); - if (!isReplicated(argsMap)) return; + QPID_LOG(critical, "FIXME doResponseQueue replevel " << replicateLevel(argsMap)); + if (!replicateLevel(argsMap)) return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); + string name(values[NAME].asString()); QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() << " (in catch-up)"); - if (!broker.createQueue( - values[NAME].asString(), + std::pair<boost::shared_ptr<Queue>, bool> result = + broker.createQueue( + name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), 0 /*i.e. no owner regardless of exclusivity on master*/, ""/*TODO: need to include alternate-exchange*/, args, ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/).second) { + ""/*TODO: what should we use as connection id?*/); + if (result.second) { + startQueueReplicator(result.first); + } else { // FIXME aconway 2011-11-22: Normal to find queue already // exists if we're failing over. QPID_LOG(warning, "Replicated queue " << values[NAME] << " already exists (in catch-up)"); @@ -356,7 +388,7 @@ void WiringReplicator::doResponseQueue(Variant::Map& values) { void WiringReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(values[ARGUMENTS].asMap()); - if (!isReplicated(argsMap)) return; + if (!replicateLevel(argsMap)) return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString() << " (in catch-up)"); @@ -396,23 +428,21 @@ const std::string QUEUE_REF("queueRef"); void WiringReplicator::doResponseBind(Variant::Map& values) { try { std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); - boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName); - if (!exchange) return; - std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); + QPID_LOG(critical, "FIXME doResponseBind " << qName << " to " << exName); + boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName); boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); - if (!queue) return; + QPID_LOG(critical, "FIXME doResponseBind ptrs " << queue.get() << " to " << exchange.get()); + // FIXME aconway 2011-11-24: more flexible configuration for binding replication. - // We only replicated a bind for a replicated queue to replicated exchange. - // FIXME aconway 2011-11-22: do we always log binds between replicated ex/q - // or do we consider the bind arguments as well? - if (exchange && queue && - isReplicated(exchange->getArgs()) && isReplicated(queue->getSettings())) + // Automatically replicate exchange if queue and exchange are replicated + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) { framing::FieldTable args; amqp_0_10::translate(values[ARGUMENTS].asMap(), args); string key = values[KEY].asString(); - QPID_LOG(debug, "Replicated binding exchange=" << exchange->getName() + QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); exchange->bind(queue, key, &args); @@ -420,6 +450,15 @@ void WiringReplicator::doResponseBind(Variant::Map& values) { } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange. } +void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { + QPID_LOG(critical, "FIXME startQueueReplicator " << queue->getName() << " " << queue->getSettings()); + if (replicateLevel(queue->getSettings()) == RL_ALL) { + QPID_LOG(critical, "FIXME startQueueReplicator starting"); + boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); + broker.getExchanges().registerExchange(qr); + } +} + bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h index 55d6130cb8..6a5edb114c 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.h +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.h @@ -68,7 +68,7 @@ class WiringReplicator : public broker::Exchange void doResponseBind(types::Variant::Map& values); private: - void startQueueReplicator(const std::string& name); + void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); broker::Broker& broker; boost::shared_ptr<broker::Link> link; diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index aa83989649..8dcde5a863 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -519,6 +519,12 @@ class BrokerTest(TestCase): actual_contents = self.browse(session, queue, timeout) self.assertEqual(expect_contents, actual_contents) + def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01): + """Wait up to timeout for contents of queue to match expect_contents""" + def test(): return self.browse(session, queue, 0) == expect_contents + retry(test, timeout, delay) + self.assertEqual(expect_contents, self.browse(session, queue, 0)) + def join(thread, timeout=10): thread.join(timeout) if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index bd5b847510..2b52d202ca 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -57,50 +57,52 @@ class ShortTests(BrokerTest): def assert_missing(self,session, address): try: - session.receiver(a) + session.receiver(address) self.fail("Should not have been replicated: %s"%(address)) except NotFound: pass - def test_replicate_wiring(self): - queue="%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}" - exchange="%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}" - - # Create some wiring before starting the backup, to test catch-up + def test_replication(self): + def queue(name, replicate): + return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate) + + def exchange(name, replicate, bindq): + return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq) + def setup(p, prefix): + """Create config, send messages on the primary p""" + p.sender(queue(prefix+"q1", "all")).send(Message("1")) + p.sender(queue(prefix+"q2", "wiring")).send(Message("2")) + p.sender(queue(prefix+"q3", "none")).send(Message("3")) + p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4")) + p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5")) + # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done. + p.sender(queue(prefix+"x", "wiring")) + + def verify(b, prefix): + """Verify setup was replicated to backup b""" + # FIXME aconway 2011-11-21: wait for wiring to replicate. + self.wait(b, prefix+"x"); + # Verify backup + # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication. + self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) + self.assert_browse_retry(b, prefix+"q2", []) # wiring only + self.assert_missing(b, prefix+"q3") + b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all + self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) + b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring + self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) + + # Create config, send messages before starting the backup, to test catch-up replication. primary = self.ha_broker(name="primary") p = primary.connect().session() - p.sender(queue%("q1", "all")).send(Message("1")) - p.sender(queue%("q2", "wiring")).send(Message("2")) - p.sender(queue%("q3", "none")).send(Message("3")) - p.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4")) - - # Create some after starting backup, test steady-state replication + setup(p, "1") + # Start the backup backup = self.ha_broker(name="backup", broker_url=primary.host_port()) b = backup.connect().session() - # FIXME aconway 2011-11-21: need to wait for backup to be ready to test event replication - for a in ["q1", "q2", "e1"]: self.wait(b,a) - p.sender(queue%("q11", "all")).send(Message("11")) - p.sender(queue%("q12", "wiring")).send(Message("12")) - p.sender(queue%("q13", "none")).send(Message("13")) - p.sender(exchange%("e11", "all", "e11", "q12")).send(Message("14")) - - # Verify replication - # FIXME aconway 2011-11-18: We should kill primary here and fail over. - for a in ["q11", "q12", "e11"]: self.wait(b,a) - # FIXME aconway 2011-11-18: replicate messages -# self.assert_browse(b, "q11", ["11", "14", "e11"]) -# self.assert_browse(b, "q12", []) # wiring only -# self.assert_missing(b,"q13") - b.sender("e11").send(Message("e11")) # Verify bind - self.assert_browse(b, "q12", ["e11"]) - - for a in ["q1", "q2", "e1"]: self.wait(b,a) - # FIXME aconway 2011-11-18: replicate messages -# self.assert_browse(b, "q1", ["1", "4", "e1"]) -# self.assert_browse(b, "q2", []) # wiring only -# self.assert_missing(b,"q3") - b.sender("e1").send(Message("e1")) # Verify bind - self.assert_browse(b, "q2", ["e1"]) + verify(b, "1") + # Create config, send messages after starting the backup, to test steady-state replication. + setup(p, "2") + verify(b, "2") if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |