summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp390
1 files changed, 218 insertions, 172 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 03022b00bb..a7743d95ab 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -15,17 +15,23 @@
* limitations under the License.
*
*/
-#include "SessionAdapter.h"
-#include "Connection.h"
-#include "DeliveryToken.h"
-#include "MessageDelivery.h"
-#include "Queue.h"
+#include "qpid/broker/SessionAdapter.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/Queue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/framing/constants.h"
+#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
-#include "qpid/amqp_0_10/exceptions.h"
#include "qpid/framing/SequenceSet.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h"
#include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>
@@ -35,6 +41,9 @@ namespace broker {
using namespace qpid;
using namespace qpid::framing;
+using namespace qpid::framing::dtx;
+using namespace qpid::management;
+namespace _qmf = qmf::org::apache::qpid::broker;
typedef std::vector<Queue::shared_ptr> QueueVector;
@@ -48,23 +57,24 @@ SessionAdapter::SessionAdapter(SemanticState& s) :
dtxImpl(s)
{}
+static const std::string _TRUE("true");
+static const std::string _FALSE("false");
void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
const string& alternateExchange,
bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- std::map<std::string, std::string> params;
- params.insert(make_pair("TYPE", type));
- params.insert(make_pair("ALT", alternateExchange));
- params.insert(make_pair("PAS", std::string(passive ? "Y" : "N") ));
- params.insert(make_pair("DURA", std::string(durable ? "Y" : "N")));
- if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchange,&params) )
- throw NotAllowedException("ACL denied exhange declare request");
- }
-
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_TYPE, type));
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
+ params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
+ throw NotAllowedException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
+ }
+
//TODO: implement autoDelete
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
@@ -75,21 +85,31 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
checkType(actual, type);
checkAlternate(actual, alternate);
}else{
+ if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
+ throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
+ }
try{
std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
if (response.second) {
- if (durable) {
- getBroker().getStore().create(*response.first, args);
- }
if (alternate) {
response.first->setAlternate(alternate);
alternate->incAlternateUsers();
}
+ if (durable) {
+ getBroker().getStore().create(*response.first, args);
+ }
} else {
checkType(response.first, type);
checkAlternate(response.first, alternate);
}
- }catch(UnknownExchangeTypeException& e){
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
+ alternateExchange, durable, false, args,
+ response.second ? "created" : "existing"));
+
+ }catch(UnknownExchangeTypeException& /*e*/){
throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
}
}
@@ -104,57 +124,62 @@ void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchang
void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
{
- if (alternate && alternate != exchange->getAlternate())
- throw NotAllowedException(
- QPID_MSG("Exchange declared with alternate-exchange "
- << exchange->getAlternate()->getName() << ", requested "
- << alternate->getName()));
+ if (alternate && ((exchange->getAlternate() && alternate != exchange->getAlternate())
+ || !exchange->getAlternate()))
+ throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange "
+ << (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>")
+ << ", requested "
+ << alternate->getName()));
}
-void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/){
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::EXCHANGE,name,NULL) )
- throw NotAllowedException("ACL denied exhange delete request");
+void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
+{
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+ throw NotAllowedException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
}
-
//TODO: implement unused
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
getBroker().getExchanges().destroy(name);
-}
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name));
+}
ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
{
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::EXCHANGE,name,NULL) )
- throw NotAllowedException("ACL denied exhange query request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL) )
+ throw NotAllowedException(QPID_MSG("ACL denied exchange query request from " << getConnection().getUserId()));
}
try {
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
- } catch (const NotFoundException& e) {
+ } catch (const NotFoundException& /*e*/) {
return ExchangeQueryResult("", false, true, FieldTable());
}
}
+
void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
const string& exchangeName, const string& routingKey,
- const FieldTable& arguments){
+ const FieldTable& arguments)
+{
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::BIND,acl::EXCHANGE,exchangeName,routingKey) )
- throw NotAllowedException("ACL denied exhange bind request");
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
+ throw NotAllowedException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
}
Queue::shared_ptr queue = getQueue(queueName);
@@ -166,30 +191,29 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
if (exchange->isDurable() && queue->isDurable()) {
getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
}
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments));
}
}else{
- throw NotFoundException(
- "Bind failed. No such exchange: " + exchangeName);
+ throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
}
}
-void
-SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
- const string& exchangeName,
- const string& routingKey)
+void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
+ const string& exchangeName,
+ const string& routingKey)
{
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- std::map<std::string, std::string> params;
- params.insert(make_pair("QN", queueName));
- params.insert(make_pair("RKEY", routingKey));
- if (!acl->authorise(getConnection().getUserId(),acl::UNBIND,acl::EXCHANGE,exchangeName,&params) )
- throw NotAllowedException("ACL denied exchange unbind request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
+ throw NotAllowedException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId()));
}
-
Queue::shared_ptr queue = getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
@@ -197,10 +221,14 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
//TODO: revise unbind to rely solely on binding key (not args)
- if (exchange->unbind(queue, routingKey, 0) && exchange->isDurable() && queue->isDurable()) {
- getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
- }
+ if (exchange->unbind(queue, routingKey, 0)) {
+ if (exchange->isDurable() && queue->isDurable())
+ getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey));
+ }
}
ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -208,16 +236,15 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
const std::string& key,
const framing::FieldTable& args)
{
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- std::map<std::string, std::string> params;
- params.insert(make_pair("QUEUE", queueName));
- params.insert(make_pair("RKEY", queueName));
- if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchangeName,&params) )
- throw NotAllowedException("ACL denied exhange bound request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,&params) )
+ throw NotAllowedException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId()));
}
-
+
Exchange::shared_ptr exchange;
try {
exchange = getBroker().getExchanges().get(exchangeName);
@@ -229,7 +256,7 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
}
if (!exchange) {
- return ExchangeBoundResult(true, false, false, false, false);
+ return ExchangeBoundResult(true, (!queueName.empty() && !queue), false, false, false);
} else if (!queueName.empty() && !queue) {
return ExchangeBoundResult(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
@@ -268,7 +295,6 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
-
bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
{
@@ -278,13 +304,12 @@ bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::QUEUE,name,NULL) )
- throw NotAllowedException("ACL denied queue query request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) )
+ throw NotAllowedException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId()));
}
-
+
Queue::shared_ptr queue = session.getBroker().getQueues().find(name);
if (queue) {
@@ -304,20 +329,23 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
}
void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, const qpid::framing::FieldTable& arguments){
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- std::map<std::string, std::string> params;
- params.insert(make_pair("ALT", alternateExchange));
- params.insert(make_pair("PAS", std::string(passive ? "Y" : "N") ));
- params.insert(make_pair("DURA", std::string(durable ? "Y" : "N")));
- params.insert(make_pair("EXCLUS", std::string(exclusive ? "Y" : "N")));
- params.insert(make_pair("AUTOD", std::string(autoDelete ? "Y" : "N")));
- if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::QUEUE,name,&params) )
- throw NotAllowedException("ACL denied queue create request");
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, const qpid::framing::FieldTable& arguments)
+{
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
+ params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+ params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+ params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+ throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
}
Exchange::shared_ptr alternate;
@@ -326,17 +354,16 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = getQueue(name);
+ queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
- std::pair<Queue::shared_ptr, bool> queue_created =
- getBroker().getQueues().declare(
- name, durable,
- autoDelete,
- exclusive ? this : 0);
- queue = queue_created.first;
- assert(queue);
- if (queue_created.second) { // This is a new queue
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ getBroker().getQueues().declare(name, durable,
+ autoDelete,
+ exclusive ? &session : 0);
+ queue = queue_created.first;
+ assert(queue);
+ if (queue_created.second) { // This is a new queue
if (alternate) {
queue->setAlternateExchange(alternate);
alternate->incAlternateUsers();
@@ -345,48 +372,56 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
//apply settings & create persistent record if required
queue_created.first->create(arguments);
- //add default binding:
- getBroker().getExchanges().getDefault()->bind(queue, name, 0);
+ //add default binding:
+ getBroker().getExchanges().getDefault()->bind(queue, name, 0);
queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
//handle automatic cleanup:
- if (exclusive) {
- exclusiveQueues.push_back(queue);
- }
- } else {
- if (exclusive && queue->setExclusiveOwner(this)) {
- exclusiveQueues.push_back(queue);
+ if (exclusive) {
+ exclusiveQueues.push_back(queue);
+ }
+ } else {
+ if (exclusive && queue->setExclusiveOwner(&session)) {
+ exclusiveQueues.push_back(queue);
}
}
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
+ name, durable, exclusive, autoDelete, arguments,
+ queue_created.second ? "created" : "existing"));
}
- if (exclusive && !queue->isExclusiveOwner(this))
- throw ResourceLockedException(
- QPID_MSG("Cannot grant exclusive access to queue "
- << queue->getName()));
+
+ if (exclusive && !queue->isExclusiveOwner(&session))
+ throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
+ << queue->getName()));
}
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::PURGE,acl::QUEUE,queue,NULL) )
- throw NotAllowedException("ACL denied queue purge request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl)
+ {
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_PURGE,acl::OBJ_QUEUE,queue,NULL) )
+ throw NotAllowedException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
}
getQueue(queue)->purge();
}
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::QUEUE,queue,NULL) )
- throw NotAllowedException("ACL denied queue delete request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl)
+ {
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) )
+ throw NotAllowedException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId()));
}
- ChannelException error(0, "");
Queue::shared_ptr q = getQueue(queue);
+ if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session))
+ throw ResourceLockedException(QPID_MSG("Cannot delete queue "
+ << queue << "; it is exclusive to another session"));
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -400,16 +435,18 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
q->destroy();
getBroker().getQueues().destroy(queue);
q->unbind(getBroker().getExchanges(), q);
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
}
}
-
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerHelper(s),
releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
- rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
- acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
+ rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
{}
//
@@ -431,37 +468,47 @@ void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, b
void
SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
- const string& destination,
- uint8_t acceptMode,
- uint8_t acquireMode,
- bool exclusive,
- const string& /*resumeId*/,//TODO implement resume behaviour
- uint64_t /*resumeTtl*/,
- const FieldTable& arguments)
+ const string& destination,
+ uint8_t acceptMode,
+ uint8_t acquireMode,
+ bool exclusive,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const FieldTable& arguments)
{
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- // add flags as needed
- if (!acl->authorise(getConnection().getUserId(),acl::CONSUME,acl::QUEUE,queueName,NULL) )
- throw NotAllowedException("ACL denied Queue subscribe request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl)
+ {
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) )
+ throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId()));
}
Queue::shared_ptr queue = getQueue(queueName);
if(!destination.empty() && state.exists(destination))
throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
+ if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session))
+ throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue "
+ << queue->getName()));
+
+ state.consume(destination, queue,
+ acceptMode == 0, acquireMode == 0, exclusive,
+ resumeId, resumeTtl, arguments);
- string tag = destination;
- state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode),
- tag, queue, false, //TODO get rid of no-local
- acceptMode == 0, acquireMode == 0, exclusive, &arguments);
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
+ queueName, destination, exclusive, arguments));
}
void
SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
{
state.cancel(destination);
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination));
}
void
@@ -510,8 +557,7 @@ void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination)
void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
{
-
- commands.for_each(acceptOp);
+ state.accepted(commands);
}
framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
@@ -595,7 +641,7 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
if (suspend) {
throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
- return XaResult(XA_RBROLLBACK);
+ return XaResult(XA_STATUS_XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -603,10 +649,10 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
} else {
state.endDtx(convert(xid), false);
}
- return XaResult(XA_OK);
+ return XaResult(XA_STATUS_XA_OK);
}
- } catch (const DtxTimeoutException& e) {
- return XaResult(XA_RBTIMEOUT);
+ } catch (const DtxTimeoutException& /*e*/) {
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -623,9 +669,9 @@ XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
} else {
state.startDtx(convert(xid), getBroker().getDtxManager(), join);
}
- return XaResult(XA_OK);
- } catch (const DtxTimeoutException& e) {
- return XaResult(XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_OK);
+ } catch (const DtxTimeoutException& /*e*/) {
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -633,9 +679,9 @@ XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
{
try {
bool ok = getBroker().getDtxManager().prepare(convert(xid));
- return XaResult(ok ? XA_OK : XA_RBROLLBACK);
- } catch (const DtxTimeoutException& e) {
- return XaResult(XA_RBTIMEOUT);
+ return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
+ } catch (const DtxTimeoutException& /*e*/) {
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -644,9 +690,9 @@ XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
{
try {
bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
- return XaResult(ok ? XA_OK : XA_RBROLLBACK);
- } catch (const DtxTimeoutException& e) {
- return XaResult(XA_RBTIMEOUT);
+ return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
+ } catch (const DtxTimeoutException& /*e*/) {
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -655,9 +701,9 @@ XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
{
try {
getBroker().getDtxManager().rollback(convert(xid));
- return XaResult(XA_OK);
- } catch (const DtxTimeoutException& e) {
- return XaResult(XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_OK);
+ } catch (const DtxTimeoutException& /*e*/) {
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -699,11 +745,11 @@ void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid,
Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const {
Queue::shared_ptr queue;
if (name.empty()) {
- throw amqp_0_10::IllegalArgumentException(QPID_MSG("No queue name specified."));
+ throw framing::IllegalArgumentException(QPID_MSG("No queue name specified."));
} else {
queue = session.getBroker().getQueues().find(name);
if (!queue)
- throw amqp_0_10::NotFoundException(QPID_MSG("Queue not found: "<<name));
+ throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
}
return queue;
}