summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/xml/XmlExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/xml/XmlExchange.cpp')
-rw-r--r--qpid/cpp/src/qpid/xml/XmlExchange.cpp128
1 files changed, 75 insertions, 53 deletions
diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp
index 5197b239d0..4e9de49ad5 100644
--- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp
@@ -33,6 +33,9 @@
#include "qpid/Plugin.h"
#include <xercesc/framework/MemBufInputSource.hpp>
+
+#include <xqilla/ast/XQGlobalVariable.hpp>
+
#include <xqilla/context/ItemFactory.hpp>
#include <xqilla/xqilla-simple.hpp>
@@ -62,17 +65,6 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-/*
- * Use the name of the query as the binding key.
- *
- * The first time a given name is used in a binding, the query body
- * must be provided.After that, no query body should be present.
- *
- * To modify an installed query, the user must first unbind the
- * existing query, then replace it by binding again with the same
- * name.
- *
- */
// #### TODO: The Binding should take the query text
// #### only. Consider encapsulating the entire block, including
@@ -94,6 +86,21 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const
bindings.add(binding);
QPID_LOG(trace, "Bound successfully with query: " << queryText );
+ binding->parse_message_content = false;
+
+ if (query->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) {
+ binding->parse_message_content = true;
+ }
+ else {
+ GlobalVariables &vars = const_cast<GlobalVariables&>(query->getVariables());
+ for(GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) {
+ if ((*it)->getStaticAnalysis().areContextFlagsUsed()) {
+ binding->parse_message_content = true;
+ break;
+ }
+ }
+ }
+
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -126,59 +133,74 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons
}
}
-bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args)
+bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content)
{
- // ### TODO: Need istream for frameset
- // Hack alert - the following code does not work for really large messages
-
string msgContent;
try {
- msg.getMessage().getFrames().getContent(msgContent);
-
- QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]");
- QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
-
- boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
- if (!context.get()) {
- throw InternalErrorException(QPID_MSG("Query context looks munged ..."));
- }
-
- XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
- msgContent.length(), "input" );
- Sequence seq(context->parseDocument(xml));
-
- if (args) {
- FieldTable::ValueMap::const_iterator v = args->begin();
- for(; v != args->end(); ++v) {
- // ### TODO: Do types properly
- if (v->second->convertsTo<std::string>()) {
- QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
- Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
- context->setExternalVariable(X(v->first.c_str()), value);
- }
- }
- }
-
- if(!seq.isEmpty() && seq.first()->isNode()) {
- context->setContextItem(seq.first());
- context->setContextPosition(1);
- context->setContextSize(1);
- }
- Result result = query->execute(context.get());
- return result->getEffectiveBooleanValue(context.get(), 0);
+ QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]");
+
+ boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
+ if (!context.get()) {
+ throw InternalErrorException(QPID_MSG("Query context looks munged ..."));
+ }
+
+ if (parse_message_content) {
+
+ msg.getMessage().getFrames().getContent(msgContent);
+
+ QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
+
+ XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
+ msgContent.length(), "input" );
+
+ // This will parse the document using either Xerces or FastXDM, depending
+ // on your XQilla configuration. FastXDM can be as much as 10x faster.
+
+ Sequence seq(context->parseDocument(xml));
+
+ if(!seq.isEmpty() && seq.first()->isNode()) {
+ context->setContextItem(seq.first());
+ context->setContextPosition(1);
+ context->setContextSize(1);
+ }
+ }
+
+ if (args) {
+ FieldTable::ValueMap::const_iterator v = args->begin();
+ for(; v != args->end(); ++v) {
+ // ### TODO: Do types properly
+ if (v->second->convertsTo<std::string>()) {
+ QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
+ Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
+ context->setExternalVariable(X(v->first.c_str()), value);
+ }
+ }
+ }
+
+ Result result = query->execute(context.get());
+ return result->getEffectiveBooleanValue(context.get(), 0);
}
catch (XQException& e) {
- QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
- return 0;
+ QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
}
catch (...) {
- QPID_LOG(warning, "Unexpected error routing message: " << msgContent);
- return 0;
+ QPID_LOG(warning, "Unexpected error routing message: " << msgContent);
}
return 0;
}
+// Future optimization: If any query in a binding for a given routing key requires
+// message content, parse the message once, and use that parsed form for all bindings.
+//
+// Future optimization: XQilla does not currently do document projection for data
+// accessed via the context item. If there is a single query for a given routing key,
+// and it accesses document data, this could be a big win.
+//
+// Document projection often is not a win if you have multiple queries on the same data.
+// But for very large messages, if all these queries are on the first part of the data,
+// it could still be a big win.
+
void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args)
{
PreRoute pr(msg, this);
@@ -192,7 +214,7 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT
int count(0);
for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
- if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ...
+ if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
msg.deliverTo((*i)->queue);
count++;
QPID_LOG(trace, "Delivered to queue" );