diff options
author | Gordon Sim <gsim@apache.org> | 2008-09-08 19:23:30 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-09-08 19:23:30 +0000 |
commit | e884da88a52c44e238b05b66c60136d4e83c55fb (patch) | |
tree | bcfedf0ce0fba30009ee8600f58ec8f2c3d502a1 | |
parent | 7d4317b2ff39b61f529a2e7c187238c1370cae5c (diff) | |
download | qpid-python-e884da88a52c44e238b05b66c60136d4e83c55fb.tar.gz |
Fixes to xml exchange:
* changed locking for QPID-1264
* allow multiple queues to be bound with the same binding key
* correct log message and management stats update on route
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@693208 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/XmlExchange.cpp | 139 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/XmlExchange.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/tests/XmlClientSessionTest.cpp | 30 |
3 files changed, 91 insertions, 81 deletions
diff --git a/qpid/cpp/src/qpid/broker/XmlExchange.cpp b/qpid/cpp/src/qpid/broker/XmlExchange.cpp index cb0f9a9606..b51791796d 100644 --- a/qpid/cpp/src/qpid/broker/XmlExchange.cpp +++ b/qpid/cpp/src/qpid/broker/XmlExchange.cpp @@ -81,29 +81,23 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const try { RWlock::ScopedWlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::iterator i; - for (i = bindings.begin(); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - if (i == bindings.end()) { - - Query query(xqilla.parse(X(queryText.c_str()))); - XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); - XmlBinding::vector bindings(1, binding); - bindingsMap[routingKey] = bindings; - QPID_LOG(trace, "Bound successfully with query: " << queryText ); - - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); - } - return true; - } else{ - return false; - } + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::ConstPtr p = bindings.snapshot(); + if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) { + Query query(xqilla.parse(X(queryText.c_str()))); + XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); + bindings.add(binding); + QPID_LOG(trace, "Bound successfully with query: " << queryText ); + + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); + } + return true; + } else { + return false; + } } catch (XQException& e) { throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); @@ -116,25 +110,14 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/) { RWlock::ScopedWlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::iterator i; - - for (i = bindings.begin(); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - if (i < bindings.end()) { - bindings.erase(i); - if (bindings.empty()) { - bindingsMap.erase(routingKey); - } + if (bindingsMap[routingKey].remove_if(MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); } return true; } else { - return false; + return false; } } @@ -193,13 +176,15 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) { try { - RWlock::ScopedRlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::iterator i; + XmlBinding::vector::ConstPtr p; + { + RWlock::ScopedRlock l(lock); + p = bindingsMap[routingKey].snapshot(); + if (!p) return; + } int count(0); - for (i = bindings.begin(); i != bindings.end(); i++) { - + for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) { if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ... msg.deliverTo((*i)->queue); count++; @@ -208,28 +193,25 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT if ((*i)->mgmtBinding != 0) (*i)->mgmtBinding->inc_msgMatched (); } - - if(!count){ - QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - } - else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - } - } - } - catch (...) { + } + if (!count) { + QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); + if (mgmtExchange != 0) { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + } else { + if (mgmtExchange != 0) { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } + + if (mgmtExchange != 0) { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + } + } catch (...) { QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); } @@ -239,30 +221,27 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { - XmlBinding::vector::iterator j; - + RWlock::ScopedRlock l(lock); if (routingKey) { - XmlBindingsMap::iterator i = bindingsMap.find(*routingKey); + XmlBindingsMap::iterator i = bindingsMap.find(*routingKey); - if (i == bindingsMap.end()) - return false; - if (!queue) - return true; - for (j = i->second.begin(); j != i->second.end(); j++) - if ((*j)->queue == queue) - return true; + if (i == bindingsMap.end()) + return false; + if (!queue) + return true; + XmlBinding::vector::ConstPtr p = i->second.snapshot(); + return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end(); } else if (!queue) { - //if no queue or routing key is specified, just report whether any bindings exist - return bindingsMap.size() > 0; + //if no queue or routing key is specified, just report whether any bindings exist + return bindingsMap.size() > 0; } else { - for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) - for (j = i->second.begin(); j != i->second.end(); j++) - if ((*j)->queue == queue) - return true; - return false; + for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) { + XmlBinding::vector::ConstPtr p = i->second.snapshot(); + if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true; + } + return false; } - return false; } diff --git a/qpid/cpp/src/qpid/broker/XmlExchange.h b/qpid/cpp/src/qpid/broker/XmlExchange.h index 883bfceaca..57d6c26e0d 100644 --- a/qpid/cpp/src/qpid/broker/XmlExchange.h +++ b/qpid/cpp/src/qpid/broker/XmlExchange.h @@ -23,6 +23,7 @@ #include "Exchange.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/CopyOnWriteArray.h" #include "qpid/sys/Monitor.h" #include "Queue.h" @@ -42,7 +43,7 @@ class XmlExchange : public virtual Exchange { struct XmlBinding : public Exchange::Binding { typedef boost::shared_ptr<XmlBinding> shared_ptr; - typedef std::vector<XmlBinding::shared_ptr> vector; + typedef qpid::sys::CopyOnWriteArray<XmlBinding::shared_ptr> vector; boost::shared_ptr<XQQuery> xquery; diff --git a/qpid/cpp/src/tests/XmlClientSessionTest.cpp b/qpid/cpp/src/tests/XmlClientSessionTest.cpp index d0a1520c81..fc92a338a8 100644 --- a/qpid/cpp/src/tests/XmlClientSessionTest.cpp +++ b/qpid/cpp/src/tests/XmlClientSessionTest.cpp @@ -146,6 +146,36 @@ QPID_AUTO_TEST_CASE(testXmlBinding) { BOOST_CHECK_EQUAL(m, m2.getData()); } +/** + * Ensure that multiple queues can be bound using the same routing key + */ +QPID_AUTO_TEST_CASE(testBindMultipleQueues) { + ClientSessionFixture f; + + f.session.exchangeDeclare(arg::exchange="xml", arg::type="xml"); + f.session.queueDeclare(arg::queue="blue", arg::exclusive=true, arg::autoDelete=true); + f.session.queueDeclare(arg::queue="red", arg::exclusive=true, arg::autoDelete=true); + + FieldTable blue; + blue.setString("xquery", "./colour = 'blue'"); + f.session.exchangeBind(arg::exchange="xml", arg::queue="blue", arg::bindingKey="by-colour", arg::arguments=blue); + FieldTable red; + red.setString("xquery", "./colour = 'red'"); + f.session.exchangeBind(arg::exchange="xml", arg::queue="red", arg::bindingKey="by-colour", arg::arguments=red); + + Message sent1("<colour>blue</colour>", "by-colour"); + f.session.messageTransfer(arg::content=sent1, arg::destination="xml"); + + Message sent2("<colour>red</colour>", "by-colour"); + f.session.messageTransfer(arg::content=sent2, arg::destination="xml"); + + Message received; + BOOST_CHECK(f.subs.get(received, "blue")); + BOOST_CHECK_EQUAL(sent1.getData(), received.getData()); + BOOST_CHECK(f.subs.get(received, "red")); + BOOST_CHECK_EQUAL(sent2.getData(), received.getData()); +} + //### Test: Bad XML does not kill the server //### Test: Bad XQuery does not kill the server |