summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/FanOutExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/FanOutExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp71
1 files changed, 63 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index ea2327c788..714d4ea444 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -25,15 +25,36 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
-FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) :
+ Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedWlock locker(lock);
+ std::vector<Binding::shared_ptr>::iterator i;
+
// Add if not already present.
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i == bindings.end()) {
- bindings.push_back(queue);
+ Binding::shared_ptr binding (new Binding ("", queue, this));
+ bindings.push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else {
return false;
@@ -42,9 +63,17 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/,
bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedWlock locker(lock);
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i != bindings.end()) {
bindings.erase(i);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -53,14 +82,40 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedRlock locker(lock);
- for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
- msg.deliverTo(*i);
+ uint32_t count(0);
+
+ for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
}
}
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
{
- return std::find(bindings.begin(), bindings.end(), queue) != bindings.end();
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
+ return i != bindings.end();
}