summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-10 19:24:40 +0000
committerTed Ross <tross@apache.org>2008-10-10 19:24:40 +0000
commit5605be8d83cb6072780525f2183e637135a9004a (patch)
tree4ec6df99a1544025e25be0c14f72612025b1f96b
parent74e4bdcd985592b1bc786151ce16467dfdc7e471 (diff)
downloadqpid-python-5605be8d83cb6072780525f2183e637135a9004a.tar.gz
QPID-1349 - Push routing for federation (includes hook for dynamic routing)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703561 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp75
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h9
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h3
-rwxr-xr-xqpid/cpp/src/tests/federation.py44
-rwxr-xr-xqpid/python/commands/qpid-route2
-rw-r--r--qpid/specs/management-schema.xml12
8 files changed, 114 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 93333f1410..65ed38b731 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -20,6 +20,7 @@
*/
#include "Bridge.h"
#include "ConnectionState.h"
+#include "Connection.h"
#include "LinkRegistry.h"
#include "qpid/agent/ManagementAgent.h"
@@ -36,6 +37,11 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
+void Bridge::PushHandler::handle(framing::AMQFrame& frame)
+{
+ conn->received(frame);
+}
+
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
const _qmf::ArgsLinkBridge& _args) :
link(_link), id(_id), args(_args), mgmtObject(0),
@@ -46,7 +52,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
mgmtObject = new _qmf::Bridge
(agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
- args.i_tag, args.i_excludes);
+ args.i_tag, args.i_excludes, args.i_dynamic);
if (!args.i_durable)
agent->addObject(mgmtObject);
}
@@ -59,39 +65,47 @@ Bridge::~Bridge()
void Bridge::create(ConnectionState& c)
{
- channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ if (args.i_srcIsLocal) {
+ // Point the bridging commands at the local connection handler
+ Connection* conn = dynamic_cast<Connection*>(&c);
+ if (conn == 0)
+ return;
+ pushHandler.reset(new PushHandler(conn));
+ channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
+ } else {
+ // Point the bridging commands at the remote peer broker
+ channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ }
+
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
session->attach(name, false);
session->commandPoint(0,0);
-
- if (args.i_srcIsLocal) {
- //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
+
+ if (args.i_srcIsQueue) {
+ peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
- if (args.i_srcIsQueue) {
- peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- } else {
- string queue = "bridge_queue_";
- queue += Uuid(true).str();
- FieldTable queueSettings;
- if (args.i_tag.size()) {
- queueSettings.setString("qpid.trace.id", args.i_tag);
- }
- if (args.i_excludes.size()) {
- queueSettings.setString("qpid.trace.exclude", args.i_excludes);
- }
-
- bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
- bool autoDelete = !durable;//auto delete transient queues?
- peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
- peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
- peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ string queue = "bridge_queue_";
+ queue += Uuid(true).str();
+ FieldTable queueSettings;
+ if (args.i_tag.size()) {
+ queueSettings.setString("qpid.trace.id", args.i_tag);
}
+ if (args.i_excludes.size()) {
+ queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ }
+
+ bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
+ bool autoDelete = !durable;//auto delete transient queues?
+ peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+ if (!args.i_dynamic)
+ peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
}
}
@@ -140,9 +154,10 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
bool is_local(buffer.getOctet());
buffer.getShortString(id);
buffer.getShortString(excludes);
+ bool dynamic(buffer.getOctet());
return links.declare(host, port, durable, src, dest, key,
- is_queue, is_local, id, excludes).first;
+ is_queue, is_local, id, excludes, dynamic).first;
}
void Bridge::encode(Buffer& buffer) const
@@ -158,6 +173,7 @@ void Bridge::encode(Buffer& buffer) const
buffer.putOctet(args.i_srcIsLocal ? 1 : 0);
buffer.putShortString(args.i_tag);
buffer.putShortString(args.i_excludes);
+ buffer.putOctet(args.i_dynamic ? 1 : 0);
}
uint32_t Bridge::encodedSize() const
@@ -172,7 +188,8 @@ uint32_t Bridge::encodedSize() const
+ 1 // srcIsQueue
+ 1 // srcIsLocal
+ args.i_tag.size() + 1
- + args.i_excludes.size() + 1;
+ + args.i_excludes.size() + 1
+ + 1; // dynamic
}
management::ManagementObject* Bridge::GetManagementObject (void) const
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 5ca73e4004..057bc68fe2 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -25,6 +25,7 @@
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/Buffer.h"
+#include "qpid/framing/FrameHandler.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
#include "qmf/org/apache/qpid/broker/Bridge.h"
@@ -35,6 +36,7 @@
namespace qpid {
namespace broker {
+class Connection;
class ConnectionState;
class Link;
class LinkRegistry;
@@ -68,6 +70,13 @@ public:
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
private:
+ struct PushHandler : framing::FrameHandler {
+ PushHandler(Connection* c) { conn = c; }
+ void handle(framing::AMQFrame& frame);
+ Connection* conn;
+ };
+
+ std::auto_ptr<PushHandler> pushHandler;
std::auto_ptr<framing::ChannelHandler> channelHandler;
std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
std::auto_ptr<framing::AMQP_ServerProxy> peer;
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index f80e6078de..02c8833541 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -353,7 +353,8 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args
std::pair<Bridge::shared_ptr, bool> result =
links->declare (host, port, iargs.i_durable, iargs.i_src,
iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
- iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes);
+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
+ iargs.i_dynamic);
if (result.second && iargs.i_durable)
store->create(*result.first);
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index 31992d0af5..8b0bebfcb2 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -91,7 +91,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
bool isQueue,
bool isLocal,
std::string& tag,
- std::string& excludes)
+ std::string& excludes,
+ bool dynamic)
{
Mutex::ScopedLock locker(lock);
stringstream keystream;
@@ -119,6 +120,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
args.i_srcIsLocal = isLocal;
args.i_tag = tag;
args.i_excludes = excludes;
+ args.i_dynamic = dynamic;
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 242c0d58ba..5b5fe4fec9 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -84,7 +84,8 @@ namespace broker {
bool isQueue,
bool isLocal,
std::string& id,
- std::string& excludes);
+ std::string& excludes,
+ bool dynamic);
void destroy(const std::string& host, const uint16_t port);
void destroy(const std::string& host,
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index b92df89839..7e7caeeec6 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -148,6 +148,50 @@ class FederationTests(TestBase010):
mgmt.shutdown()
+ def test_push_to_exchange(self):
+ session = self.session
+
+ mgmt = Helper(self)
+ broker = mgmt.get_object("broker")
+
+ mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
+ link = mgmt.get_object("link")
+
+ mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout",
+ "key":"my-key", "tag":"", "excludes":"", "srcIsQueue":0,
+ "srcIsLocal":1})
+ bridge = mgmt.get_object("bridge")
+
+ #setup queue to receive messages from remote broker
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("test_push_to_exchange")
+ r_session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ r_session.exchange_bind(queue="fed1", exchange="amq.fanout")
+ self.subscribe(session=r_session, queue="fed1", destination="f1")
+ queue = r_session.incoming("f1")
+ sleep(6)
+
+ #send messages to local broker and confirm it is routed to remote broker
+ for i in range(1, 11):
+ dp = session.delivery_properties(routing_key="my-key")
+ session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ mgmt.call_method(bridge, "close")
+ mgmt.call_method(link, "close")
+ sleep(6)
+ self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+ self.assertEqual(len(mgmt.get_objects("link")), 0)
+
+ mgmt.shutdown()
+
def test_pull_from_queue(self):
session = self.session
diff --git a/qpid/python/commands/qpid-route b/qpid/python/commands/qpid-route
index 4dadcd543b..8077e95278 100755
--- a/qpid/python/commands/qpid-route
+++ b/qpid/python/commands/qpid-route
@@ -150,7 +150,7 @@ class RouteManager:
if _verbose:
print "Creating inter-broker binding..."
- res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, 0, 0)
+ res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, False)
if res.status == 4:
raise Exception("Can't create a durable route on a non-durable link")
if _verbose:
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index da53a2f4a3..649832dffa 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -245,11 +245,12 @@
<arg name="durable" dir="I" type="bool"/>
<arg name="src" dir="I" type="sstr"/>
<arg name="dest" dir="I" type="sstr"/>
- <arg name="key" dir="I" type="sstr" default=""/>
- <arg name="tag" dir="I" type="sstr" default=""/>
- <arg name="excludes" dir="I" type="sstr" default=""/>
- <arg name="srcIsQueue" dir="I" type="bool" default="0"/>
- <arg name="srcIsLocal" dir="I" type="bool" default="0"/>
+ <arg name="key" dir="I" type="sstr"/>
+ <arg name="tag" dir="I" type="sstr"/>
+ <arg name="excludes" dir="I" type="sstr"/>
+ <arg name="srcIsQueue" dir="I" type="bool"/>
+ <arg name="srcIsLocal" dir="I" type="bool"/>
+ <arg name="dynamic" dir="I" type="bool"/>
</method>
</class>
@@ -270,6 +271,7 @@
<property name="srcIsLocal" type="bool" access="RC"/>
<property name="tag" type="sstr" access="RC"/>
<property name="excludes" type="sstr" access="RC"/>
+ <property name="dynamic" type="bool" access="RC"/>
<method name="close"/>
</class>