summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/TopicExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/TopicExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp77
1 files changed, 61 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index bc761cf34d..5330ee4fd0 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -115,9 +115,19 @@ bool TopicPattern::match(const Tokens& target) const
return do_match(begin(), end(), target.begin(), target.end());
}
-TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
-TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+TopicExchange::TopicExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
@@ -125,7 +135,11 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
if (isBound(queue, routingPattern)) {
return false;
} else {
- bindings[routingPattern].push_back(queue);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingPattern].push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
}
}
@@ -133,12 +147,19 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Queue::vector& qv(bi->second);
+ Binding::vector& qv(bi->second);
if (bi == bindings.end()) return false;
- Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
+
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ break;
if(q == qv.end()) return false;
qv.erase(q);
if(qv.empty()) bindings.erase(bi);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
}
@@ -146,21 +167,45 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
{
BindingMap::iterator bi = bindings.find(pattern);
if (bi == bindings.end()) return false;
- Queue::vector& qv(bi->second);
- return find(qv.begin(), qv.end(), queue) != qv.end();
+ Binding::vector& qv(bi->second);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ break;
+ return q != qv.end();
}
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
+ uint32_t count(0);
Tokens tokens(routingKey);
+
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(tokens)) {
- Queue::vector& qv(i->second);
- for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
- msg.deliverTo(*j);
+ Binding::vector& qv(i->second);
+ for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
+ msg.deliverTo((*j)->queue);
+ if ((*j)->mgmtBinding.get() != 0)
+ (*j)->mgmtBinding->inc_msgMatched ();
}
}
}
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
}
bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
@@ -176,16 +221,16 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing
return true;
}
}
- return false;
} else {
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- Queue::vector& qv(i->second);
- if (find(qv.begin(), qv.end(), queue) != qv.end()) {
- return true;
- }
+ Binding::vector& qv(i->second);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ return true;
}
- return false;
}
+ return false;
}
TopicExchange::~TopicExchange() {}