diff options
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 29 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 43 | ||||
-rw-r--r-- | cpp/src/qpid/xml/XmlExchange.cpp | 39 |
7 files changed, 67 insertions, 142 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index b9f24dee5f..29fe47beac 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -145,39 +145,12 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) { PreRoute pr(msg, this); - Queues::ConstPtr p; + ConstBindingList b; { Mutex::ScopedLock l(lock); - p = bindings[routingKey].queues.snapshot(); - } - int count(0); - - if (p) { - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); - } - } - - if(!count){ - QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey - << "; no matching binding found"); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops(); - mgmtExchange->inc_byteDrops(msg.contentSize()); - } - } else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes(count); - mgmtExchange->inc_byteRoutes(count * msg.contentSize()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives(); - mgmtExchange->inc_byteReceives(msg.contentSize()); + b = bindings[routingKey].queues.snapshot(); } + doRoute(msg, b); } diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 90d81b81c6..9852f84f5b 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -76,6 +76,36 @@ Exchange::PreRoute::~PreRoute(){ } } +void Exchange::doRoute(Deliverable& msg, ConstBindingList b) +{ + int count = 0; + + if (b.get()) { + for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + } + + if (mgmtExchange != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } +} + void Exchange::routeIVE(){ if (ive && lastMsg.get()){ DeliverableMessage dmsg(lastMsg); diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index c1e878200f..9bea376c28 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -79,6 +79,9 @@ protected: Exchange* parent; }; + typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList; + typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList; + void doRoute(Deliverable& msg, ConstBindingList b); void routeIVE(); diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index e9007ba682..b7d46a33fe 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -106,36 +106,12 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons return true; } -void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ +void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/) +{ PreRoute pr(msg, this); - uint32_t count(0); - - BindingsArray::ConstPtr p = bindings.snapshot(); - if (p.get()){ - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } - } - - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } + doRoute(msg, bindings.snapshot()); } - + bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) { BindingsArray::ConstPtr ptr = bindings.snapshot(); diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index c628c44909..a7c90156e1 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -104,7 +104,8 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, } -void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ +void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args) +{ if (!args) { //can't match if there were no headers passed in if (mgmtExchange != 0) { @@ -118,31 +119,17 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons PreRoute pr(msg, this); - uint32_t count(0); - - Bindings::ConstPtr p = bindings.snapshot(); - if (p.get()){ + ConstBindingList p = bindings.snapshot(); + BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); + if (p.get()) + { for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { if (match((*i)->args, *args)) { - msg.deliverTo((*i)->queue); - count++; - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); + b->push_back(*i); } } } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives(); - mgmtExchange->inc_byteReceives(msg.contentSize()); - if (count == 0) { - mgmtExchange->inc_msgDrops(); - mgmtExchange->inc_byteDrops(msg.contentSize()); - } else { - mgmtExchange->inc_msgRoutes(count); - mgmtExchange->inc_byteRoutes(count * msg.contentSize()); - } - } + doRoute(msg, b); } diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 6bf0b104ea..cb04742677 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -293,44 +293,23 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) return q != qv.end(); } -void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ +void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) +{ Binding::vector mb; + BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); PreRoute pr(msg, this); - uint32_t count(0); - { - RWlock::ScopedRlock l(lock); - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, routingKey)) { - Binding::vector& qv(i->second.bindingVector); - for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ - mb.push_back(*j); + RWlock::ScopedRlock l(lock); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (match(i->first, routingKey)) { + Binding::vector& qv(i->second.bindingVector); + for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){ + b->push_back(*j); + } } } } - } - - for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) { - msg.deliverTo((*j)->queue); - if ((*j)->mgmtBinding != 0) - (*j)->mgmtBinding->inc_msgMatched (); - } - - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } + doRoute(msg, b); } bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp index 8a1ef6149e..472ca28954 100644 --- a/cpp/src/qpid/xml/XmlExchange.cpp +++ b/cpp/src/qpid/xml/XmlExchange.cpp @@ -206,45 +206,22 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT PreRoute pr(msg, this); try { XmlBinding::vector::ConstPtr p; - { + BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); + { RWlock::ScopedRlock l(lock); - p = bindingsMap[routingKey].snapshot(); - if (!p) return; - } - int count(0); + p = bindingsMap[routingKey].snapshot(); + if (!p.get()) return; + } for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) { if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { - msg.deliverTo((*i)->queue); - count++; - QPID_LOG(trace, "Delivered to queue" ); - - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); + b->push_back(*i); } - } - if (!count) { - QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - } else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - } + } + doRoute(msg, b); } catch (...) { QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); } - - } |