diff options
Diffstat (limited to 'cpp/src/qpid/broker/FanOutExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 130 |
1 files changed, 63 insertions, 67 deletions
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 373e9ab1cc..6d840b50df 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -18,106 +18,102 @@ * under the License. * */ -#include "FanOutExchange.h" +#include "qpid/broker/FanOutExchange.h" #include <algorithm> using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; +namespace _qmf = qmf::org::apache::qpid::broker; -FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) : - Exchange(_name, _parent) +namespace +{ +const std::string qpidFedOp("qpid.fed.op"); +const std::string qpidFedTags("qpid.fed.tags"); +const std::string qpidFedOrigin("qpid.fed.origin"); + +const std::string fedOpBind("B"); +const std::string fedOpUnbind("U"); +const std::string fedOpReorigin("R"); +const std::string fedOpHello("H"); +} + +FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent, Broker* b) : + Exchange(_name, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) + const FieldTable& _args, Manageable* _parent, Broker* b) : + Exchange(_name, _durable, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } -bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ - RWlock::ScopedWlock locker(lock); - std::vector<Binding::shared_ptr>::iterator i; - - // Add if not already present. - for (i = bindings.begin (); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - if (i == bindings.end()) { - Binding::shared_ptr binding (new Binding ("", queue, this)); - bindings.push_back(binding); - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); +bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args) +{ + string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); + string fedTags(args ? args->getAsString(qpidFedTags) : ""); + string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); + bool propagate = false; + + if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { + Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); + if (bindings.add_unless(binding, MatchQueue(queue))) { + propagate = fedBinding.addOrigin(fedOrigin); + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + } + } else { + return false; + } + } else if (fedOp == fedOpUnbind) { + propagate = fedBinding.delOrigin(fedOrigin); + if (fedBinding.count() == 0) + unbind(queue, "", 0); + } else if (fedOp == fedOpReorigin) { + if (fedBinding.hasLocal()) { + propagateFedOp(string(), string(), fedOpBind, string()); } - return true; - } else { - return false; } -} -bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ - RWlock::ScopedWlock locker(lock); - std::vector<Binding::shared_ptr>::iterator i; + routeIVE(); + if (propagate) + propagateFedOp(string(), fedTags, fedOp, fedOrigin); + return true; +} - for (i = bindings.begin (); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; +bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/) +{ + bool propagate = false; - if (i != bindings.end()) { - bindings.erase(i); + if (bindings.remove_if(MatchQueue(queue))) { + propagate = fedBinding.delOrigin(); if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); - ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); } - return true; } else { return false; } -} - -void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ - RWlock::ScopedRlock locker(lock); - uint32_t count(0); - - for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){ - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } + if (propagate) + propagateFedOp(string(), string(), fedOpUnbind, string()); + return true; } +void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/) +{ + PreRoute pr(msg, this); + doRoute(msg, bindings.snapshot()); +} + bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) { - std::vector<Binding::shared_ptr>::iterator i; - - for (i = bindings.begin (); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - return i != bindings.end(); + BindingsArray::ConstPtr ptr = bindings.snapshot(); + return ptr && std::find_if(ptr->begin(), ptr->end(), MatchQueue(queue)) != ptr->end(); } |