summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-05-03 14:12:54 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-05-03 14:12:54 +0000
commit9a8d6e55a3e5c2a95dd6cfcfaf23de1e40ecdae0 (patch)
treefd03f154b603aa68ccd9f6e26e1e50ff8b6e9a0a
parent71bf4631e282b6a2be55cff6c6a5719907534c5e (diff)
downloadqpid-python-9a8d6e55a3e5c2a95dd6cfcfaf23de1e40ecdae0.tar.gz
QPID-3767: fix remote session and queue name to be unique
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3767@1333466 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp40
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h8
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp36
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h5
5 files changed, 63 insertions, 32 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index f9876d1ad8..53fe38a504 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -60,20 +60,18 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
CancellationListener l, const _qmf::ArgsLinkBridge& _args,
InitializeCallback init) :
- link(_link), id(_id), args(_args), mgmtObject(0),
+ link(_link), channel(_id), args(_args), mgmtObject(0),
listener(l), name(_name), queueName("qpid.bridge_queue_"), persistenceId(0),
initialize(init), detached(false)
{
- std::stringstream title;
- title << id << "_" << name;
- queueName += title.str();
+ queueName += Uuid(true).str();
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
(agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
- mgmtObject->set_channelId(id);
+ mgmtObject->set_channelId(channel);
agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
@@ -91,7 +89,7 @@ void Bridge::create(Connection& c)
conn = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
- SessionHandler& sessionHandler = c.getChannel(id);
+ SessionHandler& sessionHandler = c.getChannel(channel);
sessionHandler.setDetachedCallback(
boost::bind(&Bridge::sessionDetached, shared_from_this()));
if (args.i_srcIsLocal) {
@@ -99,15 +97,15 @@ void Bridge::create(Connection& c)
throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
pushHandler.reset(new PushHandler(&c));
- channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
+ channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
- session->attach(name, false);
+ session->attach(queueName, false);
session->commandPoint(0,0);
} else {
- sessionHandler.attachAs(name);
+ sessionHandler.attachAs(queueName);
// Point the bridging commands at the remote peer broker
peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
@@ -217,12 +215,8 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
Link::shared_ptr link;
if (kind == ENCODED_IDENTIFIER_V1) {
/** previous versions identified the bridge by host:port, not by name, and
- * transport wasn't provided. So create a unique name for the new bridge.
+ * transport wasn't provided. Try to find a link using those paramters.
*/
-
- framing::Uuid uuid(true);
- name = QPID_NAME_PREFIX + uuid.str();
-
buffer.getShortString(host);
port = buffer.getShort();
@@ -254,6 +248,12 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
bool dynamic(buffer.getOctet());
uint16_t sync = buffer.getShort();
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions did not provide a name for the bridge, so create one
+ */
+ name = createName(link->getName(), src, dest, key);
+ }
+
return links.declare(name, *link, durable, src, dest, key, is_queue,
is_local, id, excludes, dynamic, sync).first;
}
@@ -351,7 +351,7 @@ void Bridge::sendReorigin()
}
bool Bridge::resetProxy()
{
- SessionHandler& sessionHandler = conn->getChannel(id);
+ SessionHandler& sessionHandler = conn->getChannel(channel);
if (!sessionHandler.getSession()) peer.reset();
else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
return peer.get();
@@ -381,4 +381,14 @@ void Bridge::sessionDetached() {
detached = true;
}
+std::string Bridge::createName(const std::string& linkName,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ std::stringstream keystream;
+ keystream << linkName << "!" << src << "!" << dest << "!" << key;
+ return keystream.str();
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 286e8935c0..2ec9774dde 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -98,6 +98,12 @@ public:
std::string getQueueName() const { return queueName; }
const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
+ /** create a name for a bridge (if none supplied by user config) */
+ static std::string createName(const std::string& linkName,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key);
+
private:
// Callback when the bridge's session is detached.
void sessionDetached();
@@ -114,7 +120,7 @@ private:
std::auto_ptr<framing::AMQP_ServerProxy> peer;
Link* const link;
- framing::ChannelId id;
+ const framing::ChannelId channel;
qmf::org::apache::qpid::broker::ArgsLinkBridge args;
qmf::org::apache::qpid::broker::Bridge* mgmtObject;
CancellationListener listener;
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 249050d41f..c13ac19454 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -459,7 +459,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
* "create()" broker method if these features are needed.
* TBD: deprecate this interface.
*/
- QPID_LOG(warning, "The Broker::connect() method will be removed in a future release of QPID."
+ QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID."
" Please use the Broker::create() method with type='link' instead.");
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
@@ -477,9 +477,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
// - this behavior is backward compatible with previous releases.
if (!links.getLink(hp.i_host, hp.i_port, transport)) {
// new link, need to generate a unique name for it
- framing::Uuid uuid(true);
std::pair<Link::shared_ptr, bool> response =
- links.declare(QPID_NAME_PREFIX + uuid.str(), hp.i_host, hp.i_port, transport,
+ links.declare(Link::createName(transport, hp.i_host, hp.i_port),
+ hp.i_host, hp.i_port, transport,
hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password);
if (!response.first) {
text = "Unable to create Link";
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index d2dea85dbf..1cc723a717 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -175,7 +175,7 @@ Link::Link(const string& _name,
broker->getTimer().add(timerTask);
stringstream exchangeName;
- exchangeName << "qpid.link." << transport << ":" << host << ":" << port;
+ exchangeName << "qpid.link." << name;
std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(exchangeName.str(),
exchangeTypeName);
failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
@@ -575,13 +575,8 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
string password;
string name;
- if (kind == ENCODED_IDENTIFIER_V1) {
- /** previous versions identified the Link by host:port, there was no name
- * assigned. So create a unique name for the new Link.
- */
- framing::Uuid uuid(true);
- name = QPID_NAME_PREFIX + uuid.str();
- } else {
+ if (kind == ENCODED_IDENTIFIER) {
+ // newer version provides a link name.
buffer.getShortString(name);
}
buffer.getShortString(host);
@@ -592,6 +587,13 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
buffer.getShortString(username);
buffer.getShortString(password);
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the Link by host:port, there was no name
+ * assigned. So create a name for the new Link.
+ */
+ name = createName(transport, host, port);
+ }
+
return links.declare(name, host, port, transport, durable, authMechanism,
username, password).first;
}
@@ -652,7 +654,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
/* TBD: deprecate this interface in favor of the Broker::create() method. The
* Broker::create() method allows the user to assign a name to the bridge.
*/
- QPID_LOG(warning, "The Link::bridge() method will be removed in a future release of QPID."
+ QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID."
" Please use the Broker::create() method with type='bridge' instead.");
_qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src <<
@@ -662,11 +664,10 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
// existing bridge - this behavior is backward compatible with previous releases.
Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key);
if (!bridge) {
- // need to create a new bridge on this link
- framing::Uuid uuid(true);
- const std::string name(QPID_NAME_PREFIX + uuid.str());
+ // need to create a new bridge on this link.
std::pair<Bridge::shared_ptr, bool> rc =
- links->declare( name, *this, iargs.i_durable,
+ links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key),
+ *this, iargs.i_durable,
iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
iargs.i_dynamic, iargs.i_sync);
@@ -752,6 +753,15 @@ void Link::setState(const framing::FieldTable& state)
}
}
+std::string Link::createName(const std::string& transport,
+ const std::string& host,
+ uint16_t port)
+{
+ stringstream linkName;
+ linkName << QPID_NAME_PREFIX << transport << std::string(":")
+ << host << std::string(":") << port;
+ return linkName.str();
+}
const std::string Link::exchangeTypeName("qpid.LinkExchange");
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 68de0ace98..312c425c95 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -187,6 +187,11 @@ class Link : public PersistableConfig, public management::Manageable {
// replicate internal state of this Link for clustering
void getState(framing::FieldTable& state) const;
void setState(const framing::FieldTable& state);
+
+ /** create a name for a link (if none supplied by user config) */
+ static std::string createName(const std::string& transport,
+ const std::string& host,
+ uint16_t port);
};
}
}