summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Bridge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp75
1 files changed, 46 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 93333f1410..65ed38b731 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -20,6 +20,7 @@
*/
#include "Bridge.h"
#include "ConnectionState.h"
+#include "Connection.h"
#include "LinkRegistry.h"
#include "qpid/agent/ManagementAgent.h"
@@ -36,6 +37,11 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
+void Bridge::PushHandler::handle(framing::AMQFrame& frame)
+{
+ conn->received(frame);
+}
+
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
const _qmf::ArgsLinkBridge& _args) :
link(_link), id(_id), args(_args), mgmtObject(0),
@@ -46,7 +52,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
mgmtObject = new _qmf::Bridge
(agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
- args.i_tag, args.i_excludes);
+ args.i_tag, args.i_excludes, args.i_dynamic);
if (!args.i_durable)
agent->addObject(mgmtObject);
}
@@ -59,39 +65,47 @@ Bridge::~Bridge()
void Bridge::create(ConnectionState& c)
{
- channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ if (args.i_srcIsLocal) {
+ // Point the bridging commands at the local connection handler
+ Connection* conn = dynamic_cast<Connection*>(&c);
+ if (conn == 0)
+ return;
+ pushHandler.reset(new PushHandler(conn));
+ channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
+ } else {
+ // Point the bridging commands at the remote peer broker
+ channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ }
+
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
session->attach(name, false);
session->commandPoint(0,0);
-
- if (args.i_srcIsLocal) {
- //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
+
+ if (args.i_srcIsQueue) {
+ peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
- if (args.i_srcIsQueue) {
- peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- } else {
- string queue = "bridge_queue_";
- queue += Uuid(true).str();
- FieldTable queueSettings;
- if (args.i_tag.size()) {
- queueSettings.setString("qpid.trace.id", args.i_tag);
- }
- if (args.i_excludes.size()) {
- queueSettings.setString("qpid.trace.exclude", args.i_excludes);
- }
-
- bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
- bool autoDelete = !durable;//auto delete transient queues?
- peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
- peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
- peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ string queue = "bridge_queue_";
+ queue += Uuid(true).str();
+ FieldTable queueSettings;
+ if (args.i_tag.size()) {
+ queueSettings.setString("qpid.trace.id", args.i_tag);
}
+ if (args.i_excludes.size()) {
+ queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ }
+
+ bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
+ bool autoDelete = !durable;//auto delete transient queues?
+ peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+ if (!args.i_dynamic)
+ peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
}
}
@@ -140,9 +154,10 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
bool is_local(buffer.getOctet());
buffer.getShortString(id);
buffer.getShortString(excludes);
+ bool dynamic(buffer.getOctet());
return links.declare(host, port, durable, src, dest, key,
- is_queue, is_local, id, excludes).first;
+ is_queue, is_local, id, excludes, dynamic).first;
}
void Bridge::encode(Buffer& buffer) const
@@ -158,6 +173,7 @@ void Bridge::encode(Buffer& buffer) const
buffer.putOctet(args.i_srcIsLocal ? 1 : 0);
buffer.putShortString(args.i_tag);
buffer.putShortString(args.i_excludes);
+ buffer.putOctet(args.i_dynamic ? 1 : 0);
}
uint32_t Bridge::encodedSize() const
@@ -172,7 +188,8 @@ uint32_t Bridge::encodedSize() const
+ 1 // srcIsQueue
+ 1 // srcIsLocal
+ args.i_tag.size() + 1
- + args.i_excludes.size() + 1;
+ + args.i_excludes.size() + 1
+ + 1; // dynamic
}
management::ManagementObject* Bridge::GetManagementObject (void) const