diff options
author | Ted Ross <tross@apache.org> | 2008-10-10 19:24:40 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-10-10 19:24:40 +0000 |
commit | 5605be8d83cb6072780525f2183e637135a9004a (patch) | |
tree | 4ec6df99a1544025e25be0c14f72612025b1f96b | |
parent | 74e4bdcd985592b1bc786151ce16467dfdc7e471 (diff) | |
download | qpid-python-5605be8d83cb6072780525f2183e637135a9004a.tar.gz |
QPID-1349 - Push routing for federation (includes hook for dynamic routing)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703561 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 75 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.h | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 44 | ||||
-rwxr-xr-x | qpid/python/commands/qpid-route | 2 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 12 |
8 files changed, 114 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 93333f1410..65ed38b731 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -20,6 +20,7 @@ */ #include "Bridge.h" #include "ConnectionState.h" +#include "Connection.h" #include "LinkRegistry.h" #include "qpid/agent/ManagementAgent.h" @@ -36,6 +37,11 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { +void Bridge::PushHandler::handle(framing::AMQFrame& frame) +{ + conn->received(frame); +} + Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const _qmf::ArgsLinkBridge& _args) : link(_link), id(_id), args(_args), mgmtObject(0), @@ -46,7 +52,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, mgmtObject = new _qmf::Bridge (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes); + args.i_tag, args.i_excludes, args.i_dynamic); if (!args.i_durable) agent->addObject(mgmtObject); } @@ -59,39 +65,47 @@ Bridge::~Bridge() void Bridge::create(ConnectionState& c) { - channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput()))); + if (args.i_srcIsLocal) { + // Point the bridging commands at the local connection handler + Connection* conn = dynamic_cast<Connection*>(&c); + if (conn == 0) + return; + pushHandler.reset(new PushHandler(conn)); + channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); + } else { + // Point the bridging commands at the remote peer broker + channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput()))); + } + session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); session->attach(name, false); session->commandPoint(0,0); - - if (args.i_srcIsLocal) { - //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() + + if (args.i_srcIsQueue) { + peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } else { - if (args.i_srcIsQueue) { - peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - } else { - string queue = "bridge_queue_"; - queue += Uuid(true).str(); - FieldTable queueSettings; - if (args.i_tag.size()) { - queueSettings.setString("qpid.trace.id", args.i_tag); - } - if (args.i_excludes.size()) { - queueSettings.setString("qpid.trace.exclude", args.i_excludes); - } - - bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? - bool autoDelete = !durable;//auto delete transient queues? - peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); - peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); - peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + string queue = "bridge_queue_"; + queue += Uuid(true).str(); + FieldTable queueSettings; + if (args.i_tag.size()) { + queueSettings.setString("qpid.trace.id", args.i_tag); } + if (args.i_excludes.size()) { + queueSettings.setString("qpid.trace.exclude", args.i_excludes); + } + + bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? + bool autoDelete = !durable;//auto delete transient queues? + peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); + if (!args.i_dynamic) + peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); + peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } } @@ -140,9 +154,10 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) bool is_local(buffer.getOctet()); buffer.getShortString(id); buffer.getShortString(excludes); + bool dynamic(buffer.getOctet()); return links.declare(host, port, durable, src, dest, key, - is_queue, is_local, id, excludes).first; + is_queue, is_local, id, excludes, dynamic).first; } void Bridge::encode(Buffer& buffer) const @@ -158,6 +173,7 @@ void Bridge::encode(Buffer& buffer) const buffer.putOctet(args.i_srcIsLocal ? 1 : 0); buffer.putShortString(args.i_tag); buffer.putShortString(args.i_excludes); + buffer.putOctet(args.i_dynamic ? 1 : 0); } uint32_t Bridge::encodedSize() const @@ -172,7 +188,8 @@ uint32_t Bridge::encodedSize() const + 1 // srcIsQueue + 1 // srcIsLocal + args.i_tag.size() + 1 - + args.i_excludes.size() + 1; + + args.i_excludes.size() + 1 + + 1; // dynamic } management::ManagementObject* Bridge::GetManagementObject (void) const diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 5ca73e4004..057bc68fe2 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -25,6 +25,7 @@ #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/framing/ChannelHandler.h" #include "qpid/framing/Buffer.h" +#include "qpid/framing/FrameHandler.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h" #include "qmf/org/apache/qpid/broker/Bridge.h" @@ -35,6 +36,7 @@ namespace qpid { namespace broker { +class Connection; class ConnectionState; class Link; class LinkRegistry; @@ -68,6 +70,13 @@ public: static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); private: + struct PushHandler : framing::FrameHandler { + PushHandler(Connection* c) { conn = c; } + void handle(framing::AMQFrame& frame); + Connection* conn; + }; + + std::auto_ptr<PushHandler> pushHandler; std::auto_ptr<framing::ChannelHandler> channelHandler; std::auto_ptr<framing::AMQP_ServerProxy::Session> session; std::auto_ptr<framing::AMQP_ServerProxy> peer; diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index f80e6078de..02c8833541 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -353,7 +353,8 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args std::pair<Bridge::shared_ptr, bool> result = links->declare (host, port, iargs.i_durable, iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, - iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes); + iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, + iargs.i_dynamic); if (result.second && iargs.i_durable) store->create(*result.first); diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index 31992d0af5..8b0bebfcb2 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -91,7 +91,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, bool isQueue, bool isLocal, std::string& tag, - std::string& excludes) + std::string& excludes, + bool dynamic) { Mutex::ScopedLock locker(lock); stringstream keystream; @@ -119,6 +120,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, args.i_srcIsLocal = isLocal; args.i_tag = tag; args.i_excludes = excludes; + args.i_dynamic = dynamic; bridge = Bridge::shared_ptr (new Bridge (l->second.get(), l->second->nextChannel(), diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index 242c0d58ba..5b5fe4fec9 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -84,7 +84,8 @@ namespace broker { bool isQueue, bool isLocal, std::string& id, - std::string& excludes); + std::string& excludes, + bool dynamic); void destroy(const std::string& host, const uint16_t port); void destroy(const std::string& host, diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index b92df89839..7e7caeeec6 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -148,6 +148,50 @@ class FederationTests(TestBase010): mgmt.shutdown() + def test_push_to_exchange(self): + session = self.session + + mgmt = Helper(self) + broker = mgmt.get_object("broker") + + mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) + link = mgmt.get_object("link") + + mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", + "key":"my-key", "tag":"", "excludes":"", "srcIsQueue":0, + "srcIsLocal":1}) + bridge = mgmt.get_object("bridge") + + #setup queue to receive messages from remote broker + r_conn = self.connect(host=remote_host(), port=remote_port()) + r_session = r_conn.session("test_push_to_exchange") + r_session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + r_session.exchange_bind(queue="fed1", exchange="amq.fanout") + self.subscribe(session=r_session, queue="fed1", destination="f1") + queue = r_session.incoming("f1") + sleep(6) + + #send messages to local broker and confirm it is routed to remote broker + for i in range(1, 11): + dp = session.delivery_properties(routing_key="my-key") + session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + mgmt.call_method(bridge, "close") + mgmt.call_method(link, "close") + sleep(6) + self.assertEqual(len(mgmt.get_objects("bridge")), 0) + self.assertEqual(len(mgmt.get_objects("link")), 0) + + mgmt.shutdown() + def test_pull_from_queue(self): session = self.session diff --git a/qpid/python/commands/qpid-route b/qpid/python/commands/qpid-route index 4dadcd543b..8077e95278 100755 --- a/qpid/python/commands/qpid-route +++ b/qpid/python/commands/qpid-route @@ -150,7 +150,7 @@ class RouteManager: if _verbose: print "Creating inter-broker binding..." - res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, 0, 0) + res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, False) if res.status == 4: raise Exception("Can't create a durable route on a non-durable link") if _verbose: diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index da53a2f4a3..649832dffa 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -245,11 +245,12 @@ <arg name="durable" dir="I" type="bool"/> <arg name="src" dir="I" type="sstr"/> <arg name="dest" dir="I" type="sstr"/> - <arg name="key" dir="I" type="sstr" default=""/> - <arg name="tag" dir="I" type="sstr" default=""/> - <arg name="excludes" dir="I" type="sstr" default=""/> - <arg name="srcIsQueue" dir="I" type="bool" default="0"/> - <arg name="srcIsLocal" dir="I" type="bool" default="0"/> + <arg name="key" dir="I" type="sstr"/> + <arg name="tag" dir="I" type="sstr"/> + <arg name="excludes" dir="I" type="sstr"/> + <arg name="srcIsQueue" dir="I" type="bool"/> + <arg name="srcIsLocal" dir="I" type="bool"/> + <arg name="dynamic" dir="I" type="bool"/> </method> </class> @@ -270,6 +271,7 @@ <property name="srcIsLocal" type="bool" access="RC"/> <property name="tag" type="sstr" access="RC"/> <property name="excludes" type="sstr" access="RC"/> + <property name="dynamic" type="bool" access="RC"/> <method name="close"/> </class> |