diff options
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 75 |
1 files changed, 46 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 93333f1410..65ed38b731 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -20,6 +20,7 @@ */ #include "Bridge.h" #include "ConnectionState.h" +#include "Connection.h" #include "LinkRegistry.h" #include "qpid/agent/ManagementAgent.h" @@ -36,6 +37,11 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { +void Bridge::PushHandler::handle(framing::AMQFrame& frame) +{ + conn->received(frame); +} + Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const _qmf::ArgsLinkBridge& _args) : link(_link), id(_id), args(_args), mgmtObject(0), @@ -46,7 +52,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, mgmtObject = new _qmf::Bridge (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes); + args.i_tag, args.i_excludes, args.i_dynamic); if (!args.i_durable) agent->addObject(mgmtObject); } @@ -59,39 +65,47 @@ Bridge::~Bridge() void Bridge::create(ConnectionState& c) { - channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput()))); + if (args.i_srcIsLocal) { + // Point the bridging commands at the local connection handler + Connection* conn = dynamic_cast<Connection*>(&c); + if (conn == 0) + return; + pushHandler.reset(new PushHandler(conn)); + channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); + } else { + // Point the bridging commands at the remote peer broker + channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput()))); + } + session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); session->attach(name, false); session->commandPoint(0,0); - - if (args.i_srcIsLocal) { - //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() + + if (args.i_srcIsQueue) { + peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } else { - if (args.i_srcIsQueue) { - peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - } else { - string queue = "bridge_queue_"; - queue += Uuid(true).str(); - FieldTable queueSettings; - if (args.i_tag.size()) { - queueSettings.setString("qpid.trace.id", args.i_tag); - } - if (args.i_excludes.size()) { - queueSettings.setString("qpid.trace.exclude", args.i_excludes); - } - - bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? - bool autoDelete = !durable;//auto delete transient queues? - peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); - peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); - peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + string queue = "bridge_queue_"; + queue += Uuid(true).str(); + FieldTable queueSettings; + if (args.i_tag.size()) { + queueSettings.setString("qpid.trace.id", args.i_tag); } + if (args.i_excludes.size()) { + queueSettings.setString("qpid.trace.exclude", args.i_excludes); + } + + bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? + bool autoDelete = !durable;//auto delete transient queues? + peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); + if (!args.i_dynamic) + peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); + peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } } @@ -140,9 +154,10 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) bool is_local(buffer.getOctet()); buffer.getShortString(id); buffer.getShortString(excludes); + bool dynamic(buffer.getOctet()); return links.declare(host, port, durable, src, dest, key, - is_queue, is_local, id, excludes).first; + is_queue, is_local, id, excludes, dynamic).first; } void Bridge::encode(Buffer& buffer) const @@ -158,6 +173,7 @@ void Bridge::encode(Buffer& buffer) const buffer.putOctet(args.i_srcIsLocal ? 1 : 0); buffer.putShortString(args.i_tag); buffer.putShortString(args.i_excludes); + buffer.putOctet(args.i_dynamic ? 1 : 0); } uint32_t Bridge::encodedSize() const @@ -172,7 +188,8 @@ uint32_t Bridge::encodedSize() const + 1 // srcIsQueue + 1 // srcIsLocal + args.i_tag.size() + 1 - + args.i_excludes.size() + 1; + + args.i_excludes.size() + 1 + + 1; // dynamic } management::ManagementObject* Bridge::GetManagementObject (void) const |