diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 110 |
1 files changed, 82 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 5b531e4636..53fe38a504 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -57,22 +57,21 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) conn->received(frame); } -Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, - const _qmf::ArgsLinkBridge& _args, +Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, + CancellationListener l, const _qmf::ArgsLinkBridge& _args, InitializeCallback init) : - link(_link), id(_id), args(_args), mgmtObject(0), - listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0), + link(_link), channel(_id), args(_args), mgmtObject(0), + listener(l), name(_name), queueName("qpid.bridge_queue_"), persistenceId(0), initialize(init), detached(false) { - std::stringstream title; - title << id << "_" << name; - queueName += title.str(); + queueName += Uuid(true).str(); ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Bridge - (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, + (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); + mgmtObject->set_channelId(channel); agent->addObject(mgmtObject); } QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest); @@ -90,7 +89,7 @@ void Bridge::create(Connection& c) conn = &c; FieldTable options; if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); - SessionHandler& sessionHandler = c.getChannel(id); + SessionHandler& sessionHandler = c.getChannel(channel); sessionHandler.setDetachedCallback( boost::bind(&Bridge::sessionDetached, shared_from_this())); if (args.i_srcIsLocal) { @@ -98,15 +97,15 @@ void Bridge::create(Connection& c) throw Exception("Dynamic routing not supported for push routes"); // Point the bridging commands at the local connection handler pushHandler.reset(new PushHandler(&c)); - channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); + channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get())); session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - session->attach(name, false); + session->attach(queueName, false); session->commandPoint(0,0); } else { - sessionHandler.attachAs(name); + sessionHandler.attachAs(queueName); // Point the bridging commands at the remote peer broker peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); } @@ -168,6 +167,7 @@ void Bridge::cancel(Connection&) QPID_LOG(debug, "Cancelled bridge " << name); } +/** Notify the bridge that the connection has closed */ void Bridge::closed() { if (args.i_dynamic) { @@ -177,9 +177,10 @@ void Bridge::closed() QPID_LOG(debug, "Closed bridge " << name); } -void Bridge::destroy() +/** Shut down the bridge */ +void Bridge::close() { - listener(this); + listener(this); // ask the LinkRegistry to destroy us } void Bridge::setPersistenceId(uint64_t pId) const @@ -187,8 +188,21 @@ void Bridge::setPersistenceId(uint64_t pId) const persistenceId = pId; } + +const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2"); +const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge"); + +bool Bridge::isEncodedBridge(const std::string& key) +{ + return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; +} + + Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) { + string kind; + buffer.getShortString(kind); + string host; uint16_t port; string src; @@ -196,9 +210,33 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) string key; string id; string excludes; + string name; + + Link::shared_ptr link; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions identified the bridge by host:port, not by name, and + * transport wasn't provided. Try to find a link using those paramters. + */ + buffer.getShortString(host); + port = buffer.getShort(); + + link = links.getLink(host, port); + if (!link) { + QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port); + return Bridge::shared_ptr(); + } + } else { + string linkName; + + buffer.getShortString(name); + buffer.getShortString(linkName); + link = links.getLink(linkName); + if (!link) { + QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'"); + return Bridge::shared_ptr(); + } + } - buffer.getShortString(host); - port = buffer.getShort(); bool durable(buffer.getOctet()); buffer.getShortString(src); buffer.getShortString(dest); @@ -210,15 +248,21 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) bool dynamic(buffer.getOctet()); uint16_t sync = buffer.getShort(); - return links.declare(host, port, durable, src, dest, key, - is_queue, is_local, id, excludes, dynamic, sync).first; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions did not provide a name for the bridge, so create one + */ + name = createName(link->getName(), src, dest, key); + } + + return links.declare(name, *link, durable, src, dest, key, is_queue, + is_local, id, excludes, dynamic, sync).first; } void Bridge::encode(Buffer& buffer) const { - buffer.putShortString(string("bridge")); - buffer.putShortString(link->getHost()); - buffer.putShort(link->getPort()); + buffer.putShortString(ENCODED_IDENTIFIER); + buffer.putShortString(name); + buffer.putShortString(link->getName()); buffer.putOctet(args.i_durable ? 1 : 0); buffer.putShortString(args.i_src); buffer.putShortString(args.i_dest); @@ -233,9 +277,9 @@ void Bridge::encode(Buffer& buffer) const uint32_t Bridge::encodedSize() const { - return link->getHost().size() + 1 // short-string (host) - + 7 // short-string ("bridge") - + 2 // port + return ENCODED_IDENTIFIER.size() + 1 // +1 byte length + + name.size() + 1 + + link->getName().size() + 1 + 1 // durable + args.i_src.size() + 1 + args.i_dest.size() + 1 @@ -259,7 +303,8 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, { if (methodId == _qmf::Bridge::METHOD_CLOSE) { //notify that we are closed - destroy(); + QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'"); + close(); return management::Manageable::STATUS_OK; } else { return management::Manageable::STATUS_UNKNOWN_METHOD; @@ -306,7 +351,7 @@ void Bridge::sendReorigin() } bool Bridge::resetProxy() { - SessionHandler& sessionHandler = conn->getChannel(id); + SessionHandler& sessionHandler = conn->getChannel(channel); if (!sessionHandler.getSession()) peer.reset(); else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); return peer.get(); @@ -318,7 +363,7 @@ void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchang peer->getExchange().bind(queue, exchange, key, args); } else { QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge"); - destroy(); + close(); } } @@ -332,9 +377,18 @@ const string& Bridge::getLocalTag() const { return link->getBroker()->getFederationTag(); } - void Bridge::sessionDetached() { detached = true; } +std::string Bridge::createName(const std::string& linkName, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + std::stringstream keystream; + keystream << linkName << "!" << src << "!" << dest << "!" << key; + return keystream.str(); +} + }} |