summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Bridge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp110
1 files changed, 82 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 5b531e4636..53fe38a504 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -57,22 +57,21 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
conn->received(frame);
}
-Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args,
+Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
+ CancellationListener l, const _qmf::ArgsLinkBridge& _args,
InitializeCallback init) :
- link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
+ link(_link), channel(_id), args(_args), mgmtObject(0),
+ listener(l), name(_name), queueName("qpid.bridge_queue_"), persistenceId(0),
initialize(init), detached(false)
{
- std::stringstream title;
- title << id << "_" << name;
- queueName += title.str();
+ queueName += Uuid(true).str();
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
- (agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
+ (agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
+ mgmtObject->set_channelId(channel);
agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
@@ -90,7 +89,7 @@ void Bridge::create(Connection& c)
conn = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
- SessionHandler& sessionHandler = c.getChannel(id);
+ SessionHandler& sessionHandler = c.getChannel(channel);
sessionHandler.setDetachedCallback(
boost::bind(&Bridge::sessionDetached, shared_from_this()));
if (args.i_srcIsLocal) {
@@ -98,15 +97,15 @@ void Bridge::create(Connection& c)
throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
pushHandler.reset(new PushHandler(&c));
- channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
+ channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
- session->attach(name, false);
+ session->attach(queueName, false);
session->commandPoint(0,0);
} else {
- sessionHandler.attachAs(name);
+ sessionHandler.attachAs(queueName);
// Point the bridging commands at the remote peer broker
peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
@@ -168,6 +167,7 @@ void Bridge::cancel(Connection&)
QPID_LOG(debug, "Cancelled bridge " << name);
}
+/** Notify the bridge that the connection has closed */
void Bridge::closed()
{
if (args.i_dynamic) {
@@ -177,9 +177,10 @@ void Bridge::closed()
QPID_LOG(debug, "Closed bridge " << name);
}
-void Bridge::destroy()
+/** Shut down the bridge */
+void Bridge::close()
{
- listener(this);
+ listener(this); // ask the LinkRegistry to destroy us
}
void Bridge::setPersistenceId(uint64_t pId) const
@@ -187,8 +188,21 @@ void Bridge::setPersistenceId(uint64_t pId) const
persistenceId = pId;
}
+
+const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2");
+const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge");
+
+bool Bridge::isEncodedBridge(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
+}
+
+
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
+ string kind;
+ buffer.getShortString(kind);
+
string host;
uint16_t port;
string src;
@@ -196,9 +210,33 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
string key;
string id;
string excludes;
+ string name;
+
+ Link::shared_ptr link;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the bridge by host:port, not by name, and
+ * transport wasn't provided. Try to find a link using those paramters.
+ */
+ buffer.getShortString(host);
+ port = buffer.getShort();
+
+ link = links.getLink(host, port);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port);
+ return Bridge::shared_ptr();
+ }
+ } else {
+ string linkName;
+
+ buffer.getShortString(name);
+ buffer.getShortString(linkName);
+ link = links.getLink(linkName);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'");
+ return Bridge::shared_ptr();
+ }
+ }
- buffer.getShortString(host);
- port = buffer.getShort();
bool durable(buffer.getOctet());
buffer.getShortString(src);
buffer.getShortString(dest);
@@ -210,15 +248,21 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
bool dynamic(buffer.getOctet());
uint16_t sync = buffer.getShort();
- return links.declare(host, port, durable, src, dest, key,
- is_queue, is_local, id, excludes, dynamic, sync).first;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions did not provide a name for the bridge, so create one
+ */
+ name = createName(link->getName(), src, dest, key);
+ }
+
+ return links.declare(name, *link, durable, src, dest, key, is_queue,
+ is_local, id, excludes, dynamic, sync).first;
}
void Bridge::encode(Buffer& buffer) const
{
- buffer.putShortString(string("bridge"));
- buffer.putShortString(link->getHost());
- buffer.putShort(link->getPort());
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
+ buffer.putShortString(link->getName());
buffer.putOctet(args.i_durable ? 1 : 0);
buffer.putShortString(args.i_src);
buffer.putShortString(args.i_dest);
@@ -233,9 +277,9 @@ void Bridge::encode(Buffer& buffer) const
uint32_t Bridge::encodedSize() const
{
- return link->getHost().size() + 1 // short-string (host)
- + 7 // short-string ("bridge")
- + 2 // port
+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+ + name.size() + 1
+ + link->getName().size() + 1
+ 1 // durable
+ args.i_src.size() + 1
+ args.i_dest.size() + 1
@@ -259,7 +303,8 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
{
if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
- destroy();
+ QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'");
+ close();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;
@@ -306,7 +351,7 @@ void Bridge::sendReorigin()
}
bool Bridge::resetProxy()
{
- SessionHandler& sessionHandler = conn->getChannel(id);
+ SessionHandler& sessionHandler = conn->getChannel(channel);
if (!sessionHandler.getSession()) peer.reset();
else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
return peer.get();
@@ -318,7 +363,7 @@ void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchang
peer->getExchange().bind(queue, exchange, key, args);
} else {
QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge");
- destroy();
+ close();
}
}
@@ -332,9 +377,18 @@ const string& Bridge::getLocalTag() const
{
return link->getBroker()->getFederationTag();
}
-
void Bridge::sessionDetached() {
detached = true;
}
+std::string Bridge::createName(const std::string& linkName,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ std::stringstream keystream;
+ keystream << linkName << "!" << src << "!" << dest << "!" << key;
+ return keystream.str();
+}
+
}}