From 9a8d6e55a3e5c2a95dd6cfcfaf23de1e40ecdae0 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Thu, 3 May 2012 14:12:54 +0000 Subject: QPID-3767: fix remote session and queue name to be unique git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3767@1333466 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Bridge.cpp | 40 +++++++++++++++++++++++-------------- qpid/cpp/src/qpid/broker/Bridge.h | 8 +++++++- qpid/cpp/src/qpid/broker/Broker.cpp | 6 +++--- qpid/cpp/src/qpid/broker/Link.cpp | 36 +++++++++++++++++++++------------ qpid/cpp/src/qpid/broker/Link.h | 5 +++++ 5 files changed, 63 insertions(+), 32 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index f9876d1ad8..53fe38a504 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -60,20 +60,18 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, CancellationListener l, const _qmf::ArgsLinkBridge& _args, InitializeCallback init) : - link(_link), id(_id), args(_args), mgmtObject(0), + link(_link), channel(_id), args(_args), mgmtObject(0), listener(l), name(_name), queueName("qpid.bridge_queue_"), persistenceId(0), initialize(init), detached(false) { - std::stringstream title; - title << id << "_" << name; - queueName += title.str(); + queueName += Uuid(true).str(); ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Bridge (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); - mgmtObject->set_channelId(id); + mgmtObject->set_channelId(channel); agent->addObject(mgmtObject); } QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest); @@ -91,7 +89,7 @@ void Bridge::create(Connection& c) conn = &c; FieldTable options; if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); - SessionHandler& sessionHandler = c.getChannel(id); + SessionHandler& sessionHandler = c.getChannel(channel); sessionHandler.setDetachedCallback( boost::bind(&Bridge::sessionDetached, shared_from_this())); if (args.i_srcIsLocal) { @@ -99,15 +97,15 @@ void Bridge::create(Connection& c) throw Exception("Dynamic routing not supported for push routes"); // Point the bridging commands at the local connection handler pushHandler.reset(new PushHandler(&c)); - channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); + channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get())); session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - session->attach(name, false); + session->attach(queueName, false); session->commandPoint(0,0); } else { - sessionHandler.attachAs(name); + sessionHandler.attachAs(queueName); // Point the bridging commands at the remote peer broker peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); } @@ -217,12 +215,8 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) Link::shared_ptr link; if (kind == ENCODED_IDENTIFIER_V1) { /** previous versions identified the bridge by host:port, not by name, and - * transport wasn't provided. So create a unique name for the new bridge. + * transport wasn't provided. Try to find a link using those paramters. */ - - framing::Uuid uuid(true); - name = QPID_NAME_PREFIX + uuid.str(); - buffer.getShortString(host); port = buffer.getShort(); @@ -254,6 +248,12 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) bool dynamic(buffer.getOctet()); uint16_t sync = buffer.getShort(); + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions did not provide a name for the bridge, so create one + */ + name = createName(link->getName(), src, dest, key); + } + return links.declare(name, *link, durable, src, dest, key, is_queue, is_local, id, excludes, dynamic, sync).first; } @@ -351,7 +351,7 @@ void Bridge::sendReorigin() } bool Bridge::resetProxy() { - SessionHandler& sessionHandler = conn->getChannel(id); + SessionHandler& sessionHandler = conn->getChannel(channel); if (!sessionHandler.getSession()) peer.reset(); else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); return peer.get(); @@ -381,4 +381,14 @@ void Bridge::sessionDetached() { detached = true; } +std::string Bridge::createName(const std::string& linkName, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + std::stringstream keystream; + keystream << linkName << "!" << src << "!" << dest << "!" << key; + return keystream.str(); +} + }} diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 286e8935c0..2ec9774dde 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -98,6 +98,12 @@ public: std::string getQueueName() const { return queueName; } const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; } + /** create a name for a bridge (if none supplied by user config) */ + static std::string createName(const std::string& linkName, + const std::string& src, + const std::string& dest, + const std::string& key); + private: // Callback when the bridge's session is detached. void sessionDetached(); @@ -114,7 +120,7 @@ private: std::auto_ptr peer; Link* const link; - framing::ChannelId id; + const framing::ChannelId channel; qmf::org::apache::qpid::broker::ArgsLinkBridge args; qmf::org::apache::qpid::broker::Bridge* mgmtObject; CancellationListener listener; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 249050d41f..c13ac19454 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -459,7 +459,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, * "create()" broker method if these features are needed. * TBD: deprecate this interface. */ - QPID_LOG(warning, "The Broker::connect() method will be removed in a future release of QPID." + QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID." " Please use the Broker::create() method with type='link' instead."); _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); @@ -477,9 +477,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, // - this behavior is backward compatible with previous releases. if (!links.getLink(hp.i_host, hp.i_port, transport)) { // new link, need to generate a unique name for it - framing::Uuid uuid(true); std::pair response = - links.declare(QPID_NAME_PREFIX + uuid.str(), hp.i_host, hp.i_port, transport, + links.declare(Link::createName(transport, hp.i_host, hp.i_port), + hp.i_host, hp.i_port, transport, hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password); if (!response.first) { text = "Unable to create Link"; diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index d2dea85dbf..1cc723a717 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -175,7 +175,7 @@ Link::Link(const string& _name, broker->getTimer().add(timerTask); stringstream exchangeName; - exchangeName << "qpid.link." << transport << ":" << host << ":" << port; + exchangeName << "qpid.link." << name; std::pair rc = broker->getExchanges().declare(exchangeName.str(), exchangeTypeName); failoverExchange = boost::static_pointer_cast(rc.first); @@ -575,13 +575,8 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) string password; string name; - if (kind == ENCODED_IDENTIFIER_V1) { - /** previous versions identified the Link by host:port, there was no name - * assigned. So create a unique name for the new Link. - */ - framing::Uuid uuid(true); - name = QPID_NAME_PREFIX + uuid.str(); - } else { + if (kind == ENCODED_IDENTIFIER) { + // newer version provides a link name. buffer.getShortString(name); } buffer.getShortString(host); @@ -592,6 +587,13 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) buffer.getShortString(username); buffer.getShortString(password); + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions identified the Link by host:port, there was no name + * assigned. So create a name for the new Link. + */ + name = createName(transport, host, port); + } + return links.declare(name, host, port, transport, durable, authMechanism, username, password).first; } @@ -652,7 +654,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te /* TBD: deprecate this interface in favor of the Broker::create() method. The * Broker::create() method allows the user to assign a name to the bridge. */ - QPID_LOG(warning, "The Link::bridge() method will be removed in a future release of QPID." + QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID." " Please use the Broker::create() method with type='bridge' instead."); _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src << @@ -662,11 +664,10 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te // existing bridge - this behavior is backward compatible with previous releases. Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key); if (!bridge) { - // need to create a new bridge on this link - framing::Uuid uuid(true); - const std::string name(QPID_NAME_PREFIX + uuid.str()); + // need to create a new bridge on this link. std::pair rc = - links->declare( name, *this, iargs.i_durable, + links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key), + *this, iargs.i_durable, iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, iargs.i_dynamic, iargs.i_sync); @@ -752,6 +753,15 @@ void Link::setState(const framing::FieldTable& state) } } +std::string Link::createName(const std::string& transport, + const std::string& host, + uint16_t port) +{ + stringstream linkName; + linkName << QPID_NAME_PREFIX << transport << std::string(":") + << host << std::string(":") << port; + return linkName.str(); +} const std::string Link::exchangeTypeName("qpid.LinkExchange"); diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 68de0ace98..312c425c95 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -187,6 +187,11 @@ class Link : public PersistableConfig, public management::Manageable { // replicate internal state of this Link for clustering void getState(framing::FieldTable& state) const; void setState(const framing::FieldTable& state); + + /** create a name for a link (if none supplied by user config) */ + static std::string createName(const std::string& transport, + const std::string& host, + uint16_t port); }; } } -- cgit v1.2.1