diff options
Diffstat (limited to 'cpp/src/qpid/broker/XmlExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/XmlExchange.cpp | 152 |
1 files changed, 89 insertions, 63 deletions
diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp index 8577e9211c..1d8f2ae8d8 100644 --- a/cpp/src/qpid/broker/XmlExchange.cpp +++ b/cpp/src/qpid/broker/XmlExchange.cpp @@ -77,34 +77,38 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) { - RWlock::ScopedWlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::iterator i; - string queryText = bindingArguments->getString("xquery"); - for (i = bindings.begin(); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; + try { + RWlock::ScopedWlock l(lock); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::iterator i; - if (i == bindings.end()) { + for (i = bindings.begin(); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + + if (i == bindings.end()) { - try { Query query(xqilla.parse(X(queryText.c_str()))); XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); XmlBinding::vector bindings(1, binding); - bindingsMap[routingKey] = bindings; - } - catch (XQException& e) { - throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + bindingsMap[routingKey] = bindings; + QPID_LOG(trace, "Bound successfully with query: " << queryText ); + + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } + return true; + } else{ + return false; } - - if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - } - return true; - } else{ - return false; + } + catch (XQException& e) { + throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + } + catch (...) { + throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText)); } } @@ -138,22 +142,31 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F // Hack alert - the following code does not work for really large messages string msgContent; - msg.getMessage().getFrames().getContent(msgContent); - - boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); try { - XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*)msgContent.c_str(), msgContent.length(), "input" ); + msg.getMessage().getFrames().getContent(msgContent); + + QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); + QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); + + boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); + if (!context.get()) { + throw InternalErrorException(QPID_MSG("Query context looks munged ...")); + } + + XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*) msgContent.c_str(), msgContent.length(), "input" ); Sequence seq(context->parseDocument(xml)); - FieldTable::ValueMap::const_iterator v = args->begin(); - for(; v != args->end(); ++v) { - // ### TODO: Do types properly - if (v->second->convertsTo<std::string>()) { - QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); - Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); - context->setExternalVariable(X(v->first.c_str()), value); - } + if (args) { + FieldTable::ValueMap::const_iterator v = args->begin(); + for(; v != args->end(); ++v) { + // ### TODO: Do types properly + if (v->second->convertsTo<std::string>()) { + QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); + Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); + context->setExternalVariable(X(v->first.c_str()), value); + } + } } if(!seq.isEmpty() && seq.first()->isNode()) { @@ -166,45 +179,58 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F } catch (XQException& e) { QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); + return 0; + } + catch (...) { + QPID_LOG(warning, "Unexpected error routing message: " << msgContent); + return 0; } return 0; } void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) { - RWlock::ScopedRlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::iterator i; - int count(0); - - for (i = bindings.begin(); i != bindings.end(); i++, count++) { - - if (matches((*i)->xquery, msg, args)) { - msg.deliverTo((*i)->queue); - - if ((*i)->mgmtBinding.get() != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } - } - - if(!count){ - QPID_LOG(warning, "XMLExchange " << getName() << " could not route message with query " << routingKey); - if (mgmtExchange.get() != 0) { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - } - else { - if (mgmtExchange.get() != 0) { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } + try { + RWlock::ScopedRlock l(lock); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::iterator i; + int count(0); + + for (i = bindings.begin(); i != bindings.end(); i++) { + + if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ... + msg.deliverTo((*i)->queue); + count++; + QPID_LOG(trace, "Delivered to queue" ); + + if ((*i)->mgmtBinding.get() != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + + if(!count){ + QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + } + else { + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } + + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + } + } } - - if (mgmtExchange.get() != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); + catch (...) { + QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); } + } |