diff options
author | Alan Conway <aconway@apache.org> | 2012-02-14 16:00:52 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-14 16:00:52 +0000 |
commit | b44b95ab1b1e0550d23f2b1d5c4a31680d1a8cf2 (patch) | |
tree | 874cb75ef75b06235f8ce25d41aeeda9c424a9e1 | |
parent | 4f818b6166b6909f58f2203ee19d52c2e4ddf222 (diff) | |
download | qpid-python-b44b95ab1b1e0550d23f2b1d5c4a31680d1a8cf2.tar.gz |
QPID-3603: Move init code for WiringReplicator out of Bridge into ha::Backup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244029 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 80 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.h | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 76 |
6 files changed, 114 insertions, 75 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index e4d173af9c..d053106309 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -59,9 +59,11 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) } Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, - const _qmf::ArgsLinkBridge& _args) : + const _qmf::ArgsLinkBridge& _args, + InitializeCallback init) : link(_link), id(_id), args(_args), mgmtObject(0), - listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0) + listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0), + initialize(init) { std::stringstream title; title << id << "_" << name; @@ -77,9 +79,9 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest); } -Bridge::~Bridge() +Bridge::~Bridge() { - mgmtObject->resourceDestroy(); + mgmtObject->resourceDestroy(); } void Bridge::create(Connection& c) @@ -98,7 +100,7 @@ void Bridge::create(Connection& c) session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - + session->attach(name, false); session->commandPoint(0,0); } else { @@ -108,7 +110,8 @@ void Bridge::create(Connection& c) } if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); - if (args.i_srcIsQueue) { + if (initialize) initialize(*this, sessionHandler); + else if (args.i_srcIsQueue) { //TODO: something other than this which is nasty... bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options); @@ -116,52 +119,6 @@ void Bridge::create(Connection& c) peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); - } else if (ha::WiringReplicator::isWiringReplicatorDestination(args.i_dest)) { - //declare and bind an event queue - peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable()); - peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable()); - //subscribe to the queue - peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - - //issue a query request for queues and another for exchanges using event queue as the reply-to address - for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions - Variant::Map request; - request["_what"] = "OBJECT"; - Variant::Map schema; - schema["_class_name"] = (i == 0 ? "queue" : "exchange"); - schema["_package_name"] = "org.apache.qpid.broker"; - request["_schema_id"] = schema; - - AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0))); - method.setBof(true); - method.setEof(false); - method.setBos(true); - method.setEos(true); - AMQHeaderBody headerBody; - MessageProperties* props = headerBody.get<MessageProperties>(true); - props->setReplyTo(qpid::framing::ReplyTo("", queueName)); - props->setAppId("qmf2"); - props->getApplicationHeaders().setString("qmf.opcode", "_query_request"); - headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker"); - AMQFrame header(headerBody); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - AMQContentBody data; - qpid::amqp_0_10::MapCodec::encode(request, data.getData()); - AMQFrame content(data); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - sessionHandler.out->handle(method); - sessionHandler.out->handle(header); - sessionHandler.out->handle(content); - } - } else { FieldTable queueSettings; @@ -236,11 +193,6 @@ void Bridge::setPersistenceId(uint64_t pId) const persistenceId = pId; } -const string& Bridge::getName() const -{ - return name; -} - Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) { string host; @@ -268,7 +220,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) is_queue, is_local, id, excludes, dynamic, sync).first; } -void Bridge::encode(Buffer& buffer) const +void Bridge::encode(Buffer& buffer) const { buffer.putShortString(string("bridge")); buffer.putShortString(link->getHost()); @@ -285,8 +237,8 @@ void Bridge::encode(Buffer& buffer) const buffer.putShort(args.i_sync); } -uint32_t Bridge::encodedSize() const -{ +uint32_t Bridge::encodedSize() const +{ return link->getHost().size() + 1 // short-string (host) + 7 // short-string ("bridge") + 2 // port @@ -311,7 +263,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, management::Args& /*args*/, string&) { - if (methodId == _qmf::Bridge::METHOD_CLOSE) { + if (methodId == _qmf::Bridge::METHOD_CLOSE) { //notify that we are closed destroy(); return management::Manageable::STATUS_OK; @@ -358,7 +310,7 @@ void Bridge::sendReorigin() conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, queueName, args.i_src, args.i_key, bindArgs)); } -bool Bridge::resetProxy() +bool Bridge::resetProxy() { SessionHandler& sessionHandler = conn->getChannel(id); if (!sessionHandler.getSession()) peer.reset(); diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 8b4559a871..b849b11ba8 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -42,15 +42,19 @@ class Connection; class ConnectionState; class Link; class LinkRegistry; +class SessionHandler; class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge { public: typedef boost::shared_ptr<Bridge> shared_ptr; typedef boost::function<void(Bridge*)> CancellationListener; + typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback; Bridge(Link* link, framing::ChannelId id, CancellationListener l, - const qmf::org::apache::qpid::broker::ArgsLinkBridge& args); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args, + InitializeCallback init + ); ~Bridge(); void create(Connection& c); @@ -70,8 +74,8 @@ public: void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } uint32_t encodedSize() const; - void encode(framing::Buffer& buffer) const; - const std::string& getName() const; + void encode(framing::Buffer& buffer) const; + const std::string& getName() const { return name; } static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); // Exchange::DynamicBridge methods @@ -81,6 +85,10 @@ public: bool containsLocalTag(const std::string& tagList) const; const std::string& getLocalTag() const; + // Methods needed by initialization functions + std::string getQueueName() const { return queueName; } + const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; } + private: struct PushHandler : framing::FrameHandler { PushHandler(Connection* c) { conn = c; } @@ -103,6 +111,7 @@ private: mutable uint64_t persistenceId; ConnectionState* connState; Connection* conn; + InitializeCallback initialize; bool resetProxy(); }; diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index cf6cb83968..31b4f1b490 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -162,7 +162,9 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, const std::string& tag, const std::string& excludes, bool dynamic, - uint16_t sync) + uint16_t sync, + Bridge::InitializeCallback init +) { Mutex::ScopedLock locker(lock); QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); @@ -196,7 +198,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, bridge = Bridge::shared_ptr (new Bridge (l->second.get(), l->second->nextChannel(), boost::bind(&LinkRegistry::destroy, this, - host, port, src, dest, key), args)); + host, port, src, dest, key), + args, init)); bridges[bridgeKey] = bridge; l->second->add(bridge); return std::pair<Bridge::shared_ptr, bool>(bridge, true); diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index f4bc9aab40..7e5b39f223 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -91,6 +91,7 @@ namespace broker { const std::string& authMechanism, const std::string& username, const std::string& password); + std::pair<Bridge::shared_ptr, bool> declare(const std::string& host, uint16_t port, @@ -103,9 +104,12 @@ namespace broker { const std::string& id, const std::string& excludes, bool dynamic, - uint16_t sync); + uint16_t sync, + Bridge::InitializeCallback=0 + ); void destroy(const std::string& host, const uint16_t port); + void destroy(const std::string& host, const uint16_t port, const std::string& src, diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index ca6d6bb193..8cd5072574 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -23,6 +23,7 @@ */ #include "qpid/amqp_0_10/SessionHandler.h" +#include "qpid/broker/SessionHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 6b99ca74c3..ef581c1a04 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -21,11 +21,79 @@ #include "Backup.h" #include "Settings.h" #include "qpid/Url.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/types/Variant.h" namespace qpid { namespace ha { +using namespace framing; +using namespace broker; +using types::Variant; + +namespace { +const std::string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); +} + +// Initialize a bridge as a wiring replicator. +void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + std::string queueName = bridge.getQueueName(); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + + //declare and bind an event queue + peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + peer.getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable()); + //subscribe to the queue + peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + //issue a query request for queues and another for exchanges using event queue as the reply-to address + for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions + Variant::Map request; + request["_what"] = "OBJECT"; + Variant::Map schema; + schema["_class_name"] = (i == 0 ? "queue" : "exchange"); + schema["_package_name"] = "org.apache.qpid.broker"; + request["_schema_id"] = schema; + + AMQFrame method((MessageTransferBody(ProtocolVersion(), "qmf.default.direct", 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); + AMQHeaderBody headerBody; + MessageProperties* props = headerBody.get<MessageProperties>(true); + props->setReplyTo(qpid::framing::ReplyTo("", queueName)); + props->setAppId("qmf2"); + props->getApplicationHeaders().setString("qmf.opcode", "_query_request"); + headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker"); + AMQFrame header(headerBody); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + AMQContentBody data; + qpid::amqp_0_10::MapCodec::encode(request, data.getData()); + AMQFrame content(data); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + sessionHandler.out->handle(method); + sessionHandler.out->handle(header); + sessionHandler.out->handle(content); + } +} + Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { // Create a link to replicate wiring if (s.brokerUrl != "dummy") { @@ -41,15 +109,17 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { broker.getLinks().declare( // Declare the bridge url[0].host, url[0].port, false, // durable - "qpid.wiring-replicator", // src - "qpid.wiring-replicator", // dest + QPID_WIRING_REPLICATOR, // src + QPID_WIRING_REPLICATOR, // dest "x", // key false, // isQueue false, // isLocal "", // id/tag "", // excludes false, // dynamic - 0); // sync? + 0, // sync? + bridgeInitWiringReplicator + ); } // FIXME aconway 2011-11-17: need to enhance the link code to // handle discovery of the primary broker and fail-over correctly. |