summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorJonathan Robie <jonathan@apache.org>2009-02-27 20:45:01 +0000
committerJonathan Robie <jonathan@apache.org>2009-02-27 20:45:01 +0000
commitc641e8cc88d1a28cd0e636f81878ec46098b55d9 (patch)
treec5a6b28d70f35cf1d893ab0c7d3ba07d34726778 /cpp/src
parentf7ea3f1a2c287eb294c69453fde1f406c6dd902b (diff)
downloadqpid-python-c641e8cc88d1a28cd0e636f81878ec46098b55d9.tar.gz
Modified so that the content of the message is only parsed if it is used in a binding query.
Headers can be queried without parsing the message. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@748689 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/xml/XmlExchange.cpp128
-rw-r--r--cpp/src/qpid/xml/XmlExchange.h5
2 files changed, 78 insertions, 55 deletions
diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp
index 5197b239d0..4e9de49ad5 100644
--- a/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/cpp/src/qpid/xml/XmlExchange.cpp
@@ -33,6 +33,9 @@
#include "qpid/Plugin.h"
#include <xercesc/framework/MemBufInputSource.hpp>
+
+#include <xqilla/ast/XQGlobalVariable.hpp>
+
#include <xqilla/context/ItemFactory.hpp>
#include <xqilla/xqilla-simple.hpp>
@@ -62,17 +65,6 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-/*
- * Use the name of the query as the binding key.
- *
- * The first time a given name is used in a binding, the query body
- * must be provided.After that, no query body should be present.
- *
- * To modify an installed query, the user must first unbind the
- * existing query, then replace it by binding again with the same
- * name.
- *
- */
// #### TODO: The Binding should take the query text
// #### only. Consider encapsulating the entire block, including
@@ -94,6 +86,21 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const
bindings.add(binding);
QPID_LOG(trace, "Bound successfully with query: " << queryText );
+ binding->parse_message_content = false;
+
+ if (query->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) {
+ binding->parse_message_content = true;
+ }
+ else {
+ GlobalVariables &vars = const_cast<GlobalVariables&>(query->getVariables());
+ for(GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) {
+ if ((*it)->getStaticAnalysis().areContextFlagsUsed()) {
+ binding->parse_message_content = true;
+ break;
+ }
+ }
+ }
+
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -126,59 +133,74 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons
}
}
-bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args)
+bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content)
{
- // ### TODO: Need istream for frameset
- // Hack alert - the following code does not work for really large messages
-
string msgContent;
try {
- msg.getMessage().getFrames().getContent(msgContent);
-
- QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]");
- QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
-
- boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
- if (!context.get()) {
- throw InternalErrorException(QPID_MSG("Query context looks munged ..."));
- }
-
- XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
- msgContent.length(), "input" );
- Sequence seq(context->parseDocument(xml));
-
- if (args) {
- FieldTable::ValueMap::const_iterator v = args->begin();
- for(; v != args->end(); ++v) {
- // ### TODO: Do types properly
- if (v->second->convertsTo<std::string>()) {
- QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
- Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
- context->setExternalVariable(X(v->first.c_str()), value);
- }
- }
- }
-
- if(!seq.isEmpty() && seq.first()->isNode()) {
- context->setContextItem(seq.first());
- context->setContextPosition(1);
- context->setContextSize(1);
- }
- Result result = query->execute(context.get());
- return result->getEffectiveBooleanValue(context.get(), 0);
+ QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]");
+
+ boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
+ if (!context.get()) {
+ throw InternalErrorException(QPID_MSG("Query context looks munged ..."));
+ }
+
+ if (parse_message_content) {
+
+ msg.getMessage().getFrames().getContent(msgContent);
+
+ QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
+
+ XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
+ msgContent.length(), "input" );
+
+ // This will parse the document using either Xerces or FastXDM, depending
+ // on your XQilla configuration. FastXDM can be as much as 10x faster.
+
+ Sequence seq(context->parseDocument(xml));
+
+ if(!seq.isEmpty() && seq.first()->isNode()) {
+ context->setContextItem(seq.first());
+ context->setContextPosition(1);
+ context->setContextSize(1);
+ }
+ }
+
+ if (args) {
+ FieldTable::ValueMap::const_iterator v = args->begin();
+ for(; v != args->end(); ++v) {
+ // ### TODO: Do types properly
+ if (v->second->convertsTo<std::string>()) {
+ QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
+ Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
+ context->setExternalVariable(X(v->first.c_str()), value);
+ }
+ }
+ }
+
+ Result result = query->execute(context.get());
+ return result->getEffectiveBooleanValue(context.get(), 0);
}
catch (XQException& e) {
- QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
- return 0;
+ QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
}
catch (...) {
- QPID_LOG(warning, "Unexpected error routing message: " << msgContent);
- return 0;
+ QPID_LOG(warning, "Unexpected error routing message: " << msgContent);
}
return 0;
}
+// Future optimization: If any query in a binding for a given routing key requires
+// message content, parse the message once, and use that parsed form for all bindings.
+//
+// Future optimization: XQilla does not currently do document projection for data
+// accessed via the context item. If there is a single query for a given routing key,
+// and it accesses document data, this could be a big win.
+//
+// Document projection often is not a win if you have multiple queries on the same data.
+// But for very large messages, if all these queries are on the first part of the data,
+// it could still be a big win.
+
void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args)
{
PreRoute pr(msg, this);
@@ -192,7 +214,7 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT
int count(0);
for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
- if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ...
+ if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
msg.deliverTo((*i)->queue);
count++;
QPID_LOG(trace, "Delivered to queue" );
diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h
index 066a26489d..7bec480bde 100644
--- a/cpp/src/qpid/xml/XmlExchange.h
+++ b/cpp/src/qpid/xml/XmlExchange.h
@@ -46,9 +46,10 @@ class XmlExchange : public virtual Exchange {
typedef qpid::sys::CopyOnWriteArray<XmlBinding::shared_ptr> vector;
boost::shared_ptr<XQQuery> xquery;
+ bool parse_message_content;
XmlBinding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent, Query query):
- Binding(key, queue, parent), xquery(query) {}
+ Binding(key, queue, parent), xquery(query), parse_message_content(true) {}
};
@@ -58,7 +59,7 @@ class XmlExchange : public virtual Exchange {
XQilla xqilla;
qpid::sys::RWlock lock;
- bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args);
+ bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content);
public:
static const std::string typeName;