diff options
Diffstat (limited to 'qpid/cpp/src/qpid/xml/XmlExchange.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/xml/XmlExchange.cpp | 128 |
1 files changed, 75 insertions, 53 deletions
diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp index 5197b239d0..4e9de49ad5 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp +++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp @@ -33,6 +33,9 @@ #include "qpid/Plugin.h" #include <xercesc/framework/MemBufInputSource.hpp> + +#include <xqilla/ast/XQGlobalVariable.hpp> + #include <xqilla/context/ItemFactory.hpp> #include <xqilla/xqilla-simple.hpp> @@ -62,17 +65,6 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -/* - * Use the name of the query as the binding key. - * - * The first time a given name is used in a binding, the query body - * must be provided.After that, no query body should be present. - * - * To modify an installed query, the user must first unbind the - * existing query, then replace it by binding again with the same - * name. - * - */ // #### TODO: The Binding should take the query text // #### only. Consider encapsulating the entire block, including @@ -94,6 +86,21 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const bindings.add(binding); QPID_LOG(trace, "Bound successfully with query: " << queryText ); + binding->parse_message_content = false; + + if (query->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) { + binding->parse_message_content = true; + } + else { + GlobalVariables &vars = const_cast<GlobalVariables&>(query->getVariables()); + for(GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) { + if ((*it)->getStaticAnalysis().areContextFlagsUsed()) { + binding->parse_message_content = true; + break; + } + } + } + if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); @@ -126,59 +133,74 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons } } -bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args) +bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) { - // ### TODO: Need istream for frameset - // Hack alert - the following code does not work for really large messages - string msgContent; try { - msg.getMessage().getFrames().getContent(msgContent); - - QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); - QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); - - boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); - if (!context.get()) { - throw InternalErrorException(QPID_MSG("Query context looks munged ...")); - } - - XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), - msgContent.length(), "input" ); - Sequence seq(context->parseDocument(xml)); - - if (args) { - FieldTable::ValueMap::const_iterator v = args->begin(); - for(; v != args->end(); ++v) { - // ### TODO: Do types properly - if (v->second->convertsTo<std::string>()) { - QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); - Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); - context->setExternalVariable(X(v->first.c_str()), value); - } - } - } - - if(!seq.isEmpty() && seq.first()->isNode()) { - context->setContextItem(seq.first()); - context->setContextPosition(1); - context->setContextSize(1); - } - Result result = query->execute(context.get()); - return result->getEffectiveBooleanValue(context.get(), 0); + QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); + + boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); + if (!context.get()) { + throw InternalErrorException(QPID_MSG("Query context looks munged ...")); + } + + if (parse_message_content) { + + msg.getMessage().getFrames().getContent(msgContent); + + QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); + + XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), + msgContent.length(), "input" ); + + // This will parse the document using either Xerces or FastXDM, depending + // on your XQilla configuration. FastXDM can be as much as 10x faster. + + Sequence seq(context->parseDocument(xml)); + + if(!seq.isEmpty() && seq.first()->isNode()) { + context->setContextItem(seq.first()); + context->setContextPosition(1); + context->setContextSize(1); + } + } + + if (args) { + FieldTable::ValueMap::const_iterator v = args->begin(); + for(; v != args->end(); ++v) { + // ### TODO: Do types properly + if (v->second->convertsTo<std::string>()) { + QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); + Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); + context->setExternalVariable(X(v->first.c_str()), value); + } + } + } + + Result result = query->execute(context.get()); + return result->getEffectiveBooleanValue(context.get(), 0); } catch (XQException& e) { - QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); - return 0; + QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); } catch (...) { - QPID_LOG(warning, "Unexpected error routing message: " << msgContent); - return 0; + QPID_LOG(warning, "Unexpected error routing message: " << msgContent); } return 0; } +// Future optimization: If any query in a binding for a given routing key requires +// message content, parse the message once, and use that parsed form for all bindings. +// +// Future optimization: XQilla does not currently do document projection for data +// accessed via the context item. If there is a single query for a given routing key, +// and it accesses document data, this could be a big win. +// +// Document projection often is not a win if you have multiple queries on the same data. +// But for very large messages, if all these queries are on the first part of the data, +// it could still be a big win. + void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) { PreRoute pr(msg, this); @@ -192,7 +214,7 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT int count(0); for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) { - if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ... + if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { msg.deliverTo((*i)->queue); count++; QPID_LOG(trace, "Delivered to queue" ); |