diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 390 |
1 files changed, 218 insertions, 172 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 03022b00bb..a7743d95ab 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -15,17 +15,23 @@ * limitations under the License. * */ -#include "SessionAdapter.h" -#include "Connection.h" -#include "DeliveryToken.h" -#include "MessageDelivery.h" -#include "Queue.h" +#include "qpid/broker/SessionAdapter.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/Queue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/constants.h" +#include "qpid/framing/enum.h" #include "qpid/log/Statement.h" -#include "qpid/amqp_0_10/exceptions.h" #include "qpid/framing/SequenceSet.h" +#include "qpid/management/ManagementAgent.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventBind.h" +#include "qmf/org/apache/qpid/broker/EventUnbind.h" +#include "qmf/org/apache/qpid/broker/EventSubscribe.h" +#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h" #include <boost/format.hpp> #include <boost/cast.hpp> #include <boost/bind.hpp> @@ -35,6 +41,9 @@ namespace broker { using namespace qpid; using namespace qpid::framing; +using namespace qpid::framing::dtx; +using namespace qpid::management; +namespace _qmf = qmf::org::apache::qpid::broker; typedef std::vector<Queue::shared_ptr> QueueVector; @@ -48,23 +57,24 @@ SessionAdapter::SessionAdapter(SemanticState& s) : dtxImpl(s) {} +static const std::string _TRUE("true"); +static const std::string _FALSE("false"); void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type, const string& alternateExchange, bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ - AclModule* acl = getBroker().getAcl(); - if (acl) - { - std::map<std::string, std::string> params; - params.insert(make_pair("TYPE", type)); - params.insert(make_pair("ALT", alternateExchange)); - params.insert(make_pair("PAS", std::string(passive ? "Y" : "N") )); - params.insert(make_pair("DURA", std::string(durable ? "Y" : "N"))); - if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchange,¶ms) ) - throw NotAllowedException("ACL denied exhange declare request"); - } - + AclModule* acl = getBroker().getAcl(); + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_TYPE, type)); + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) )); + params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) ) + throw NotAllowedException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId())); + } + //TODO: implement autoDelete Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { @@ -75,21 +85,31 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const checkType(actual, type); checkAlternate(actual, alternate); }else{ + if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) { + throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")")); + } try{ std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args); if (response.second) { - if (durable) { - getBroker().getStore().create(*response.first, args); - } if (alternate) { response.first->setAlternate(alternate); alternate->incAlternateUsers(); } + if (durable) { + getBroker().getStore().create(*response.first, args); + } } else { checkType(response.first, type); checkAlternate(response.first, alternate); } - }catch(UnknownExchangeTypeException& e){ + + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type, + alternateExchange, durable, false, args, + response.second ? "created" : "existing")); + + }catch(UnknownExchangeTypeException& /*e*/){ throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type)); } } @@ -104,57 +124,62 @@ void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchang void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) { - if (alternate && alternate != exchange->getAlternate()) - throw NotAllowedException( - QPID_MSG("Exchange declared with alternate-exchange " - << exchange->getAlternate()->getName() << ", requested " - << alternate->getName())); + if (alternate && ((exchange->getAlternate() && alternate != exchange->getAlternate()) + || !exchange->getAlternate())) + throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange " + << (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>") + << ", requested " + << alternate->getName())); } -void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/){ - - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::EXCHANGE,name,NULL) ) - throw NotAllowedException("ACL denied exhange delete request"); +void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/) +{ + AclModule* acl = getBroker().getAcl(); + if (acl) { + if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) + throw NotAllowedException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId())); } - //TODO: implement unused Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); getBroker().getExchanges().destroy(name); -} + + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name)); +} ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) { - - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::EXCHANGE,name,NULL) ) - throw NotAllowedException("ACL denied exhange query request"); + AclModule* acl = getBroker().getAcl(); + if (acl) { + if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL) ) + throw NotAllowedException(QPID_MSG("ACL denied exchange query request from " << getConnection().getUserId())); } try { Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); - } catch (const NotFoundException& e) { + } catch (const NotFoundException& /*e*/) { return ExchangeQueryResult("", false, true, FieldTable()); } } + void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, const string& exchangeName, const string& routingKey, - const FieldTable& arguments){ + const FieldTable& arguments) +{ + AclModule* acl = getBroker().getAcl(); + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); + params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey)); - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::BIND,acl::EXCHANGE,exchangeName,routingKey) ) - throw NotAllowedException("ACL denied exhange bind request"); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms)) + throw NotAllowedException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId())); } Queue::shared_ptr queue = getQueue(queueName); @@ -166,30 +191,29 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, if (exchange->isDurable() && queue->isDurable()) { getBroker().getStore().bind(*exchange, *queue, routingKey, arguments); } + + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments)); } }else{ - throw NotFoundException( - "Bind failed. No such exchange: " + exchangeName); + throw NotFoundException("Bind failed. No such exchange: " + exchangeName); } } -void -SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, - const string& exchangeName, - const string& routingKey) +void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, + const string& exchangeName, + const string& routingKey) { - - AclModule* acl = getBroker().getAcl(); - if (acl) - { - std::map<std::string, std::string> params; - params.insert(make_pair("QN", queueName)); - params.insert(make_pair("RKEY", routingKey)); - if (!acl->authorise(getConnection().getUserId(),acl::UNBIND,acl::EXCHANGE,exchangeName,¶ms) ) - throw NotAllowedException("ACL denied exchange unbind request"); + AclModule* acl = getBroker().getAcl(); + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); + params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey)); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) + throw NotAllowedException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId())); } - Queue::shared_ptr queue = getQueue(queueName); if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); @@ -197,10 +221,14 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); //TODO: revise unbind to rely solely on binding key (not args) - if (exchange->unbind(queue, routingKey, 0) && exchange->isDurable() && queue->isDurable()) { - getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable()); - } + if (exchange->unbind(queue, routingKey, 0)) { + if (exchange->isDurable() && queue->isDurable()) + getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable()); + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey)); + } } ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, @@ -208,16 +236,15 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string const std::string& key, const framing::FieldTable& args) { - AclModule* acl = getBroker().getAcl(); - if (acl) - { - std::map<std::string, std::string> params; - params.insert(make_pair("QUEUE", queueName)); - params.insert(make_pair("RKEY", queueName)); - if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchangeName,¶ms) ) - throw NotAllowedException("ACL denied exhange bound request"); + AclModule* acl = getBroker().getAcl(); + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); + params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) + throw NotAllowedException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId())); } - + Exchange::shared_ptr exchange; try { exchange = getBroker().getExchanges().get(exchangeName); @@ -229,7 +256,7 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string } if (!exchange) { - return ExchangeBoundResult(true, false, false, false, false); + return ExchangeBoundResult(true, (!queueName.empty() && !queue), false, false, false); } else if (!queueName.empty() && !queue) { return ExchangeBoundResult(false, true, false, false, false); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { @@ -268,7 +295,6 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() exclusiveQueues.erase(exclusiveQueues.begin()); } } - bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const { @@ -278,13 +304,12 @@ bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::QUEUE,name,NULL) ) - throw NotAllowedException("ACL denied queue query request"); + AclModule* acl = getBroker().getAcl(); + if (acl) { + if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) ) + throw NotAllowedException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId())); } - + Queue::shared_ptr queue = session.getBroker().getQueues().find(name); if (queue) { @@ -304,20 +329,23 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) } void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange, - bool passive, bool durable, bool exclusive, - bool autoDelete, const qpid::framing::FieldTable& arguments){ - - AclModule* acl = getBroker().getAcl(); - if (acl) - { - std::map<std::string, std::string> params; - params.insert(make_pair("ALT", alternateExchange)); - params.insert(make_pair("PAS", std::string(passive ? "Y" : "N") )); - params.insert(make_pair("DURA", std::string(durable ? "Y" : "N"))); - params.insert(make_pair("EXCLUS", std::string(exclusive ? "Y" : "N"))); - params.insert(make_pair("AUTOD", std::string(autoDelete ? "Y" : "N"))); - if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::QUEUE,name,¶ms) ) - throw NotAllowedException("ACL denied queue create request"); + bool passive, bool durable, bool exclusive, + bool autoDelete, const qpid::framing::FieldTable& arguments) +{ + AclModule* acl = getBroker().getAcl(); + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) )); + params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); + params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); + params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); + + if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) + throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); } Exchange::shared_ptr alternate; @@ -326,17 +354,16 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = getQueue(name); + queue = getQueue(name); //TODO: check alternate-exchange is as expected } else { - std::pair<Queue::shared_ptr, bool> queue_created = - getBroker().getQueues().declare( - name, durable, - autoDelete, - exclusive ? this : 0); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue + std::pair<Queue::shared_ptr, bool> queue_created = + getBroker().getQueues().declare(name, durable, + autoDelete, + exclusive ? &session : 0); + queue = queue_created.first; + assert(queue); + if (queue_created.second) { // This is a new queue if (alternate) { queue->setAlternateExchange(alternate); alternate->incAlternateUsers(); @@ -345,48 +372,56 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& //apply settings & create persistent record if required queue_created.first->create(arguments); - //add default binding: - getBroker().getExchanges().getDefault()->bind(queue, name, 0); + //add default binding: + getBroker().getExchanges().getDefault()->bind(queue, name, 0); queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); //handle automatic cleanup: - if (exclusive) { - exclusiveQueues.push_back(queue); - } - } else { - if (exclusive && queue->setExclusiveOwner(this)) { - exclusiveQueues.push_back(queue); + if (exclusive) { + exclusiveQueues.push_back(queue); + } + } else { + if (exclusive && queue->setExclusiveOwner(&session)) { + exclusiveQueues.push_back(queue); } } + + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), + name, durable, exclusive, autoDelete, arguments, + queue_created.second ? "created" : "existing")); } - if (exclusive && !queue->isExclusiveOwner(this)) - throw ResourceLockedException( - QPID_MSG("Cannot grant exclusive access to queue " - << queue->getName())); + + if (exclusive && !queue->isExclusiveOwner(&session)) + throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue " + << queue->getName())); } void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::PURGE,acl::QUEUE,queue,NULL) ) - throw NotAllowedException("ACL denied queue purge request"); + AclModule* acl = getBroker().getAcl(); + if (acl) + { + if (!acl->authorise(getConnection().getUserId(),acl::ACT_PURGE,acl::OBJ_QUEUE,queue,NULL) ) + throw NotAllowedException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId())); } getQueue(queue)->purge(); } void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){ - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::QUEUE,queue,NULL) ) - throw NotAllowedException("ACL denied queue delete request"); + AclModule* acl = getBroker().getAcl(); + if (acl) + { + if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) ) + throw NotAllowedException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId())); } - ChannelException error(0, ""); Queue::shared_ptr q = getQueue(queue); + if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) + throw ResourceLockedException(QPID_MSG("Cannot delete queue " + << queue << "; it is exclusive to another session")); if(ifEmpty && q->getMessageCount() > 0){ throw PreconditionFailedException("Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ @@ -400,16 +435,18 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse q->destroy(); getBroker().getQueues().destroy(queue); q->unbind(getBroker().getExchanges(), q); + + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue)); } } - SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerHelper(s), releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)), releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)), - rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)), - acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2)) + rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)) {} // @@ -431,37 +468,47 @@ void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, b void SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, - const string& destination, - uint8_t acceptMode, - uint8_t acquireMode, - bool exclusive, - const string& /*resumeId*/,//TODO implement resume behaviour - uint64_t /*resumeTtl*/, - const FieldTable& arguments) + const string& destination, + uint8_t acceptMode, + uint8_t acquireMode, + bool exclusive, + const string& resumeId, + uint64_t resumeTtl, + const FieldTable& arguments) { - AclModule* acl = getBroker().getAcl(); - if (acl) - { - // add flags as needed - if (!acl->authorise(getConnection().getUserId(),acl::CONSUME,acl::QUEUE,queueName,NULL) ) - throw NotAllowedException("ACL denied Queue subscribe request"); + AclModule* acl = getBroker().getAcl(); + if (acl) + { + if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) ) + throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId())); } Queue::shared_ptr queue = getQueue(queueName); if(!destination.empty() && state.exists(destination)) throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); + if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) + throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue " + << queue->getName())); + + state.consume(destination, queue, + acceptMode == 0, acquireMode == 0, exclusive, + resumeId, resumeTtl, arguments); - string tag = destination; - state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode), - tag, queue, false, //TODO get rid of no-local - acceptMode == 0, acquireMode == 0, exclusive, &arguments); + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(), + queueName, destination, exclusive, arguments)); } void SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) { state.cancel(destination); + + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination)); } void @@ -510,8 +557,7 @@ void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination) void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands) { - - commands.for_each(acceptOp); + state.accepted(commands); } framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) @@ -595,7 +641,7 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, if (suspend) { throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { - return XaResult(XA_RBROLLBACK); + return XaResult(XA_STATUS_XA_RBROLLBACK); } } else { if (suspend) { @@ -603,10 +649,10 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, } else { state.endDtx(convert(xid), false); } - return XaResult(XA_OK); + return XaResult(XA_STATUS_XA_OK); } - } catch (const DtxTimeoutException& e) { - return XaResult(XA_RBTIMEOUT); + } catch (const DtxTimeoutException& /*e*/) { + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -623,9 +669,9 @@ XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, } else { state.startDtx(convert(xid), getBroker().getDtxManager(), join); } - return XaResult(XA_OK); - } catch (const DtxTimeoutException& e) { - return XaResult(XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_OK); + } catch (const DtxTimeoutException& /*e*/) { + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -633,9 +679,9 @@ XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid) { try { bool ok = getBroker().getDtxManager().prepare(convert(xid)); - return XaResult(ok ? XA_OK : XA_RBROLLBACK); - } catch (const DtxTimeoutException& e) { - return XaResult(XA_RBTIMEOUT); + return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK); + } catch (const DtxTimeoutException& /*e*/) { + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -644,9 +690,9 @@ XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, { try { bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); - return XaResult(ok ? XA_OK : XA_RBROLLBACK); - } catch (const DtxTimeoutException& e) { - return XaResult(XA_RBTIMEOUT); + return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK); + } catch (const DtxTimeoutException& /*e*/) { + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -655,9 +701,9 @@ XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) { try { getBroker().getDtxManager().rollback(convert(xid)); - return XaResult(XA_OK); - } catch (const DtxTimeoutException& e) { - return XaResult(XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_OK); + } catch (const DtxTimeoutException& /*e*/) { + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -699,11 +745,11 @@ void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid, Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const { Queue::shared_ptr queue; if (name.empty()) { - throw amqp_0_10::IllegalArgumentException(QPID_MSG("No queue name specified.")); + throw framing::IllegalArgumentException(QPID_MSG("No queue name specified.")); } else { queue = session.getBroker().getQueues().find(name); if (!queue) - throw amqp_0_10::NotFoundException(QPID_MSG("Queue not found: "<<name)); + throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name)); } return queue; } |