summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Bridge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp84
1 files changed, 76 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 5064320efb..cc76cf7f21 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -34,6 +34,18 @@ using qpid::framing::Buffer;
using qpid::management::ManagementAgent;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
namespace qpid {
namespace broker {
@@ -45,8 +57,9 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
const _qmf::ArgsLinkBridge& _args) :
link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), persistenceId(0)
+ listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0)
{
+ queueName += name;
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
@@ -65,7 +78,10 @@ Bridge::~Bridge()
void Bridge::create(ConnectionState& c)
{
+ connState = &c;
if (args.i_srcIsLocal) {
+ if (args.i_dynamic)
+ throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
Connection* conn = dynamic_cast<Connection*>(&c);
if (conn == 0)
@@ -74,7 +90,7 @@ void Bridge::create(ConnectionState& c)
channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
} else {
// Point the bridging commands at the remote peer broker
- channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ channelHandler.reset(new framing::ChannelHandler(id, &(connState->getOutput())));
}
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
@@ -88,8 +104,6 @@ void Bridge::create(ConnectionState& c)
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
- string queue = "bridge_queue_";
- queue += Uuid(true).str();
FieldTable queueSettings;
if (args.i_tag.size()) {
@@ -103,19 +117,26 @@ void Bridge::create(ConnectionState& c)
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
} else {
- const string& peerTag = c.getFederationPeerTag();
+ const string& peerTag = connState->getFederationPeerTag();
if (peerTag.size())
queueSettings.setString("qpid.trace.exclude", peerTag);
}
bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
- peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+ peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings);
if (!args.i_dynamic)
- peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
- peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ if (args.i_dynamic) {
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+ if (exchange.get() == 0)
+ throw Exception("Exchange not found for dynamic route");
+ exchange->registerDynamicBridge(this);
+ }
}
}
@@ -123,6 +144,11 @@ void Bridge::cancel()
{
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(name);
+ if (args.i_dynamic) {
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+ if (exchange.get() != 0)
+ exchange->removeDynamicBridge(this);
+ }
}
void Bridge::destroy()
@@ -220,4 +246,46 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
}
}
+void Bridge::propagateBinding(const string& key, const string& tagList,
+ const string& op, const string& origin)
+{
+ const string& localTag = link->getBroker()->getFederationTag();
+ const string& peerTag = connState->getFederationPeerTag();
+
+ if (tagList.find(peerTag) == tagList.npos) {
+ FieldTable bindArgs;
+ string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag);
+
+ bindArgs.setString(qpidFedOp, op);
+ bindArgs.setString(qpidFedTags, newTagList);
+ if (origin.empty())
+ bindArgs.setString(qpidFedOrigin, localTag);
+ else
+ bindArgs.setString(qpidFedOrigin, origin);
+
+ peer->getExchange().bind(queueName, args.i_src, key, bindArgs);
+ }
+}
+
+void Bridge::sendReorigin()
+{
+ FieldTable bindArgs;
+
+ bindArgs.setString(qpidFedOp, fedOpReorigin);
+ bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
+
+ peer->getExchange().bind(queueName, args.i_src, args.i_key, bindArgs);
+}
+
+bool Bridge::containsLocalTag(const string& tagList) const
+{
+ const string& localTag = link->getBroker()->getFederationTag();
+ return (tagList.find(localTag) != tagList.npos);
+}
+
+const string& Bridge::getLocalTag() const
+{
+ return link->getBroker()->getFederationTag();
+}
+
}}