diff options
Diffstat (limited to 'cpp/src/qpid/broker/FanOutExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 71 |
1 files changed, 63 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index ea2327c788..714d4ea444 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -25,15 +25,36 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} -FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} +FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) : + Exchange(_name, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} + +FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ RWlock::ScopedWlock locker(lock); + std::vector<Binding::shared_ptr>::iterator i; + // Add if not already present. - Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); + for (i = bindings.begin (); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + if (i == bindings.end()) { - bindings.push_back(queue); + Binding::shared_ptr binding (new Binding ("", queue, this)); + bindings.push_back(binding); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } return true; } else { return false; @@ -42,9 +63,17 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ RWlock::ScopedWlock locker(lock); - Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); + std::vector<Binding::shared_ptr>::iterator i; + + for (i = bindings.begin (); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + if (i != bindings.end()) { bindings.erase(i); + if (mgmtExchange.get() != 0) { + mgmtExchange->dec_bindings (); + } return true; } else { return false; @@ -53,14 +82,40 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ RWlock::ScopedRlock locker(lock); - for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ - msg.deliverTo(*i); + uint32_t count(0); + + for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){ + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding.get() != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + + if (mgmtExchange.get() != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } } } bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) { - return std::find(bindings.begin(), bindings.end(), queue) != bindings.end(); + std::vector<Binding::shared_ptr>::iterator i; + + for (i = bindings.begin (); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + + return i != bindings.end(); } |