diff options
Diffstat (limited to 'cpp/src/qpid/xml/XmlExchange.cpp')
-rw-r--r-- | cpp/src/qpid/xml/XmlExchange.cpp | 333 |
1 files changed, 228 insertions, 105 deletions
diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp index f0afc8d451..85a6cb4f57 100644 --- a/cpp/src/qpid/xml/XmlExchange.cpp +++ b/cpp/src/qpid/xml/XmlExchange.cpp @@ -26,6 +26,7 @@ #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" +#include "qpid/broker/FedOps.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" @@ -43,19 +44,63 @@ #include <xqilla/context/ItemFactory.hpp> #include <xqilla/xqilla-simple.hpp> +#include <boost/bind.hpp> +#include <functional> +#include <algorithm> #include <iostream> #include <sstream> using namespace qpid::framing; using namespace qpid::sys; using qpid::management::Manageable; -using std::string; namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { -namespace broker { - +namespace broker { + +XQilla XmlBinding::xqilla; + +XmlBinding::XmlBinding(const std::string& key, const Queue::shared_ptr queue, const std::string& _fedOrigin, Exchange* parent, + const ::qpid::framing::FieldTable& _arguments, const std::string& queryText ) + : Binding(key, queue, parent, _arguments), + xquery(), + parse_message_content(true), + fedOrigin(_fedOrigin) +{ + startManagement(); + + QPID_LOG(trace, "Creating binding with query: " << queryText ); + + try { + Query q(xqilla.parse(X(queryText.c_str()))); + xquery = q; + + QPID_LOG(trace, "Bound successfully with query: " << queryText ); + + parse_message_content = false; + + if (xquery->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) { + parse_message_content = true; + } + else { + GlobalVariables &vars = const_cast<GlobalVariables&>(xquery->getVariables()); + for (GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) { + if ((*it)->getStaticAnalysis().areContextFlagsUsed()) { + parse_message_content = true; + break; + } + } + } + } + catch (XQException& e) { + throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + } + catch (...) { + throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText)); + } +} + XmlExchange::XmlExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b) { if (mgmtExchange != 0) @@ -69,69 +114,83 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } + +bool XmlExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args) +{ + + // Federation uses bind for unbind and reorigin comands as well as for binds. + // + // Both federated and local binds are done in this method. Other + // federated requests are done by calling the relevent methods. + + string fedOp; + string fedTags; + string fedOrigin; + + if (args) + fedOp = args->getAsString(qpidFedOp); + if (! fedOp.empty()) { + fedTags = args->getAsString(qpidFedTags); + fedOrigin = args->getAsString(qpidFedOrigin); + } + if (fedOp == fedOpUnbind) { + return fedUnbind(fedOrigin, fedTags, queue, bindingKey, args); + } + else if (fedOp == fedOpReorigin) { + fedReorigin(); + return true; + } - // #### TODO: The Binding should take the query text - // #### only. Consider encapsulating the entire block, including - // #### the if condition. - - -bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) -{ - string queryText = bindingArguments->getAsString("xquery"); - - try { - RWlock::ScopedWlock l(lock); - - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::ConstPtr p = bindings.snapshot(); - if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) { - Query query(xqilla.parse(X(queryText.c_str()))); - XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, *bindingArguments, query)); + // OK, looks like we're really going to bind + + else if (fedOp.empty() || fedOp == fedOpBind) { - QPID_LOG(trace, "Bound successfully with query: " << queryText ); + string queryText = args->getAsString("xquery"); - binding->parse_message_content = false; + RWlock::ScopedWlock l(lock); + + XmlBinding::vector& bindings(bindingsMap[bindingKey]); + XmlBinding::vector::ConstPtr p = bindings.snapshot(); + + if (!p || std::find_if(p->begin(), p->end(), MatchQueueAndOrigin(queue, fedOrigin)) == p->end()) { - if (query->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) { - binding->parse_message_content = true; - } - else { - GlobalVariables &vars = const_cast<GlobalVariables&>(query->getVariables()); - for(GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) { - if ((*it)->getStaticAnalysis().areContextFlagsUsed()) { - binding->parse_message_content = true; - break; - } - } - } + XmlBinding::shared_ptr binding(new XmlBinding (bindingKey, queue, fedOrigin, this, *args, queryText)); + bindings.add(binding); - bindings.add(binding); - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); - } - } else { - return false; - } - } - catch (XQException& e) { - throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + } + } else { + return false; + } } - catch (...) { - throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText)); + else { + QPID_LOG(warning, "Unknown Federation Op: " << fedOp); } + routeIVE(); - return true; + propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, args); + + return true; } -bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/) +bool XmlExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args) { + /* + * When called directly, no qpidFedOrigin argument will be + * present. When called from federation, it will be present. + * + * This is a bit of a hack - the binding needs the origin, but + * this interface, as originally defined, would not supply one. + */ + string fedOrigin; + if (args) fedOrigin = args->getAsString(qpidFedOrigin); + RWlock::ScopedWlock l(lock); - if (bindingsMap[routingKey].remove_if(MatchQueue(queue))) { + if (bindingsMap[bindingKey].remove_if(MatchQueueAndOrigin(queue, fedOrigin))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); } return true; } else { @@ -141,65 +200,65 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) { - string msgContent; + string msgContent; - try { - QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); + try { + QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); - boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); - if (!context.get()) { - throw InternalErrorException(QPID_MSG("Query context looks munged ...")); - } + boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); + if (!context.get()) { + throw InternalErrorException(QPID_MSG("Query context looks munged ...")); + } - if (parse_message_content) { + if (parse_message_content) { - msg.getMessage().getFrames().getContent(msgContent); + msg.getMessage().getFrames().getContent(msgContent); - QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); + QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); - XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), - msgContent.length(), "input" ); + XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), + msgContent.length(), "input" ); - // This will parse the document using either Xerces or FastXDM, depending - // on your XQilla configuration. FastXDM can be as much as 10x faster. + // This will parse the document using either Xerces or FastXDM, depending + // on your XQilla configuration. FastXDM can be as much as 10x faster. - Sequence seq(context->parseDocument(xml)); + Sequence seq(context->parseDocument(xml)); - if(!seq.isEmpty() && seq.first()->isNode()) { - context->setContextItem(seq.first()); - context->setContextPosition(1); - context->setContextSize(1); - } - } + if(!seq.isEmpty() && seq.first()->isNode()) { + context->setContextItem(seq.first()); + context->setContextPosition(1); + context->setContextSize(1); + } + } - if (args) { - FieldTable::ValueMap::const_iterator v = args->begin(); - for(; v != args->end(); ++v) { - // ### TODO: Do types properly - if (v->second->convertsTo<std::string>()) { - QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); - Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); - context->setExternalVariable(X(v->first.c_str()), value); - } - } - } + if (args) { + FieldTable::ValueMap::const_iterator v = args->begin(); + for(; v != args->end(); ++v) { + // ### TODO: Do types properly + if (v->second->convertsTo<std::string>()) { + QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); + Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); + context->setExternalVariable(X(v->first.c_str()), value); + } + } + } - Result result = query->execute(context.get()); + Result result = query->execute(context.get()); #ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP - Item::Ptr first_ = result->next(context.get()); - Item::Ptr second_ = result->next(context.get()); - return XQEffectiveBooleanValue::get(first_, second_, context.get(), 0); + Item::Ptr first_ = result->next(context.get()); + Item::Ptr second_ = result->next(context.get()); + return XQEffectiveBooleanValue::get(first_, second_, context.get(), 0); #else - return result->getEffectiveBooleanValue(context.get(), 0); + return result->getEffectiveBooleanValue(context.get(), 0); #endif - } - catch (XQException& e) { - QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); - } - catch (...) { - QPID_LOG(warning, "Unexpected error routing message: " << msgContent); - } - return 0; + } + catch (XQException& e) { + QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); + } + catch (...) { + QPID_LOG(warning, "Unexpected error routing message: " << msgContent); + } + return 0; } // Future optimization: If any query in a binding for a given routing key requires @@ -237,16 +296,16 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT } -bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) +bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const bindingKey, const FieldTable* const) { RWlock::ScopedRlock l(lock); - if (routingKey) { - XmlBindingsMap::iterator i = bindingsMap.find(*routingKey); + if (bindingKey) { + XmlBindingsMap::iterator i = bindingsMap.find(*bindingKey); if (i == bindingsMap.end()) - return false; + return false; if (!queue) - return true; + return true; XmlBinding::vector::ConstPtr p = i->second.snapshot(); return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end(); } else if (!queue) { @@ -254,20 +313,84 @@ bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKe return bindingsMap.size() > 0; } else { for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) { - XmlBinding::vector::ConstPtr p = i->second.snapshot(); + XmlBinding::vector::ConstPtr p = i->second.snapshot(); if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true; - } - return false; + } + return false; } } - XmlExchange::~XmlExchange() { bindingsMap.clear(); } +void XmlExchange::propagateFedOp(const std::string& bindingKey, const std::string& fedTags, const std::string& fedOp, const std::string& fedOrigin, const qpid::framing::FieldTable* args) +{ + FieldTable nonFedArgs; + + if (args) { + for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i) { + const string& name(i->first); + if (name != qpidFedOp && + name != qpidFedTags && + name != qpidFedOrigin) { + nonFedArgs.insert((*i)); + } + } + } + + FieldTable* propArgs = (nonFedArgs.count() > 0 ? &nonFedArgs : 0); + Exchange::propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, propArgs); +} + +bool XmlExchange::fedUnbind(const string& fedOrigin, const string& fedTags, Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args) +{ + RWlock::ScopedRlock l(lock); + + if (unbind(queue, bindingKey, args)) { + propagateFedOp(bindingKey, fedTags, fedOpUnbind, fedOrigin); + return true; + } + return false; +} + +void XmlExchange::fedReorigin() +{ + std::vector<std::string> keys2prop; + { + RWlock::ScopedRlock l(lock); + for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); ++i) { + XmlBinding::vector::ConstPtr p = i->second.snapshot(); + if (std::find_if(p->begin(), p->end(), MatchOrigin(string())) != p->end()) { + keys2prop.push_back(i->first); + } + } + } /* lock dropped */ + for (std::vector<std::string>::const_iterator key = keys2prop.begin(); + key != keys2prop.end(); key++) { + propagateFedOp( *key, string(), fedOpBind, string()); + } +} + + +XmlExchange::MatchOrigin::MatchOrigin(const string& _origin) : origin(_origin) {} + +bool XmlExchange::MatchOrigin::operator()(XmlBinding::shared_ptr b) +{ + return b->fedOrigin == origin; +} + + +XmlExchange::MatchQueueAndOrigin::MatchQueueAndOrigin(Queue::shared_ptr _queue, const string& _origin) : queue(_queue), origin(_origin) {} + +bool XmlExchange::MatchQueueAndOrigin::operator()(XmlBinding::shared_ptr b) +{ + return b->queue == queue and b->fedOrigin == origin; +} + + const std::string XmlExchange::typeName("xml"); } |