summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp278
1 files changed, 100 insertions, 178 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index e06e5d1aa0..9e4516679e 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -65,51 +65,54 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
const string& alternateExchange,
bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::PROP_TYPE, type));
- params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
- params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
- }
-
//TODO: implement autoDelete
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
alternate = getBroker().getExchanges().get(alternateExchange);
}
if(passive){
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ //TODO: why does a passive declare require create
+ //permission? The purpose of the passive flag is to state
+ //that the exchange should *not* created. For
+ //authorisation a passive declare is similar to
+ //exchange-query.
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_TYPE, type));
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
+ params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId()));
+ }
Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
checkType(actual, type);
checkAlternate(actual, alternate);
- }else{
+ }else{
if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
}
try{
- std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
- if (response.second) {
- if (alternate) {
- response.first->setAlternate(alternate);
- alternate->incAlternateUsers();
- }
- if (durable) {
- getBroker().getStore().create(*response.first, args);
- }
- } else {
+ std::pair<Exchange::shared_ptr, bool> response =
+ getBroker().createExchange(exchange, type, durable, alternateExchange, args,
+ getConnection().getUserId(), getConnection().getUrl());
+ if (!response.second) {
+ //exchange already there, not created
checkType(response.first, type);
checkAlternate(response.first, alternate);
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
+ getConnection().getUserId(),
+ exchange,
+ type,
+ alternateExchange,
+ durable,
+ false,
+ ManagementAgent::toMap(args),
+ "existing"));
}
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
- alternateExchange, durable, false, ManagementAgent::toMap(args),
- response.second ? "created" : "existing"));
-
}catch(UnknownExchangeTypeException& /*e*/){
throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
}
@@ -135,22 +138,8 @@ void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr ex
void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
{
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
- }
-
- //TODO: implement unused
- Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
- if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
- if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
- getBroker().getExchanges().destroy(name);
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name));
+ //TODO: implement if-unused
+ getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl());
}
ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
@@ -170,67 +159,19 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam
}
void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
- const string& exchangeName, const string& routingKey,
- const FieldTable& arguments)
+ const string& exchangeName, const string& routingKey,
+ const FieldTable& arguments)
{
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
- params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
-
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
- throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
- }
-
- Queue::shared_ptr queue = getQueue(queueName);
- Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
- if(exchange){
- string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
- if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
- queue->bound(exchangeName, routingKey, arguments);
- if (exchange->isDurable() && queue->isDurable()) {
- getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
- }
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
- queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
- }
- }else{
- throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
- }
+ getBroker().bind(queueName, exchangeName, routingKey, arguments,
+ getConnection().getUserId(), getConnection().getUrl());
}
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
- params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId()));
- }
-
- Queue::shared_ptr queue = getQueue(queueName);
- if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
- Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
- if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
- //TODO: revise unbind to rely solely on binding key (not args)
- if (exchange->unbind(queue, routingKey, 0)) {
- if (exchange->isDurable() && queue->isDurable())
- getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey));
- }
+ getBroker().unbind(queueName, exchangeName, routingKey,
+ getConnection().getUserId(), getConnection().getUrl());
}
ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -333,52 +274,42 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
bool passive, bool durable, bool exclusive,
bool autoDelete, const qpid::framing::FieldTable& arguments)
-{
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
- params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
- params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
- params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
- params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
- params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
- params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
-
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
- }
-
- Exchange::shared_ptr alternate;
- if (!alternateExchange.empty()) {
- alternate = getBroker().getExchanges().get(alternateExchange);
- }
+{
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = getQueue(name);
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ //TODO: why does a passive declare require create
+ //permission? The purpose of the passive flag is to state
+ //that the queue should *not* created. For
+ //authorisation a passive declare is similar to
+ //queue-query (or indeed a qmf query).
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
+ params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+ params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+ params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+ throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
+ }
+ queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
- std::pair<Queue::shared_ptr, bool> queue_created =
- getBroker().getQueues().declare(name, durable,
- autoDelete,
- exclusive ? &session : 0);
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ getBroker().createQueue(name, durable,
+ autoDelete,
+ exclusive ? &session : 0,
+ alternateExchange,
+ arguments,
+ getConnection().getUserId(),
+ getConnection().getUrl());
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- if (alternate) {
- queue->setAlternateExchange(alternate);
- alternate->incAlternateUsers();
- }
-
- //apply settings & create persistent record if required
- try { queue_created.first->create(arguments); }
- catch (...) { getBroker().getQueues().destroy(name); throw; }
-
- //add default binding:
- getBroker().getExchanges().getDefault()->bind(queue, name, 0);
- queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
-
//handle automatic cleanup:
if (exclusive) {
exclusiveQueues.push_back(queue);
@@ -387,21 +318,20 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
if (exclusive && queue->setExclusiveOwner(&session)) {
exclusiveQueues.push_back(queue);
}
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
+ name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
+ "existing"));
}
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
- name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
- queue_created.second ? "created" : "existing"));
}
- if (exclusive && !queue->isExclusiveOwner(&session))
+ if (exclusive && !queue->isExclusiveOwner(&session))
throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
<< queue->getName()));
-}
-
-
+}
+
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
AclModule* acl = getBroker().getAcl();
if (acl)
@@ -410,40 +340,32 @@ void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
}
getQueue(queue)->purge();
-}
-
-void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId()));
- }
+}
- Queue::shared_ptr q = getQueue(queue);
- if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session))
+void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty)
+{
+ if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) {
throw ResourceLockedException(QPID_MSG("Cannot delete queue "
- << queue << "; it is exclusive to another session"));
- if(ifEmpty && q->getMessageCount() > 0){
- throw PreconditionFailedException("Queue not empty.");
- }else if(ifUnused && q->getConsumerCount() > 0){
- throw PreconditionFailedException("Queue in use.");
- }else{
+ << queue->getName() << "; it is exclusive to another session"));
+ } else if(ifEmpty && queue->getMessageCount() > 0) {
+ throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
+ << queue->getName() << "; queue not empty"));
+ } else if(ifUnused && queue->getConsumerCount() > 0) {
+ throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
+ << queue->getName() << "; queue in use"));
+ } else if (queue->isExclusiveOwner(&getConnection())) {
//remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(&getConnection())){
- QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
- if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
- }
- q->destroy();
- getBroker().getQueues().destroy(queue);
- q->unbind(getBroker().getExchanges(), q);
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
- q->notifyDeleted();
- }
+ QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(),
+ getConnection().exclusiveQueues.end(),
+ queue);
+ if (i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
+ }
+}
+
+void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
+{
+ getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
+ boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
}
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :