summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/XmlExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/XmlExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/XmlExchange.cpp152
1 files changed, 89 insertions, 63 deletions
diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp
index 8577e9211c..1d8f2ae8d8 100644
--- a/cpp/src/qpid/broker/XmlExchange.cpp
+++ b/cpp/src/qpid/broker/XmlExchange.cpp
@@ -77,34 +77,38 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable,
bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments)
{
- RWlock::ScopedWlock l(lock);
- XmlBinding::vector& bindings(bindingsMap[routingKey]);
- XmlBinding::vector::iterator i;
-
string queryText = bindingArguments->getString("xquery");
- for (i = bindings.begin(); i != bindings.end(); i++)
- if ((*i)->queue == queue)
- break;
+ try {
+ RWlock::ScopedWlock l(lock);
+ XmlBinding::vector& bindings(bindingsMap[routingKey]);
+ XmlBinding::vector::iterator i;
- if (i == bindings.end()) {
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
+ if (i == bindings.end()) {
- try {
Query query(xqilla.parse(X(queryText.c_str())));
XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query));
XmlBinding::vector bindings(1, binding);
- bindingsMap[routingKey] = bindings;
- }
- catch (XQException& e) {
- throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText));
+ bindingsMap[routingKey] = bindings;
+ QPID_LOG(trace, "Bound successfully with query: " << queryText );
+
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
+ return true;
+ } else{
+ return false;
}
-
- if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings ();
- }
- return true;
- } else{
- return false;
+ }
+ catch (XQException& e) {
+ throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText));
+ }
+ catch (...) {
+ throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText));
}
}
@@ -138,22 +142,31 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F
// Hack alert - the following code does not work for really large messages
string msgContent;
- msg.getMessage().getFrames().getContent(msgContent);
-
- boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
try {
- XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*)msgContent.c_str(), msgContent.length(), "input" );
+ msg.getMessage().getFrames().getContent(msgContent);
+
+ QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]");
+ QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
+
+ boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
+ if (!context.get()) {
+ throw InternalErrorException(QPID_MSG("Query context looks munged ..."));
+ }
+
+ XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*) msgContent.c_str(), msgContent.length(), "input" );
Sequence seq(context->parseDocument(xml));
- FieldTable::ValueMap::const_iterator v = args->begin();
- for(; v != args->end(); ++v) {
- // ### TODO: Do types properly
- if (v->second->convertsTo<std::string>()) {
- QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
- Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
- context->setExternalVariable(X(v->first.c_str()), value);
- }
+ if (args) {
+ FieldTable::ValueMap::const_iterator v = args->begin();
+ for(; v != args->end(); ++v) {
+ // ### TODO: Do types properly
+ if (v->second->convertsTo<std::string>()) {
+ QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
+ Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
+ context->setExternalVariable(X(v->first.c_str()), value);
+ }
+ }
}
if(!seq.isEmpty() && seq.first()->isNode()) {
@@ -166,45 +179,58 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F
}
catch (XQException& e) {
QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
+ return 0;
+ }
+ catch (...) {
+ QPID_LOG(warning, "Unexpected error routing message: " << msgContent);
+ return 0;
}
return 0;
}
void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args)
{
- RWlock::ScopedRlock l(lock);
- XmlBinding::vector& bindings(bindingsMap[routingKey]);
- XmlBinding::vector::iterator i;
- int count(0);
-
- for (i = bindings.begin(); i != bindings.end(); i++, count++) {
-
- if (matches((*i)->xquery, msg, args)) {
- msg.deliverTo((*i)->queue);
-
- if ((*i)->mgmtBinding.get() != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
- }
- }
-
- if(!count){
- QPID_LOG(warning, "XMLExchange " << getName() << " could not route message with query " << routingKey);
- if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- }
- else {
- if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
+ try {
+ RWlock::ScopedRlock l(lock);
+ XmlBinding::vector& bindings(bindingsMap[routingKey]);
+ XmlBinding::vector::iterator i;
+ int count(0);
+
+ for (i = bindings.begin(); i != bindings.end(); i++) {
+
+ if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ...
+ msg.deliverTo((*i)->queue);
+ count++;
+ QPID_LOG(trace, "Delivered to queue" );
+
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
+ if(!count){
+ QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ }
+ else {
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ }
+ }
}
-
- if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
+ catch (...) {
+ QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey);
}
+
}