diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index f6411c98dd..0965381fcd 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -37,9 +37,11 @@ #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/broker/AclModule.h" +#include "qpid/broker/FedOps.h" #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/tuple/tuple_comparison.hpp> #include <iostream> #include <sstream> @@ -48,6 +50,11 @@ #include <assert.h> +namespace { +const std::string X_SCOPE("x-scope"); +const std::string SESSION("session"); +} + namespace qpid { namespace broker { @@ -87,6 +94,7 @@ void SemanticState::closed() { if (dtxBuffer.get()) { dtxBuffer->fail(); } + unbindSessionBindings(); requeue(); //now unsubscribe, which may trigger queue deletion and thus @@ -803,4 +811,63 @@ void SemanticState::detached() } } +void SemanticState::addBinding(const string& queueName, const string& exchangeName, + const string& routingKey, const framing::FieldTable& arguments) +{ + QPID_LOG (debug, "SemanticState::addBinding [" + << "queue=" << queueName << ", " + << "exchange=" << exchangeName << ", " + << "key=" << routingKey << ", " + << "args=" << arguments << "]"); + std::string fedOp = arguments.getAsString(qpidFedOp); + if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) { + fedOp = fedOpBind; + } + std::string fedOrigin = arguments.getAsString(qpidFedOrigin); + if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) { + bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); + } + else if (fedOp == fedOpUnbind) { + bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); + } +} + +void SemanticState::removeBinding(const string& queueName, const string& exchangeName, + const string& routingKey) +{ + QPID_LOG (debug, "SemanticState::removeBinding [" + << "queue=" << queueName << ", " + << "exchange=" << exchangeName << ", " + << "key=" << routingKey) + bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, "")); +} + +void SemanticState::unbindSessionBindings() +{ + //unbind session-scoped bindings + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { + QPID_LOG (debug, "SemanticState::unbindSessionBindings [" + << "queue=" << i->get<0>() << ", " + << "exchange=" << i->get<1>()<< ", " + << "key=" << i->get<2>() << ", " + << "fedOrigin=" << i->get<3>() << "]"); + try { + std::string fedOrigin = i->get<3>(); + if (!fedOrigin.empty()) { + framing::FieldTable fedArguments; + fedArguments.setString(qpidFedOp, fedOpUnbind); + fedArguments.setString(qpidFedOrigin, fedOrigin); + session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments, + userID, connectionId); + } else { + session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(), + userID, connectionId); + } + } + catch (...) { + } + } + bindings.clear(); +} + }} // namespace qpid::broker |
