diff options
author | Jonathan Robie <jonathan@apache.org> | 2010-10-20 22:15:24 +0000 |
---|---|---|
committer | Jonathan Robie <jonathan@apache.org> | 2010-10-20 22:15:24 +0000 |
commit | fdebc18142c0479cd4e603affac141e7bdd5b50b (patch) | |
tree | e24d10c9ced332377c466d21b7ab08ab2590b9a1 /cpp/src | |
parent | 9cec65bfe71c9a6a1ddb4822bd9e782c20c9f96d (diff) | |
download | qpid-python-fdebc18142c0479cd4e603affac141e7bdd5b50b.tar.gz |
Adds support for federation in the XML exchange.
Resolves QPID-2348 for the XML Exchange. Also made some changes to the file structure for fedop constants.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1025780 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/xml/XmlExchange.cpp | 333 | ||||
-rw-r--r-- | cpp/src/qpid/xml/XmlExchange.h | 60 | ||||
-rwxr-xr-x | cpp/src/tests/federation.py | 159 | ||||
-rwxr-xr-x | cpp/src/tests/run_federation_tests | 26 |
11 files changed, 486 insertions, 192 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 9381f00268..5911d916ad 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/Bridge.h" +#include "qpid/broker/FedOps.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/Connection.h" #include "qpid/broker/Link.h" @@ -37,18 +38,6 @@ using qpid::management::ManagementAgent; using std::string; namespace _qmf = qmf::org::apache::qpid::broker; -namespace -{ -const std::string qpidFedOp("qpid.fed.op"); -const std::string qpidFedTags("qpid.fed.tags"); -const std::string qpidFedOrigin("qpid.fed.origin"); - -const std::string fedOpBind("B"); -const std::string fedOpUnbind("U"); -const std::string fedOpReorigin("R"); -const std::string fedOpHello("H"); -} - namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 0db941f93b..fc52ab3711 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -20,6 +20,7 @@ */ #include "qpid/log/Statement.h" +#include "qpid/broker/FedOps.h" #include "qpid/broker/Queue.h" #include "qpid/broker/DirectExchange.h" #include <iostream> @@ -32,15 +33,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { -const std::string qpidFedOp("qpid.fed.op"); -const std::string qpidFedTags("qpid.fed.tags"); -const std::string qpidFedOrigin("qpid.fed.origin"); -const std::string qpidExclusiveBinding("qpid.exclusive-binding"); - -const std::string fedOpBind("B"); -const std::string fedOpUnbind("U"); -const std::string fedOpReorigin("R"); -const std::string fedOpHello("H"); + const std::string qpidExclusiveBinding("qpid.exclusive-binding"); } DirectExchange::DirectExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b) @@ -67,7 +60,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con fedOp = args->getAsString(qpidFedOp); fedTags = args->getAsString(qpidFedTags); fedOrigin = args->getAsString(qpidFedOrigin); - exclusiveBinding = args->get(qpidExclusiveBinding); + exclusiveBinding = args->get(qpidExclusiveBinding); // only direct exchanges take exclusive bindings } bool propagate = false; diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index aaf0805543..b499171418 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/FedOps.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Cluster.h" #include "qpid/management/ManagementAgent.h" @@ -43,19 +44,10 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { -const std::string qpidMsgSequence("qpid.msg_sequence"); -const std::string qpidSequenceCounter("qpid.sequence_counter"); -const std::string qpidIVE("qpid.ive"); -const std::string qpidFedOp("qpid.fed.op"); -const std::string qpidFedTags("qpid.fed.tags"); -const std::string qpidFedOrigin("qpid.fed.origin"); - -const std::string fedOpBind("B"); -const std::string fedOpUnbind("U"); -const std::string fedOpReorigin("R"); -const std::string fedOpHello("H"); - -const std::string QPID_MANAGEMENT("qpid.management"); + const std::string qpidMsgSequence("qpid.msg_sequence"); + const std::string qpidSequenceCounter("qpid.sequence_counter"); + const std::string qpidIVE("qpid.ive"); + const std::string QPID_MANAGEMENT("qpid.management"); } diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 4b6b90026b..26d7f41015 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -101,6 +101,13 @@ protected: public: FedBinding() : localBindings(0) {} bool hasLocal() const { return localBindings != 0; } + + /** + * Returns 'true' if and only if this is the first local + * binding. + * + * The first local binding may need to be propagated. + */ bool addOrigin(const std::string& origin) { if (origin.empty()) { localBindings++; @@ -113,6 +120,14 @@ protected: originSet.erase(origin); return true; } + + /** + * Returns 'true' if and only if the last local binding is + * deleted. + * + * When the last local binding is deleted, it may need to + * be propagated. + */ bool delOrigin() { if (localBindings > 0) localBindings--; @@ -145,6 +160,17 @@ public: bool inUseAsAlternate() { return alternateUsers > 0; } virtual std::string getType() const = 0; + + /** + * bind() is used for two distinct purposes: + * + * 1. To create a binding, in the conventional sense + * + * 2. As a vehicle for any FedOp, currently including federated + * binding, federated unbinding, federated reorigin. + * + */ + virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index d8e55decbd..a33eba1d09 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/FanOutExchange.h" +#include "qpid/broker/FedOps.h" #include <algorithm> using namespace qpid::broker; @@ -26,18 +27,6 @@ using namespace qpid::framing; using namespace qpid::sys; namespace _qmf = qmf::org::apache::qpid::broker; -namespace -{ -const std::string qpidFedOp("qpid.fed.op"); -const std::string qpidFedTags("qpid.fed.tags"); -const std::string qpidFedOrigin("qpid.fed.origin"); - -const std::string fedOpBind("B"); -const std::string fedOpUnbind("U"); -const std::string fedOpReorigin("R"); -const std::string fedOpHello("H"); -} - FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b) { diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 6a723b373e..584cd4c481 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -94,10 +94,11 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co fedTags = args->getAsString(qpidFedTags); fedOrigin = args->getAsString(qpidFedOrigin); } + bool propagate = false; // The federation args get propagated directly, so we need to identify - // the non feteration args in case a federated propagate is needed + // the non federation args in case a federated propagate is needed FieldTable extra_args; getNonFedArgs(args, extra_args); diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 7372e58c4a..3f70f17ea4 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/TopicExchange.h" +#include "qpid/broker/FedOps.h" #include "qpid/log/Statement.h" #include <algorithm> @@ -37,19 +38,6 @@ namespace _qmf = qmf::org::apache::qpid::broker; // - excessive string copying: should be 0 copy, match from original buffer. // - match/lookup: use descision tree or other more efficient structure. -namespace -{ -const std::string qpidFedOp("qpid.fed.op"); -const std::string qpidFedTags("qpid.fed.tags"); -const std::string qpidFedOrigin("qpid.fed.origin"); - -const std::string fedOpBind("B"); -const std::string fedOpUnbind("U"); -const std::string fedOpReorigin("R"); -const std::string fedOpHello("H"); -} - - namespace { // Iterate over a string of '.'-separated tokens. struct TokenIterator { diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp index f0afc8d451..85a6cb4f57 100644 --- a/cpp/src/qpid/xml/XmlExchange.cpp +++ b/cpp/src/qpid/xml/XmlExchange.cpp @@ -26,6 +26,7 @@ #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" +#include "qpid/broker/FedOps.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" @@ -43,19 +44,63 @@ #include <xqilla/context/ItemFactory.hpp> #include <xqilla/xqilla-simple.hpp> +#include <boost/bind.hpp> +#include <functional> +#include <algorithm> #include <iostream> #include <sstream> using namespace qpid::framing; using namespace qpid::sys; using qpid::management::Manageable; -using std::string; namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { -namespace broker { - +namespace broker { + +XQilla XmlBinding::xqilla; + +XmlBinding::XmlBinding(const std::string& key, const Queue::shared_ptr queue, const std::string& _fedOrigin, Exchange* parent, + const ::qpid::framing::FieldTable& _arguments, const std::string& queryText ) + : Binding(key, queue, parent, _arguments), + xquery(), + parse_message_content(true), + fedOrigin(_fedOrigin) +{ + startManagement(); + + QPID_LOG(trace, "Creating binding with query: " << queryText ); + + try { + Query q(xqilla.parse(X(queryText.c_str()))); + xquery = q; + + QPID_LOG(trace, "Bound successfully with query: " << queryText ); + + parse_message_content = false; + + if (xquery->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) { + parse_message_content = true; + } + else { + GlobalVariables &vars = const_cast<GlobalVariables&>(xquery->getVariables()); + for (GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) { + if ((*it)->getStaticAnalysis().areContextFlagsUsed()) { + parse_message_content = true; + break; + } + } + } + } + catch (XQException& e) { + throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + } + catch (...) { + throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText)); + } +} + XmlExchange::XmlExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b) { if (mgmtExchange != 0) @@ -69,69 +114,83 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } + +bool XmlExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args) +{ + + // Federation uses bind for unbind and reorigin comands as well as for binds. + // + // Both federated and local binds are done in this method. Other + // federated requests are done by calling the relevent methods. + + string fedOp; + string fedTags; + string fedOrigin; + + if (args) + fedOp = args->getAsString(qpidFedOp); + if (! fedOp.empty()) { + fedTags = args->getAsString(qpidFedTags); + fedOrigin = args->getAsString(qpidFedOrigin); + } + if (fedOp == fedOpUnbind) { + return fedUnbind(fedOrigin, fedTags, queue, bindingKey, args); + } + else if (fedOp == fedOpReorigin) { + fedReorigin(); + return true; + } - // #### TODO: The Binding should take the query text - // #### only. Consider encapsulating the entire block, including - // #### the if condition. - - -bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) -{ - string queryText = bindingArguments->getAsString("xquery"); - - try { - RWlock::ScopedWlock l(lock); - - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::ConstPtr p = bindings.snapshot(); - if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) { - Query query(xqilla.parse(X(queryText.c_str()))); - XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, *bindingArguments, query)); + // OK, looks like we're really going to bind + + else if (fedOp.empty() || fedOp == fedOpBind) { - QPID_LOG(trace, "Bound successfully with query: " << queryText ); + string queryText = args->getAsString("xquery"); - binding->parse_message_content = false; + RWlock::ScopedWlock l(lock); + + XmlBinding::vector& bindings(bindingsMap[bindingKey]); + XmlBinding::vector::ConstPtr p = bindings.snapshot(); + + if (!p || std::find_if(p->begin(), p->end(), MatchQueueAndOrigin(queue, fedOrigin)) == p->end()) { - if (query->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) { - binding->parse_message_content = true; - } - else { - GlobalVariables &vars = const_cast<GlobalVariables&>(query->getVariables()); - for(GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) { - if ((*it)->getStaticAnalysis().areContextFlagsUsed()) { - binding->parse_message_content = true; - break; - } - } - } + XmlBinding::shared_ptr binding(new XmlBinding (bindingKey, queue, fedOrigin, this, *args, queryText)); + bindings.add(binding); - bindings.add(binding); - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); - } - } else { - return false; - } - } - catch (XQException& e) { - throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + } + } else { + return false; + } } - catch (...) { - throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText)); + else { + QPID_LOG(warning, "Unknown Federation Op: " << fedOp); } + routeIVE(); - return true; + propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, args); + + return true; } -bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/) +bool XmlExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args) { + /* + * When called directly, no qpidFedOrigin argument will be + * present. When called from federation, it will be present. + * + * This is a bit of a hack - the binding needs the origin, but + * this interface, as originally defined, would not supply one. + */ + string fedOrigin; + if (args) fedOrigin = args->getAsString(qpidFedOrigin); + RWlock::ScopedWlock l(lock); - if (bindingsMap[routingKey].remove_if(MatchQueue(queue))) { + if (bindingsMap[bindingKey].remove_if(MatchQueueAndOrigin(queue, fedOrigin))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); } return true; } else { @@ -141,65 +200,65 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) { - string msgContent; + string msgContent; - try { - QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); + try { + QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); - boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); - if (!context.get()) { - throw InternalErrorException(QPID_MSG("Query context looks munged ...")); - } + boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); + if (!context.get()) { + throw InternalErrorException(QPID_MSG("Query context looks munged ...")); + } - if (parse_message_content) { + if (parse_message_content) { - msg.getMessage().getFrames().getContent(msgContent); + msg.getMessage().getFrames().getContent(msgContent); - QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); + QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); - XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), - msgContent.length(), "input" ); + XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), + msgContent.length(), "input" ); - // This will parse the document using either Xerces or FastXDM, depending - // on your XQilla configuration. FastXDM can be as much as 10x faster. + // This will parse the document using either Xerces or FastXDM, depending + // on your XQilla configuration. FastXDM can be as much as 10x faster. - Sequence seq(context->parseDocument(xml)); + Sequence seq(context->parseDocument(xml)); - if(!seq.isEmpty() && seq.first()->isNode()) { - context->setContextItem(seq.first()); - context->setContextPosition(1); - context->setContextSize(1); - } - } + if(!seq.isEmpty() && seq.first()->isNode()) { + context->setContextItem(seq.first()); + context->setContextPosition(1); + context->setContextSize(1); + } + } - if (args) { - FieldTable::ValueMap::const_iterator v = args->begin(); - for(; v != args->end(); ++v) { - // ### TODO: Do types properly - if (v->second->convertsTo<std::string>()) { - QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); - Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); - context->setExternalVariable(X(v->first.c_str()), value); - } - } - } + if (args) { + FieldTable::ValueMap::const_iterator v = args->begin(); + for(; v != args->end(); ++v) { + // ### TODO: Do types properly + if (v->second->convertsTo<std::string>()) { + QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); + Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); + context->setExternalVariable(X(v->first.c_str()), value); + } + } + } - Result result = query->execute(context.get()); + Result result = query->execute(context.get()); #ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP - Item::Ptr first_ = result->next(context.get()); - Item::Ptr second_ = result->next(context.get()); - return XQEffectiveBooleanValue::get(first_, second_, context.get(), 0); + Item::Ptr first_ = result->next(context.get()); + Item::Ptr second_ = result->next(context.get()); + return XQEffectiveBooleanValue::get(first_, second_, context.get(), 0); #else - return result->getEffectiveBooleanValue(context.get(), 0); + return result->getEffectiveBooleanValue(context.get(), 0); #endif - } - catch (XQException& e) { - QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); - } - catch (...) { - QPID_LOG(warning, "Unexpected error routing message: " << msgContent); - } - return 0; + } + catch (XQException& e) { + QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); + } + catch (...) { + QPID_LOG(warning, "Unexpected error routing message: " << msgContent); + } + return 0; } // Future optimization: If any query in a binding for a given routing key requires @@ -237,16 +296,16 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT } -bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) +bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const bindingKey, const FieldTable* const) { RWlock::ScopedRlock l(lock); - if (routingKey) { - XmlBindingsMap::iterator i = bindingsMap.find(*routingKey); + if (bindingKey) { + XmlBindingsMap::iterator i = bindingsMap.find(*bindingKey); if (i == bindingsMap.end()) - return false; + return false; if (!queue) - return true; + return true; XmlBinding::vector::ConstPtr p = i->second.snapshot(); return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end(); } else if (!queue) { @@ -254,20 +313,84 @@ bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKe return bindingsMap.size() > 0; } else { for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) { - XmlBinding::vector::ConstPtr p = i->second.snapshot(); + XmlBinding::vector::ConstPtr p = i->second.snapshot(); if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true; - } - return false; + } + return false; } } - XmlExchange::~XmlExchange() { bindingsMap.clear(); } +void XmlExchange::propagateFedOp(const std::string& bindingKey, const std::string& fedTags, const std::string& fedOp, const std::string& fedOrigin, const qpid::framing::FieldTable* args) +{ + FieldTable nonFedArgs; + + if (args) { + for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i) { + const string& name(i->first); + if (name != qpidFedOp && + name != qpidFedTags && + name != qpidFedOrigin) { + nonFedArgs.insert((*i)); + } + } + } + + FieldTable* propArgs = (nonFedArgs.count() > 0 ? &nonFedArgs : 0); + Exchange::propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, propArgs); +} + +bool XmlExchange::fedUnbind(const string& fedOrigin, const string& fedTags, Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args) +{ + RWlock::ScopedRlock l(lock); + + if (unbind(queue, bindingKey, args)) { + propagateFedOp(bindingKey, fedTags, fedOpUnbind, fedOrigin); + return true; + } + return false; +} + +void XmlExchange::fedReorigin() +{ + std::vector<std::string> keys2prop; + { + RWlock::ScopedRlock l(lock); + for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); ++i) { + XmlBinding::vector::ConstPtr p = i->second.snapshot(); + if (std::find_if(p->begin(), p->end(), MatchOrigin(string())) != p->end()) { + keys2prop.push_back(i->first); + } + } + } /* lock dropped */ + for (std::vector<std::string>::const_iterator key = keys2prop.begin(); + key != keys2prop.end(); key++) { + propagateFedOp( *key, string(), fedOpBind, string()); + } +} + + +XmlExchange::MatchOrigin::MatchOrigin(const string& _origin) : origin(_origin) {} + +bool XmlExchange::MatchOrigin::operator()(XmlBinding::shared_ptr b) +{ + return b->fedOrigin == origin; +} + + +XmlExchange::MatchQueueAndOrigin::MatchQueueAndOrigin(Queue::shared_ptr _queue, const string& _origin) : queue(_queue), origin(_origin) {} + +bool XmlExchange::MatchQueueAndOrigin::operator()(XmlBinding::shared_ptr b) +{ + return b->queue == queue and b->fedOrigin == origin; +} + + const std::string XmlExchange::typeName("xml"); } diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h index f34c417633..958bad4931 100644 --- a/cpp/src/qpid/xml/XmlExchange.h +++ b/cpp/src/qpid/xml/XmlExchange.h @@ -33,41 +33,45 @@ #include <map> #include <vector> +#include <string> + +using namespace std; namespace qpid { namespace broker { class Broker; -class XmlExchange : public virtual Exchange { - typedef boost::shared_ptr<XQQuery> Query; +typedef boost::shared_ptr<XQQuery> Query; - struct XmlBinding : public Exchange::Binding { - typedef boost::shared_ptr<XmlBinding> shared_ptr; - typedef qpid::sys::CopyOnWriteArray<XmlBinding::shared_ptr> vector; +struct XmlBinding : public Exchange::Binding { - boost::shared_ptr<XQQuery> xquery; - bool parse_message_content; - - XmlBinding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent, - const ::qpid::framing::FieldTable& _arguments, Query query): - Binding(key, queue, parent, _arguments), - xquery(query), - parse_message_content(true) { startManagement(); } - }; + static XQilla xqilla; + typedef boost::shared_ptr<XmlBinding> shared_ptr; + typedef qpid::sys::CopyOnWriteArray<XmlBinding::shared_ptr> vector; + + Query xquery; + bool parse_message_content; + const std::string fedOrigin; // empty for local bindings + + XmlBinding(const std::string& key, const Queue::shared_ptr queue, const std::string& fedOrigin, Exchange* parent, + const ::qpid::framing::FieldTable& _arguments, const std::string& ); - typedef std::map<std::string, XmlBinding::vector > XmlBindingsMap; +}; +class XmlExchange : public virtual Exchange { + + typedef std::map<string, XmlBinding::vector> XmlBindingsMap; XmlBindingsMap bindingsMap; - XQilla xqilla; + qpid::sys::RWlock lock; bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content); public: static const std::string typeName; - + XmlExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); XmlExchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); @@ -82,7 +86,29 @@ class XmlExchange : public virtual Exchange { virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); + virtual void propagateFedOp(const std::string& bindingKey, const std::string& fedTags, const std::string& fedOp, const std::string& fedOrigin, const qpid::framing::FieldTable* args=0); + + virtual bool fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const qpid::framing::FieldTable* args); + + virtual void fedReorigin(); + + virtual bool supportsDynamicBinding() { return true; } + virtual ~XmlExchange(); + + struct MatchOrigin { + const std::string origin; + MatchOrigin(const std::string& origin); + bool operator()(XmlBinding::shared_ptr b); + }; + + struct MatchQueueAndOrigin { + const Queue::shared_ptr queue; + const std::string origin; + MatchQueueAndOrigin(Queue::shared_ptr queue, const std::string& origin); + bool operator()(XmlBinding::shared_ptr b); + }; + }; diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 8ca78499d3..92a28c01ad 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -802,6 +802,165 @@ class FederationTests(TestBase010): self.verify_cleanup() + + def test_dynamic_headers_xml(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers_xml") + + session.exchange_declare(exchange="fed.xml", type="xml") + r_session.exchange_declare(exchange="fed.xml", type="xml") + + self.startQmf() + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.xml", "fed.xml", "", "", "", False, False, True, 0) + + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.xml", binding_key="key1", arguments={'xquery':'true()'}) + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + props = r_session.delivery_properties(routing_key="key1") + for i in range(1, 11): + r_session.message_transfer(destination="fed.xml", message=Message(props, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + content = msg.body + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers_reorigin_xml(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers_reorigin_xml") + + session.exchange_declare(exchange="fed.xml_reorigin", type="xml") + r_session.exchange_declare(exchange="fed.xml_reorigin", type="xml") + + session.exchange_declare(exchange="fed.xml_reorigin_2", type="xml") + r_session.exchange_declare(exchange="fed.xml_reorigin_2", type="xml") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + session.queue_declare(queue="fed2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed2", exchange="fed.xml_reorigin_2", binding_key="key2", arguments={'xquery':'true()'}) + self.subscribe(queue="fed2", destination="f2") + queue2 = session.incoming("f2") + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.xml_reorigin", "fed.xml_reorigin", "", "", "", False, False, True, 0) + + self.assertEqual(result.status, 0) + result = link.bridge(False, "fed.xml_reorigin_2", "fed.xml_reorigin_2", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + bridge2 = qmf.getObjects(_class="bridge")[1] + sleep(5) + + foo=qmf.getObjects(_class="link") + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.xml_reorigin", binding_key="key1", arguments={'xquery':'true()'}) + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + props = r_session.delivery_properties(routing_key="key1") + for i in range(1, 11): + r_session.message_transfer(destination="fed.xml_reorigin", message=Message(props, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + + # Extra test: don't explicitly close() bridge2. When the link is closed, + # it should clean up bridge2 automagically. verify_cleanup() will detect + # if bridge2 isn't cleaned up and will fail the test. + # + #result = bridge2.close() + #self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers_unbind_xml(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_xml_unbind") + + session.exchange_declare(exchange="fed.xml_unbind", type="xml") + r_session.exchange_declare(exchange="fed.xml_unbind", type="xml") + + self.startQmf() + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.xml_unbind", "fed.xml_unbind", "", "", "", False, False, True, 0) + + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + queue = qmf.getObjects(_class="queue", name="fed1")[0] + queue.update() + self.assertEqual(queue.bindingCount, 1, + "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + + session.exchange_bind(queue="fed1", exchange="fed.xml_unbind", binding_key="key1", arguments={'xquery':'true()'}) + queue.update() + self.assertEqual(queue.bindingCount, 2, + "bindings not accounted for (expected 2, got %d)" % queue.bindingCount) + + session.exchange_unbind(queue="fed1", exchange="fed.xml_unbind", binding_key="key1") + queue.update() + self.assertEqual(queue.bindingCount, 1, + "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_topic_nodup(self): """Verify that a message whose routing key matches more than one binding does not get duplicated to the same queue. diff --git a/cpp/src/tests/run_federation_tests b/cpp/src/tests/run_federation_tests index f5bb123d0a..4be27a2e85 100755 --- a/cpp/src/tests/run_federation_tests +++ b/cpp/src/tests/run_federation_tests @@ -25,29 +25,37 @@ source ./test_env.sh trap stop_brokers INT TERM QUIT +if [ -f ../.libs/xml.so ] ; then + MODULES="--load-module xml" # Load the XML exchange and run XML exchange federation tests + SKIPTESTS="" +else + MODULES="--no-module-dir" + SKIPTESTS="-i *xml" +fi + start_brokers() { - ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port + ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port LOCAL_PORT=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port + ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port REMOTE_PORT=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port + ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port REMOTE_B1=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port + ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port REMOTE_B2=`cat qpidd.port` } stop_brokers() { - $QPIDD_EXEC --no-module-dir -q --port $LOCAL_PORT - $QPIDD_EXEC --no-module-dir -q --port $REMOTE_PORT - $QPIDD_EXEC --no-module-dir -q --port $REMOTE_B1 - $QPIDD_EXEC --no-module-dir -q --port $REMOTE_B2 + $QPIDD_EXEC $MODULES -q --port $LOCAL_PORT + $QPIDD_EXEC $MODULES -q --port $REMOTE_PORT + $QPIDD_EXEC $MODULES -q --port $REMOTE_B1 + $QPIDD_EXEC $MODULES -q --port $REMOTE_B2 } if test -d ${PYTHON_DIR} ; then start_brokers echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT $REMOTE_B1 $REMOTE_B2" - $QPID_PYTHON_TEST -m federation -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@ + $QPID_PYTHON_TEST -m federation $SKIPTESTS -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@ RETCODE=$? stop_brokers if test x$RETCODE != x0; then |