diff options
Diffstat (limited to 'cpp/src/qpid/broker/FanOutExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 59 |
1 files changed, 48 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 2628d8952f..aa1f7ff30a 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -26,6 +26,18 @@ using namespace qpid::framing; using namespace qpid::sys; namespace _qmf = qmf::org::apache::qpid::broker; +namespace +{ +const std::string qpidFedOp("qpid.fed.op"); +const std::string qpidFedTags("qpid.fed.tags"); +const std::string qpidFedOrigin("qpid.fed.origin"); + +const std::string fedOpBind("B"); +const std::string fedOpUnbind("U"); +const std::string fedOpReorigin("R"); +const std::string fedOpHello("H"); +} + FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) : Exchange(_name, _parent) { @@ -41,32 +53,57 @@ FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/) +bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args) { - Binding::shared_ptr binding (new Binding ("", queue, this)); - if (bindings.add_unless(binding, MatchQueue(queue))) { - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); + string fedTags(args ? args->getAsString(qpidFedTags) : ""); + string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); + bool propagate = false; + + if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { + Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); + if (bindings.add_unless(binding, MatchQueue(queue))) { + propagate = fedBinding.addOrigin(fedOrigin); + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + } + } else { + return false; + } + } else if (fedOp == fedOpUnbind) { + propagate = fedBinding.delOrigin(fedOrigin); + if (fedBinding.count() == 0) + unbind(queue, "", 0); + } else if (fedOp == fedOpReorigin) { + if (fedBinding.hasLocal()) { + propagateFedOp(string(), string(), fedOpBind, string()); } - routeIVE(); - return true; - } else { - return false; } + + routeIVE(); + if (propagate) + propagateFedOp(string(), fedTags, fedOp, fedOrigin); + return true; } bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/) { + bool propagate = false; + if (bindings.remove_if(MatchQueue(queue))) { + propagate = fedBinding.delOrigin(); if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); } - return true; } else { return false; } + + if (propagate) + propagateFedOp(string(), string(), fedOpUnbind, string()); + return true; } void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ |