diff options
Diffstat (limited to 'cpp/src/qpid/broker/DirectExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 97 |
1 files changed, 76 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 87f363ff3d..43b707a5c8 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -25,16 +25,37 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; +using qpid::management::Manageable; -DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {} -DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} +DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} + +DirectExchange::DirectExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){ RWlock::ScopedWlock l(lock); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); + std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); + std::vector<Binding::shared_ptr>::iterator i; + + for (i = queues.begin(); i != queues.end(); i++) + if ((*i)->queue == queue) + break; + if (i == queues.end()) { - bindings[routingKey].push_back(queue); + Binding::shared_ptr binding (new Binding (routingKey, queue, this)); + bindings[routingKey].push_back(binding); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } return true; } else{ return false; @@ -43,14 +64,21 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); + std::vector<Binding::shared_ptr>::iterator i; + + for (i = queues.begin(); i != queues.end(); i++) + if ((*i)->queue == queue) + break; - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); if (i < queues.end()) { queues.erase(i); - if(queues.empty()){ + if (queues.empty()) { bindings.erase(routingKey); } + if (mgmtExchange.get() != 0) { + mgmtExchange->dec_bindings (); + } return true; } else { return false; @@ -59,38 +87,65 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedRlock l(lock); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); + std::vector<Binding::shared_ptr>::iterator i; int count(0); - for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ - msg.deliverTo(*i); - } + + for(i = queues.begin(); i != queues.end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding.get() != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + if(!count){ QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + } + else { + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } + + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); } } bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { + std::vector<Binding::shared_ptr>::iterator j; + if (routingKey) { Bindings::iterator i = bindings.find(*routingKey); - return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end()); + + if (i == bindings.end()) + return false; + if (!queue) + return true; + for (j = i->second.begin(); j != i->second.end(); j++) + if ((*j)->queue == queue) + return true; } else if (!queue) { //if no queue or routing key is specified, just report whether any bindings exist return bindings.size() > 0; } else { - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { - if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) { - return true; - } - } + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) + for (j = i->second.begin(); j != i->second.end(); j++) + if ((*j)->queue == queue) + return true; return false; } -} - -DirectExchange::~DirectExchange(){ + return false; } +DirectExchange::~DirectExchange() {} const std::string DirectExchange::typeName("direct"); |