diff options
Diffstat (limited to 'cpp/src/qpid/broker/TopicExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 77 |
1 files changed, 61 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index bc761cf34d..5330ee4fd0 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -115,9 +115,19 @@ bool TopicPattern::match(const Tokens& target) const return do_match(begin(), end(), target.begin(), target.end()); } -TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } -TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} +TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} +TopicExchange::TopicExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); @@ -125,7 +135,11 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons if (isBound(queue, routingPattern)) { return false; } else { - bindings[routingPattern].push_back(queue); + Binding::shared_ptr binding (new Binding (routingKey, queue, this)); + bindings[routingPattern].push_back(binding); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } return true; } } @@ -133,12 +147,19 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); - Queue::vector& qv(bi->second); + Binding::vector& qv(bi->second); if (bi == bindings.end()) return false; - Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); + + Binding::vector::iterator q; + for (q = qv.begin(); q != qv.end(); q++) + if ((*q)->queue == queue) + break; if(q == qv.end()) return false; qv.erase(q); if(qv.empty()) bindings.erase(bi); + if (mgmtExchange.get() != 0) { + mgmtExchange->dec_bindings (); + } return true; } @@ -146,21 +167,45 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) { BindingMap::iterator bi = bindings.find(pattern); if (bi == bindings.end()) return false; - Queue::vector& qv(bi->second); - return find(qv.begin(), qv.end(), queue) != qv.end(); + Binding::vector& qv(bi->second); + Binding::vector::iterator q; + for (q = qv.begin(); q != qv.end(); q++) + if ((*q)->queue == queue) + break; + return q != qv.end(); } void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedRlock l(lock); + uint32_t count(0); Tokens tokens(routingKey); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(tokens)) { - Queue::vector& qv(i->second); - for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ - msg.deliverTo(*j); + Binding::vector& qv(i->second); + for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ + msg.deliverTo((*j)->queue); + if ((*j)->mgmtBinding.get() != 0) + (*j)->mgmtBinding->inc_msgMatched (); } } } + + if (mgmtExchange.get() != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } } bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) @@ -176,16 +221,16 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing return true; } } - return false; } else { for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - Queue::vector& qv(i->second); - if (find(qv.begin(), qv.end(), queue) != qv.end()) { - return true; - } + Binding::vector& qv(i->second); + Binding::vector::iterator q; + for (q = qv.begin(); q != qv.end(); q++) + if ((*q)->queue == queue) + return true; } - return false; } + return false; } TopicExchange::~TopicExchange() {} |