diff options
author | Ted Ross <tross@apache.org> | 2008-10-24 00:45:11 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-10-24 00:45:11 +0000 |
commit | d33da71d0e2dbf23345c6bd45895eb918739c0c5 (patch) | |
tree | f820c248a9aa8bae3826811bb9fca8d970e993f5 | |
parent | 8a17f96b1254b5a827726e892edaf373d00dd0b8 (diff) | |
download | qpid-python-d33da71d0e2dbf23345c6bd45895eb918739c0c5.tar.gz |
QPID-1348 - Dynamic binding for federation. Parameterized exchange names for CPP examples
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707515 13f79535-47bb-0310-9956-ffa450edef68
20 files changed, 724 insertions, 227 deletions
diff --git a/qpid/cpp/examples/direct/declare_queues.cpp b/qpid/cpp/examples/direct/declare_queues.cpp index 0cdb472665..3289efb872 100644 --- a/qpid/cpp/examples/direct/declare_queues.cpp +++ b/qpid/cpp/examples/direct/declare_queues.cpp @@ -56,6 +56,7 @@ using std::string; int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + string exchange(argc>3 ? argv[3] : "amq.direct"); Connection connection; try { @@ -69,7 +70,7 @@ int main(int argc, char** argv) { // routing key is "routing_key" to this newly created queue. session.queueDeclare(arg::queue="message_queue"); - session.exchangeBind(arg::exchange="amq.direct", arg::queue="message_queue", arg::bindingKey="routing_key"); + session.exchangeBind(arg::exchange=exchange, arg::queue="message_queue", arg::bindingKey="routing_key"); //----------------------------------------------------------------------------- diff --git a/qpid/cpp/examples/direct/direct_producer.cpp b/qpid/cpp/examples/direct/direct_producer.cpp index baa8d9092b..9ea3c812a6 100644 --- a/qpid/cpp/examples/direct/direct_producer.cpp +++ b/qpid/cpp/examples/direct/direct_producer.cpp @@ -65,6 +65,7 @@ int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; int count = argc>3 ? atoi(argv[3]) : 10; + string exchange(argc>4 ? argv[4] : "amq.direct"); Connection connection; Message message; try { @@ -89,14 +90,14 @@ int main(int argc, char** argv) { message.setData(message_data.str()); // Asynchronous transfer sends messages as quickly as // possible without waiting for confirmation. - // async(session).messageTransfer(arg::content=message, arg::destination="amq.direct"); - session.messageTransfer(arg::content=message, arg::destination="amq.direct"); + // async(session).messageTransfer(arg::content=message, arg::destination=exchange); + session.messageTransfer(arg::content=message, arg::destination=exchange); } // And send a final message to indicate termination. message.setData("That's all, folks!"); - session.messageTransfer(arg::content=message, arg::destination="amq.direct"); + session.messageTransfer(arg::content=message, arg::destination=exchange); //----------------------------------------------------------------------------- diff --git a/qpid/cpp/examples/fanout/fanout_producer.cpp b/qpid/cpp/examples/fanout/fanout_producer.cpp index a1ca407847..bb253d7027 100644 --- a/qpid/cpp/examples/fanout/fanout_producer.cpp +++ b/qpid/cpp/examples/fanout/fanout_producer.cpp @@ -64,6 +64,7 @@ using std::string; int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + string exchange = argc>3 ? argv[3] : "amq.fanout"; Connection connection; Message message; try { @@ -86,13 +87,13 @@ int main(int argc, char** argv) { message.setData(message_data.str()); // Asynchronous transfer sends messages as quickly as // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=message, arg::destination="amq.fanout"); + async(session).messageTransfer(arg::content=message, arg::destination=exchange); } // And send a final message to indicate termination. message.setData("That's all, folks!"); - session.messageTransfer(arg::content=message, arg::destination="amq.fanout"); + session.messageTransfer(arg::content=message, arg::destination=exchange); //----------------------------------------------------------------------------- diff --git a/qpid/cpp/examples/fanout/listener.cpp b/qpid/cpp/examples/fanout/listener.cpp index b29c82d3d9..2938125f4b 100644 --- a/qpid/cpp/examples/fanout/listener.cpp +++ b/qpid/cpp/examples/fanout/listener.cpp @@ -60,6 +60,7 @@ void Listener::received(Message& message) { int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + string exchange = argc>3 ? argv[3] : "amq.fanout"; Connection connection; Message msg; try { @@ -82,7 +83,7 @@ int main(int argc, char** argv) { session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, arg::autoDelete=true); - session.exchangeBind(arg::exchange="amq.fanout", arg::queue=myQueue, arg::bindingKey="my-key"); + session.exchangeBind(arg::exchange=exchange, arg::queue=myQueue, arg::bindingKey="my-key"); // Create a listener and subscribe it to my queue. SubscriptionManager subscriptions(session); diff --git a/qpid/cpp/examples/pub-sub/topic_listener.cpp b/qpid/cpp/examples/pub-sub/topic_listener.cpp index 9996abab19..af70cc2672 100644 --- a/qpid/cpp/examples/pub-sub/topic_listener.cpp +++ b/qpid/cpp/examples/pub-sub/topic_listener.cpp @@ -61,7 +61,7 @@ class Listener : public MessageListener { SubscriptionManager subscriptions; public: Listener(Session& session); - virtual void prepareQueue(std::string queue, std::string routing_key); + virtual void prepareQueue(std::string queue, std::string exchange, std::string routing_key); virtual void received(Message& message); virtual void listen(); ~Listener() { }; @@ -84,7 +84,7 @@ Listener::Listener(Session& session) : } -void Listener::prepareQueue(std::string queue, std::string routing_key) { +void Listener::prepareQueue(std::string queue, std::string exchange, std::string routing_key) { /* Create a unique queue name for this consumer by concatenating * the queue name parameter with the Session ID. @@ -106,8 +106,8 @@ void Listener::prepareQueue(std::string queue, std::string routing_key) { * "control" routing key, when it is finished. */ - session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey=routing_key); - session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey="control"); + session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=routing_key); + session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey="control"); /* * subscribe to the queue using the subscription manager. @@ -134,6 +134,7 @@ void Listener::listen() { int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + std::string exchange = argc>3 ? argv[3] : "amq.topic"; Connection connection; try { connection.open(host, port); @@ -147,12 +148,12 @@ int main(int argc, char** argv) { // Subscribe to messages on the queues we are interested in - listener.prepareQueue("usa", "usa.#"); - listener.prepareQueue("europe", "europe.#"); - listener.prepareQueue("news", "#.news"); - listener.prepareQueue("weather", "#.weather"); + listener.prepareQueue("usa", exchange, "usa.#"); + listener.prepareQueue("europe", exchange, "europe.#"); + listener.prepareQueue("news", exchange, "#.news"); + listener.prepareQueue("weather", exchange, "#.weather"); - std::cout << "Listening for messages ..." << std::endl; + std::cout << "Listening for messages ..." << std::endl; // Give up control and receive messages listener.listen(); diff --git a/qpid/cpp/examples/pub-sub/topic_publisher.cpp b/qpid/cpp/examples/pub-sub/topic_publisher.cpp index ab485fec8f..d11e807259 100644 --- a/qpid/cpp/examples/pub-sub/topic_publisher.cpp +++ b/qpid/cpp/examples/pub-sub/topic_publisher.cpp @@ -60,7 +60,7 @@ using namespace qpid::framing; using std::stringstream; using std::string; -void publish_messages(Session& session, string routing_key) +void publish_messages(Session& session, string exchange, string routing_key) { Message message; @@ -75,7 +75,7 @@ void publish_messages(Session& session, string routing_key) message.setData(message_data.str()); // Asynchronous transfer sends messages as quickly as // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=message, arg::destination="amq.topic"); + async(session).messageTransfer(arg::content=message, arg::destination=exchange); } } @@ -88,18 +88,19 @@ void publish_messages(Session& session, string routing_key) * */ -void no_more_messages(Session& session) +void no_more_messages(Session& session, string exchange) { Message message; message.getDeliveryProperties().setRoutingKey("control"); message.setData("That's all, folks!"); - session.messageTransfer(arg::content=message, arg::destination="amq.topic"); + session.messageTransfer(arg::content=message, arg::destination=exchange); } int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + std::string exchange = argc>3 ? argv[3] : "amq.topic"; Connection connection; Message message; try { @@ -108,12 +109,12 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- - publish_messages(session, "usa.news"); - publish_messages(session, "usa.weather"); - publish_messages(session, "europe.news"); - publish_messages(session, "europe.weather"); + publish_messages(session, exchange, "usa.news"); + publish_messages(session, exchange, "usa.weather"); + publish_messages(session, exchange, "europe.news"); + publish_messages(session, exchange, "europe.weather"); - no_more_messages(session); + no_more_messages(session, exchange); //----------------------------------------------------------------------------- diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 5064320efb..cc76cf7f21 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -34,6 +34,18 @@ using qpid::framing::Buffer; using qpid::management::ManagementAgent; namespace _qmf = qmf::org::apache::qpid::broker; +namespace +{ +const std::string qpidFedOp("qpid.fed.op"); +const std::string qpidFedTags("qpid.fed.tags"); +const std::string qpidFedOrigin("qpid.fed.origin"); + +const std::string fedOpBind("B"); +const std::string fedOpUnbind("U"); +const std::string fedOpReorigin("R"); +const std::string fedOpHello("H"); +} + namespace qpid { namespace broker { @@ -45,8 +57,9 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const _qmf::ArgsLinkBridge& _args) : link(_link), id(_id), args(_args), mgmtObject(0), - listener(l), name(Uuid(true).str()), persistenceId(0) + listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0) { + queueName += name; ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { mgmtObject = new _qmf::Bridge @@ -65,7 +78,10 @@ Bridge::~Bridge() void Bridge::create(ConnectionState& c) { + connState = &c; if (args.i_srcIsLocal) { + if (args.i_dynamic) + throw Exception("Dynamic routing not supported for push routes"); // Point the bridging commands at the local connection handler Connection* conn = dynamic_cast<Connection*>(&c); if (conn == 0) @@ -74,7 +90,7 @@ void Bridge::create(ConnectionState& c) channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); } else { // Point the bridging commands at the remote peer broker - channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput()))); + channelHandler.reset(new framing::ChannelHandler(id, &(connState->getOutput()))); } session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); @@ -88,8 +104,6 @@ void Bridge::create(ConnectionState& c) peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } else { - string queue = "bridge_queue_"; - queue += Uuid(true).str(); FieldTable queueSettings; if (args.i_tag.size()) { @@ -103,19 +117,26 @@ void Bridge::create(ConnectionState& c) if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); } else { - const string& peerTag = c.getFederationPeerTag(); + const string& peerTag = connState->getFederationPeerTag(); if (peerTag.size()) queueSettings.setString("qpid.trace.exclude", peerTag); } bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? bool autoDelete = !durable;//auto delete transient queues? - peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); + peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings); if (!args.i_dynamic) - peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); - peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); + peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + if (args.i_dynamic) { + Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); + if (exchange.get() == 0) + throw Exception("Exchange not found for dynamic route"); + exchange->registerDynamicBridge(this); + } } } @@ -123,6 +144,11 @@ void Bridge::cancel() { peer->getMessage().cancel(args.i_dest); peer->getSession().detach(name); + if (args.i_dynamic) { + Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); + if (exchange.get() != 0) + exchange->removeDynamicBridge(this); + } } void Bridge::destroy() @@ -220,4 +246,46 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, } } +void Bridge::propagateBinding(const string& key, const string& tagList, + const string& op, const string& origin) +{ + const string& localTag = link->getBroker()->getFederationTag(); + const string& peerTag = connState->getFederationPeerTag(); + + if (tagList.find(peerTag) == tagList.npos) { + FieldTable bindArgs; + string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag); + + bindArgs.setString(qpidFedOp, op); + bindArgs.setString(qpidFedTags, newTagList); + if (origin.empty()) + bindArgs.setString(qpidFedOrigin, localTag); + else + bindArgs.setString(qpidFedOrigin, origin); + + peer->getExchange().bind(queueName, args.i_src, key, bindArgs); + } +} + +void Bridge::sendReorigin() +{ + FieldTable bindArgs; + + bindArgs.setString(qpidFedOp, fedOpReorigin); + bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag()); + + peer->getExchange().bind(queueName, args.i_src, args.i_key, bindArgs); +} + +bool Bridge::containsLocalTag(const string& tagList) const +{ + const string& localTag = link->getBroker()->getFederationTag(); + return (tagList.find(localTag) != tagList.npos); +} + +const string& Bridge::getLocalTag() const +{ + return link->getBroker()->getFederationTag(); +} + }} diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 057bc68fe2..c530a5d696 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -27,6 +27,7 @@ #include "qpid/framing/Buffer.h" #include "qpid/framing/FrameHandler.h" #include "qpid/management/Manageable.h" +#include "Exchange.h" #include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h" #include "qmf/org/apache/qpid/broker/Bridge.h" @@ -41,7 +42,7 @@ class ConnectionState; class Link; class LinkRegistry; -class Bridge : public PersistableConfig, public management::Manageable +class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge { public: typedef boost::shared_ptr<Bridge> shared_ptr; @@ -69,6 +70,12 @@ public: const std::string& getName() const; static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + // Exchange::DynamicBridge methods + void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin); + void sendReorigin(); + bool containsLocalTag(const std::string& tagList) const; + const std::string& getLocalTag() const; + private: struct PushHandler : framing::FrameHandler { PushHandler(Connection* c) { conn = c; } @@ -87,7 +94,9 @@ private: qmf::org::apache::qpid::broker::Bridge* mgmtObject; CancellationListener listener; std::string name; + std::string queueName; mutable uint64_t persistenceId; + ConnectionState* connState; }; diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 8fc8260f9e..9976167fa9 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -28,25 +28,45 @@ using namespace qpid::sys; using qpid::management::Manageable; namespace _qmf = qmf::org::apache::qpid::broker; +namespace +{ +const std::string qpidFedOp("qpid.fed.op"); +const std::string qpidFedTags("qpid.fed.tags"); +const std::string qpidFedOrigin("qpid.fed.origin"); + +const std::string fedOpBind("B"); +const std::string fedOpUnbind("U"); +const std::string fedOpReorigin("R"); +const std::string fedOpHello("H"); +} + DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) { if (mgmtExchange != 0) - mgmtExchange->set_type (typeName); + mgmtExchange->set_type(typeName); } -DirectExchange::DirectExchange(const std::string& _name, bool _durable, +DirectExchange::DirectExchange(const string& _name, bool _durable, const FieldTable& _args, Manageable* _parent) : Exchange(_name, _durable, _args, _parent) { if (mgmtExchange != 0) - mgmtExchange->set_type (typeName); + mgmtExchange->set_type(typeName); } -bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){ - { +bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) +{ + string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); + string fedTags(args ? args->getAsString(qpidFedTags) : ""); + string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); + bool propagate = false; + + if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { Mutex::ScopedLock l(lock); - Binding::shared_ptr b(new Binding (routingKey, queue, this)); - if (bindings[routingKey].add_unless(b, MatchQueue(queue))) { + Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin)); + BoundKey& bk = bindings[routingKey]; + if (bk.queues.add_unless(b, MatchQueue(queue))) { + propagate = bk.fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); @@ -54,30 +74,58 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con } else { return false; } + } else if (fedOp == fedOpUnbind) { + Mutex::ScopedLock l(lock); + BoundKey& bk = bindings[routingKey]; + propagate = bk.fedBinding.delOrigin(fedOrigin); + if (bk.fedBinding.count() == 0) + unbind(queue, routingKey, 0); + } else if (fedOp == fedOpReorigin) { + for (std::map<string, BoundKey>::iterator iter = bindings.begin(); + iter != bindings.end(); iter++) { + const BoundKey& bk = iter->second; + if (bk.fedBinding.hasLocal()) { + propagateFedOp(iter->first, string(), fedOpBind, string()); + } + } } + routeIVE(); + if (propagate) + propagateFedOp(routingKey, fedTags, fedOp, fedOrigin); return true; } -bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - Mutex::ScopedLock l(lock); - if (bindings[routingKey].remove_if(MatchQueue(queue))) { - if (mgmtExchange != 0) { - mgmtExchange->dec_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); +bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/) +{ + bool propagate = false; + + { + Mutex::ScopedLock l(lock); + BoundKey& bk = bindings[routingKey]; + if (bk.queues.remove_if(MatchQueue(queue))) { + propagate = bk.fedBinding.delOrigin(); + if (mgmtExchange != 0) { + mgmtExchange->dec_bindingCount(); + ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); + } + } else { + return false; } - return true; - } else { - return false; } + + if (propagate) + propagateFedOp(routingKey, string(), fedOpUnbind, string()); + return true; } -void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ +void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) +{ PreRoute pr(msg, this); Queues::ConstPtr p; { Mutex::ScopedLock l(lock); - p = bindings[routingKey].snapshot(); + p = bindings[routingKey].queues.snapshot(); } int count(0); @@ -85,26 +133,26 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { msg.deliverTo((*i)->queue); if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); + (*i)->mgmtBinding->inc_msgMatched(); } } if(!count){ QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey); if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); + mgmtExchange->inc_msgDrops(); + mgmtExchange->inc_byteDrops(msg.contentSize()); } } else { if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + mgmtExchange->inc_msgRoutes(count); + mgmtExchange->inc_byteRoutes(count * msg.contentSize()); } } if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); + mgmtExchange->inc_msgReceives(); + mgmtExchange->inc_byteReceives(msg.contentSize()); } } @@ -120,14 +168,14 @@ bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routin if (!queue) return true; - Queues::ConstPtr p = i->second.snapshot(); + Queues::ConstPtr p = i->second.queues.snapshot(); return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end(); } else if (!queue) { //if no queue or routing key is specified, just report whether any bindings exist return bindings.size() > 0; } else { for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { - Queues::ConstPtr p = i->second.snapshot(); + Queues::ConstPtr p = i->second.queues.snapshot(); if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true; } return false; diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.h b/qpid/cpp/src/qpid/broker/DirectExchange.h index 2516ce4a13..ba60469df8 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.h +++ b/qpid/cpp/src/qpid/broker/DirectExchange.h @@ -31,33 +31,35 @@ namespace qpid { namespace broker { - class DirectExchange : public virtual Exchange{ - typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues; - typedef std::map<string, Queues> Bindings; - Bindings bindings; - qpid::sys::Mutex lock; - - public: - static const std::string typeName; - - DirectExchange(const std::string& name, management::Manageable* parent = 0); - DirectExchange(const string& _name, bool _durable, - const qpid::framing::FieldTable& _args, management::Manageable* parent = 0); +class DirectExchange : public virtual Exchange { + typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues; + struct BoundKey { + Queues queues; + FedBinding fedBinding; + }; + typedef std::map<string, BoundKey> Bindings; + Bindings bindings; + qpid::sys::Mutex lock; - virtual std::string getType() const { return typeName; } +public: + static const std::string typeName; - virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - - virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + DirectExchange(const std::string& name, management::Manageable* parent = 0); + DirectExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args, management::Manageable* parent = 0); - virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual std::string getType() const { return typeName; } + + virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); - virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); + virtual ~DirectExchange(); - virtual ~DirectExchange(); - }; -} -} + virtual bool supportsDynamicBinding() { return true; } +}; +}} #endif diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 3cea904676..f437946194 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -40,6 +40,14 @@ namespace { const std::string qpidMsgSequence("qpid.msg_sequence"); const std::string qpidIVE("qpid.ive"); +const std::string qpidFedOp("qpid.fed.op"); +const std::string qpidFedTags("qpid.fed.tags"); +const std::string qpidFedOrigin("qpid.fed.origin"); + +const std::string fedOpBind("B"); +const std::string fedOpUnbind("U"); +const std::string fedOpReorigin("R"); +const std::string fedOpHello("H"); } @@ -73,7 +81,7 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0) + sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -89,7 +97,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), - sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) + sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -107,7 +115,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel } } } - + sequence = _args.get(qpidMsgSequence); if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); @@ -166,8 +174,46 @@ ManagementObject* Exchange::GetManagementObject (void) const return (ManagementObject*) mgmtExchange; } +void Exchange::registerDynamicBridge(DynamicBridge* db) +{ + if (!supportsDynamicBinding()) + throw Exception("Exchange type does not support dynamic binding"); + + for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); + iter != bridgeVector.end(); iter++) + (*iter)->sendReorigin(); + + bridgeVector.push_back(db); + FieldTable args; + args.setString(qpidFedOp, fedOpReorigin); + bind(Queue::shared_ptr(), string(), &args); +} + +void Exchange::removeDynamicBridge(DynamicBridge* db) +{ + for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); + iter != bridgeVector.end(); iter++) + if (*iter == db) { + bridgeVector.erase(iter); + break; + } +} + +void Exchange::handleHelloRequest() +{ +} + +void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin) +{ + string myOp(op.empty() ? fedOpBind : op); + + for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); + iter != bridgeVector.end(); iter++) + (*iter)->propagateBinding(routingKey, tags, op, origin); +} + Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent, - FieldTable _args) + FieldTable _args, const string& origin) : queue(_queue), key(_key), args(_args), mgmtBinding(0) { if (parent != 0) @@ -181,6 +227,8 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang management::ObjectId queueId = mo->getObjectId(); mgmtBinding = new _qmf::Binding (agent, this, (Manageable*) parent, queueId, key, args); + if (!origin.empty()) + mgmtBinding->set_origin(origin); agent->addObject (mgmtBinding); } } diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 1963ba1134..d1b38ea8b6 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -74,7 +74,7 @@ protected: qmf::org::apache::qpid::broker::Binding* mgmtBinding; Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0, - framing::FieldTable args = framing::FieldTable()); + framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); ~Binding(); management::ManagementObject* GetManagementObject() const; }; @@ -85,6 +85,34 @@ protected: bool operator()(Exchange::Binding::shared_ptr b); }; + class FedBinding { + uint32_t localBindings; + std::set<std::string> originSet; + public: + FedBinding() : localBindings(0) {} + bool hasLocal() const { return localBindings != 0; } + bool addOrigin(const std::string& origin) { + if (origin.empty()) { + localBindings++; + return localBindings == 1; + } + originSet.insert(origin); + return true; + } + bool delOrigin(const std::string& origin) { + originSet.erase(origin); + return true; + } + bool delOrigin() { + if (localBindings > 0) + localBindings--; + return localBindings == 0; + } + uint32_t count() { + return localBindings + originSet.size(); + } + }; + qmf::org::apache::qpid::broker::Exchange* mgmtExchange; public: @@ -121,6 +149,27 @@ public: // Manageable entry points management::ManagementObject* GetManagementObject(void) const; + + // Federation hooks + class DynamicBridge { + public: + virtual ~DynamicBridge() {} + virtual void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin) = 0; + virtual void sendReorigin() = 0; + virtual bool containsLocalTag(const std::string& tagList) const = 0; + virtual const std::string& getLocalTag() const = 0; + }; + + void registerDynamicBridge(DynamicBridge* db); + void removeDynamicBridge(DynamicBridge* db); + virtual bool supportsDynamicBinding() { return false; } + +protected: + std::vector<DynamicBridge*> bridgeVector; + + virtual void handleHelloRequest(); + void propagateFedOp(const std::string& routingKey, const std::string& tags, + const std::string& op, const std::string& origin); }; }} diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 2628d8952f..aa1f7ff30a 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -26,6 +26,18 @@ using namespace qpid::framing; using namespace qpid::sys; namespace _qmf = qmf::org::apache::qpid::broker; +namespace +{ +const std::string qpidFedOp("qpid.fed.op"); +const std::string qpidFedTags("qpid.fed.tags"); +const std::string qpidFedOrigin("qpid.fed.origin"); + +const std::string fedOpBind("B"); +const std::string fedOpUnbind("U"); +const std::string fedOpReorigin("R"); +const std::string fedOpHello("H"); +} + FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) : Exchange(_name, _parent) { @@ -41,32 +53,57 @@ FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/) +bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args) { - Binding::shared_ptr binding (new Binding ("", queue, this)); - if (bindings.add_unless(binding, MatchQueue(queue))) { - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); + string fedTags(args ? args->getAsString(qpidFedTags) : ""); + string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); + bool propagate = false; + + if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { + Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); + if (bindings.add_unless(binding, MatchQueue(queue))) { + propagate = fedBinding.addOrigin(fedOrigin); + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + } + } else { + return false; + } + } else if (fedOp == fedOpUnbind) { + propagate = fedBinding.delOrigin(fedOrigin); + if (fedBinding.count() == 0) + unbind(queue, "", 0); + } else if (fedOp == fedOpReorigin) { + if (fedBinding.hasLocal()) { + propagateFedOp(string(), string(), fedOpBind, string()); } - routeIVE(); - return true; - } else { - return false; } + + routeIVE(); + if (propagate) + propagateFedOp(string(), fedTags, fedOp, fedOrigin); + return true; } bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/) { + bool propagate = false; + if (bindings.remove_if(MatchQueue(queue))) { + propagate = fedBinding.delOrigin(); if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); } - return true; } else { return false; } + + if (propagate) + propagateFedOp(string(), string(), fedOpUnbind, string()); + return true; } void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.h b/qpid/cpp/src/qpid/broker/FanOutExchange.h index cfe9875024..5884a19732 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.h +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.h @@ -34,6 +34,7 @@ namespace broker { class FanOutExchange : public virtual Exchange { typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> BindingsArray; BindingsArray bindings; + FedBinding fedBinding; public: static const std::string typeName; @@ -53,6 +54,7 @@ class FanOutExchange : public virtual Exchange { virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); virtual ~FanOutExchange(); + virtual bool supportsDynamicBinding() { return true; } }; } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 7a06d7e0b9..a814c12eed 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -336,7 +336,7 @@ ManagementObject* Link::GetManagementObject (void) const return (ManagementObject*) mgmtObject; } -Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args, string&) +Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args, string& text) { switch (op) { @@ -350,8 +350,22 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; // Durable bridges are only valid on durable links - if (iargs.i_durable && !durable) - return Manageable::STATUS_INVALID_PARAMETER; + if (iargs.i_durable && !durable) { + text = "Can't create a durable route on a non-durable link"; + return Manageable::STATUS_USER; + } + + if (iargs.i_dynamic) { + Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src); + if (exchange.get() == 0) { + text = "Exchange not found"; + return Manageable::STATUS_USER; + } + if (!exchange->supportsDynamicBinding()) { + text = "Exchange type does not support dynamic routing"; + return Manageable::STATUS_USER; + } + } std::pair<Bridge::shared_ptr, bool> result = links->declare (host, port, iargs.i_durable, iargs.i_src, diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index 691b42a1ae..853c131571 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -31,6 +31,18 @@ namespace _qmf = qmf::org::apache::qpid::broker; // - excessive string copying: should be 0 copy, match from original buffer. // - match/lookup: use descision tree or other more efficient structure. +namespace +{ +const std::string qpidFedOp("qpid.fed.op"); +const std::string qpidFedTags("qpid.fed.tags"); +const std::string qpidFedOrigin("qpid.fed.origin"); + +const std::string fedOpBind("B"); +const std::string fedOpUnbind("U"); +const std::string fedOpReorigin("R"); +const std::string fedOpHello("H"); +} + Tokens& Tokens::operator=(const std::string& s) { clear(); if (s.empty()) return *this; @@ -51,6 +63,15 @@ TopicPattern& TopicPattern::operator=(const Tokens& tokens) { return *this; } +void Tokens::key(string& keytext) const +{ + for (std::vector<string>::const_iterator iter = begin(); iter != end(); iter++) { + if (iter != begin()) + keytext += "."; + keytext += *iter; + } +} + namespace { const std::string hashmark("#"); const std::string star("*"); @@ -81,7 +102,7 @@ void TopicPattern::normalize() { namespace { -// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. +// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string. // Need StringRef class that operates on a string in place witout copy. // Should be applied everywhere strings are extracted from frames. // @@ -130,30 +151,63 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - { +bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) +{ + string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); + string fedTags(args ? args->getAsString(qpidFedTags) : ""); + string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); + bool propagate = false; + bool reallyUnbind; + TopicPattern routingPattern(routingKey); + + if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { RWlock::ScopedWlock l(lock); - TopicPattern routingPattern(routingKey); if (isBound(queue, routingPattern)) { return false; } else { - Binding::shared_ptr binding (new Binding (routingKey, queue, this)); - bindings[routingPattern].push_back(binding); + Binding::shared_ptr binding (new Binding (routingKey, queue, this, FieldTable(), fedOrigin)); + BoundKey& bk = bindings[routingPattern]; + bk.bindingVector.push_back(binding); + propagate = bk.fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); } } + } else if (fedOp == fedOpUnbind) { + { + RWlock::ScopedWlock l(lock); + BoundKey& bk = bindings[routingPattern]; + propagate = bk.fedBinding.delOrigin(fedOrigin); + reallyUnbind = bk.fedBinding.count() == 0; + } + if (reallyUnbind) + unbind(queue, routingKey, 0); + } else if (fedOp == fedOpReorigin) { + for (std::map<TopicPattern, BoundKey>::iterator iter = bindings.begin(); + iter != bindings.end(); iter++) { + const BoundKey& bk = iter->second; + if (bk.fedBinding.hasLocal()) { + string propKey; + iter->first.key(propKey); + propagateFedOp(propKey, string(), fedOpBind, string()); + } + } } + routeIVE(); + if (propagate) + propagateFedOp(routingKey, fedTags, fedOp, fedOrigin); return true; } bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); - Binding::vector& qv(bi->second); if (bi == bindings.end()) return false; + BoundKey& bk = bi->second; + Binding::vector& qv(bk.bindingVector); + bool propagate = false; Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) @@ -161,11 +215,15 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co break; if(q == qv.end()) return false; qv.erase(q); + propagate = bk.fedBinding.delOrigin(); if(qv.empty()) bindings.erase(bi); if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); } + + if (propagate) + propagateFedOp(routingKey, string(), fedOpUnbind, string()); return true; } @@ -173,7 +231,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) { BindingMap::iterator bi = bindings.find(pattern); if (bi == bindings.end()) return false; - Binding::vector& qv(bi->second); + Binding::vector& qv(bi->second.bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) @@ -189,7 +247,7 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(tokens)) { - Binding::vector& qv(i->second); + Binding::vector& qv(i->second.bindingVector); for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ msg.deliverTo((*j)->queue); if ((*j)->mgmtBinding != 0) @@ -230,7 +288,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing } } else { for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - Binding::vector& qv(i->second); + Binding::vector& qv(i->second.bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h index 2e107142b7..f3a2e221f7 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.h +++ b/qpid/cpp/src/qpid/broker/TopicExchange.h @@ -41,6 +41,7 @@ class Tokens : public std::vector<std::string> { Tokens(const std::string& s) { operator=(s); } /** Tokenizing assignment operator s */ Tokens & operator=(const std::string& s); + void key(std::string& key) const; private: size_t hash; @@ -70,8 +71,12 @@ class TopicPattern : public Tokens void normalize(); }; -class TopicExchange : public virtual Exchange{ - typedef std::map<TopicPattern, Binding::vector> BindingMap; +class TopicExchange : public virtual Exchange { + struct BoundKey { + Binding::vector bindingVector; + FedBinding fedBinding; + }; + typedef std::map<TopicPattern, BoundKey> BindingMap; BindingMap bindings; qpid::sys::RWlock lock; @@ -94,6 +99,7 @@ class TopicExchange : public virtual Exchange{ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); virtual ~TopicExchange(); + virtual bool supportsDynamicBinding() { return true; } }; diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index d9bafd9d88..92162fd98d 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -20,7 +20,6 @@ import sys from qpid.testlib import TestBase010, testrunner -from qpid.management import managementChannel, managementClient from qpid.datatypes import Message from qpid.queue import Empty from time import sleep @@ -54,68 +53,45 @@ def remote_host(): def remote_port(): return int(scan_args("--remote-port")) -class Helper: - def __init__(self, parent): - self.parent = parent - self.session = parent.conn.session("Helper") - self.mc = managementClient(self.session.spec) - self.mch = self.mc.addChannel(self.session) - self.mc.syncWaitForStable(self.mch) - - def shutdown (self): - self.mc.removeChannel (self.mch) +class FederationTests(TestBase010): - def get_objects(self, type): - return self.mc.syncGetObjects(self.mch, type) + def test_bridge_create_and_close(self): + self.startQmf(); + qmf = self.qmf - def get_object(self, type, position = 1, expected = None): - objects = self.get_objects(type) - if not expected: expected = position - self.assertEqual(len(objects), expected) - return objects[(position - 1)] + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) - - def call_method(self, object, method, args=None): - res = self.mc.syncCallMethod(self.mch, object.id, object.classKey, method, args) - self.assertEqual(res.status, 0) - self.assertEqual(res.statusText, "OK") - return res - - def assertEqual(self, a, b): - self.parent.assertEqual(a, b) - -class FederationTests(TestBase010): + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False) + self.assertEqual(result.status, 0) - def test_bridge_create_and_close(self): - mgmt = Helper(self) - broker = mgmt.get_object("broker") - - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") - - mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.direct", "key":"my-key"}) - bridge = mgmt.get_object("bridge") - - mgmt.call_method(bridge, "close") - mgmt.call_method(link, "close") + bridge = qmf.getObjects(_class="bridge")[0] + result = bridge.close() + self.assertEqual(result.status, 0) - sleep(6) - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - self.assertEqual(len(mgmt.get_objects("link")), 0) + result = link.close() + self.assertEqual(result.status, 0) - mgmt.shutdown () + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) def test_pull_from_exchange(self): session = self.session - mgmt = Helper(self) - broker = mgmt.get_object("broker") + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") - - mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key"}) - bridge = mgmt.get_object("bridge") + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] #setup queue to receive messages from local broker session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) @@ -140,27 +116,29 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None - mgmt.call_method(bridge, "close") - mgmt.call_method(link, "close") - sleep(6) - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - self.assertEqual(len(mgmt.get_objects("link")), 0) + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) - mgmt.shutdown() + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) def test_push_to_exchange(self): session = self.session - mgmt = Helper(self) - broker = mgmt.get_object("broker") + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") - - mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", - "key":"my-key", "tag":"", "excludes":"", "srcIsQueue":0, - "srcIsLocal":1}) - bridge = mgmt.get_object("bridge") + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] #setup queue to receive messages from remote broker r_conn = self.connect(host=remote_host(), port=remote_port()) @@ -184,13 +162,14 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None - mgmt.call_method(bridge, "close") - mgmt.call_method(link, "close") - sleep(6) - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - self.assertEqual(len(mgmt.get_objects("link")), 0) + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) - mgmt.shutdown() + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) def test_pull_from_queue(self): session = self.session @@ -209,16 +188,18 @@ class FederationTests(TestBase010): self.subscribe(queue="fed1", destination="f1") queue = session.incoming("f1") - mgmt = Helper(self) - broker = mgmt.get_object("broker") + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False) + self.assertEqual(result.status, 0) - mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout", - "key":"", "tag":"", "excludes":"", "srcIsQueue":1}) - sleep(6) - bridge = mgmt.get_object("bridge") + bridge = qmf.getObjects(_class="bridge")[0] + sleep(3) #add some more messages (i.e. after bridge was created) for i in range(6, 11): @@ -236,14 +217,14 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) - mgmt.call_method(bridge, "close") - mgmt.call_method(link, "close") - sleep(6) - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - self.assertEqual(len(mgmt.get_objects("link")), 0) - - mgmt.shutdown () + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) def test_tracing_automatic(self): remoteUrl = "%s:%d" % (remote_host(), remote_port()) @@ -307,22 +288,24 @@ class FederationTests(TestBase010): def test_tracing(self): session = self.session - mgmt = Helper(self) - broker = mgmt.get_object("broker") + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") - - mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key", - "tag":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"}) - sleep(6) - bridge = mgmt.get_object("bridge") + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id", + "exclude-me,also-exclude-me", False, False, False) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] #setup queue to receive messages from local broker session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) session.exchange_bind(queue="fed1", exchange="amq.fanout") self.subscribe(queue="fed1", destination="f1") queue = session.incoming("f1") + sleep(6) #send messages to remote broker and confirm it is routed to local broker r_conn = self.connect(host=remote_host(), port=remote_port()) @@ -347,13 +330,155 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None - mgmt.call_method(bridge, "close") - mgmt.call_method(link, "close") - sleep(6) - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - self.assertEqual(len(mgmt.get_objects("link")), 0) + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) + + def test_dynamic_fanout(self): + session = self.session + r_conn = self.connect(host=remote_host(), port=remote_port()) + r_session = r_conn.session("test_dynamic_fanout") + + session.exchange_declare(exchange="fed.fanout", type="fanout") + r_session.exchange_declare(exchange="fed.fanout", type="fanout") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.fanout") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties() + r_session.message_transfer(destination="fed.fanout", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) + + + def test_dynamic_direct(self): + session = self.session + r_conn = self.connect(host=remote_host(), port=remote_port()) + r_session = r_conn.session("test_dynamic_direct") + + session.exchange_declare(exchange="fed.direct", type="direct") + r_session.exchange_declare(exchange="fed.direct", type="direct") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.direct", binding_key="fd-key") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="fd-key") + r_session.message_transfer(destination="fed.direct", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) + + + def test_dynamic_topic(self): + session = self.session + r_conn = self.connect(host=remote_host(), port=remote_port()) + r_session = r_conn.session("test_dynamic_topic") + + session.exchange_declare(exchange="fed.topic", type="topic") + r_session.exchange_declare(exchange="fed.topic", type="topic") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="ft-key.#") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="ft-key.one.two") + r_session.message_transfer(destination="fed.topic", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) - mgmt.shutdown () def getProperty(self, msg, name): for h in msg.headers: @@ -364,7 +489,8 @@ class FederationTests(TestBase010): headers = self.getProperty(msg, "application_headers") if headers: return headers[name] - return None + return None + if __name__ == '__main__': args = sys.argv[1:] diff --git a/qpid/python/commands/qpid-route b/qpid/python/commands/qpid-route index 8a779bb7a3..7d6d3e333e 100755 --- a/qpid/python/commands/qpid-route +++ b/qpid/python/commands/qpid-route @@ -34,6 +34,8 @@ def Usage (): print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>" print " qpid-route [OPTIONS] route list [<dest-broker>]" print " qpid-route [OPTIONS] route flush [<dest-broker>]" + print " qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]" + print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>" print print "Options:" print " -v [ --verbose ] Verbose output" @@ -118,7 +120,7 @@ class RouteManager: print "%-16s%-8d %c %-18s%s" % \ (link.host, link.port, YN(link.durable), link.state, link.lastError) - def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes): + def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes, dynamic=False): self.src = qmfconsole.BrokerURL(srcBroker) if self.dest.match(self.src.host, self.src.port): raise Exception("Linking broker to itself is not permitted") @@ -155,13 +157,13 @@ class RouteManager: if _verbose: print "Creating inter-broker binding..." - res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, False) - if res.status == 4: - raise Exception("Can't create a durable route on a non-durable link") + res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, dynamic) + if res.status != 0: + raise Exception(res.text) if _verbose: print "Bridge method returned:", res.status, res.text - def DelRoute (self, srcBroker, exchange, routingKey): + def DelRoute (self, srcBroker, exchange, routingKey, dynamic=False): self.src = qmfconsole.BrokerURL(srcBroker) link = self.getLink() if link == None: @@ -171,7 +173,8 @@ class RouteManager: bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: - if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \ + and bridge.dynamic == dynamic: if _verbose: print "Closing bridge..." res = bridge.close() @@ -201,7 +204,11 @@ class RouteManager: myLink = link break if myLink != None: - print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key) + if bridge.dynamic: + keyText = "<dynamic>" + else: + keyText = bridge.key + print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, keyText) def ClearAllRoutes (self): links = self.qmf.getObjects(_class="link") @@ -285,6 +292,22 @@ try: elif cmd == "list": rm.ListLinks () + elif group == "dynamic": + if cmd == "add": + if nargs < 5 or nargs > 7: + Usage () + + tag = "" + excludes = "" + if nargs > 5: tag = cargs[5] + if nargs > 6: excludes = cargs[6] + rm.AddRoute (cargs[3], cargs[4], "", tag, excludes, dynamic=True) + elif cmd == "del": + if nargs != 5: + Usage () + else: + rm.DelRoute (cargs[3], cargs[4], "", dynamic=True) + elif group == "route": if cmd == "add": if nargs < 6 or nargs > 8: @@ -294,12 +317,12 @@ try: excludes = "" if nargs > 6: tag = cargs[6] if nargs > 7: excludes = cargs[7] - rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes) + rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes, dynamic=False) elif cmd == "del": if nargs != 6: Usage () else: - rm.DelRoute (cargs[3], cargs[4], cargs[5]) + rm.DelRoute (cargs[3], cargs[4], cargs[5], dynamic=False) else: if cmd == "list": rm.ListRoutes () diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 1f811347e7..96a79f9ee8 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -196,6 +196,7 @@ <property name="queueRef" type="objId" references="Queue" access="RC" index="y"/> <property name="bindingKey" type="sstr" access="RC" index="y"/> <property name="arguments" type="map" access="RC"/> + <property name="origin" type="sstr" access="RO" optional="y"/> <statistic name="msgMatched" type="count64"/> </class> |