diff options
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 44 |
1 files changed, 20 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index b763dd4119..2411e0520c 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -33,6 +33,7 @@ #include "qpid/broker/Link.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/QueueSettings.h" #include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" @@ -120,7 +121,6 @@ Broker::Options::Options(const std::string& name) : queueLimit(100*1048576/*100M default limit*/), tcpNoDelay(false), requireEncrypted(false), - asyncQueueEvents(false), // Must be false in a cluster. qmf2Support(true), qmf1Support(true), queueFlowStopRatio(80), @@ -164,7 +164,6 @@ Broker::Options::Options(const std::string& name) : ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") ("sasl-config", optValue(saslConfigPath, "DIR"), "gets sasl config info from nonstandard location") - ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") @@ -206,7 +205,6 @@ Broker::Broker(const Broker::Options& conf) : *this), mgmtObject(0), queueCleaner(queues, &timer), - queueEvents(poller,!conf.asyncQueueEvents), recovery(true), inCluster(false), clusterUpdatee(false), @@ -265,8 +263,6 @@ Broker::Broker(const Broker::Options& conf) : federationTag = conf.fedTag; } - QueuePolicy::setDefaultMaxSize(conf.queueLimit); - // Early-Initialize plugins Plugin::earlyInitAll(*this); @@ -430,7 +426,6 @@ void Broker::shutdown() { Broker::~Broker() { shutdown(); - queueEvents.shutdown(); finalize(); // Finalize any plugins. if (config.auth) SaslAuthenticator::fini(); @@ -694,11 +689,15 @@ void Broker::createObject(const std::string& type, const std::string& name, //treat everything else as extension properties else extensions[i->first] = i->second; } - framing::FieldTable arguments; - amqp_0_10::translate(extensions, arguments); + QueueSettings settings(durable, autodelete); + Variant::Map unused; + settings.populate(extensions, unused); + qpid::amqp_0_10::translate(unused, settings.storeSettings); + //TODO: unused doesn't take store settings into account... so can't yet implement strict + QPID_LOG(debug, "Broker did not use the following settings (store module may): " << unused); std::pair<boost::shared_ptr<Queue>, bool> result = - createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId); + createQueue(name, settings, 0, alternateExchange, userId, connectionId); if (!result.second) { throw ObjectAlreadyExists(name); } @@ -1046,8 +1045,7 @@ Broker::getKnownBrokersImpl() return knownBrokers; } -bool Broker::deferDeliveryImpl(const std::string& , - const boost::intrusive_ptr<Message>& ) +bool Broker::deferDeliveryImpl(const std::string&, const Message&) { return false; } void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { @@ -1061,23 +1059,21 @@ const std::string Broker::TCP_TRANSPORT("tcp"); std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( const std::string& name, - bool durable, - bool autodelete, + const QueueSettings& settings, const OwnershipToken* owner, const std::string& alternateExchange, - const qpid::framing::FieldTable& arguments, const std::string& userId, const std::string& connectionId) { if (acl) { std::map<acl::Property, std::string> params; params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_DURABLE, settings.durable ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE)); - params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE)); - params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); - params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); - params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); + params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_POLICYTYPE, settings.dropMessagesAtLimit ? "ring" : "reject")); + params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount()))); + params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize()))); if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId)); @@ -1089,7 +1085,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); } - std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner, alternate, arguments); + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate); if (result.second) { //add default binding: result.first->bind(exchanges.getDefault(), name); @@ -1100,16 +1096,16 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( //event instead? managementAgent->raiseEvent( _qmf::EventQueueDeclare(connectionId, userId, name, - durable, owner, autodelete, alternateExchange, - ManagementAgent::toMap(arguments), + settings.durable, owner, settings.autodelete, alternateExchange, + settings.asMap(), "created")); } QPID_LOG_CAT(debug, model, "Create queue. name:" << name << " user:" << userId << " rhost:" << connectionId - << " durable:" << (durable ? "T" : "F") + << " durable:" << (settings.durable ? "T" : "F") << " owner:" << owner - << " autodelete:" << (autodelete ? "T" : "F") + << " autodelete:" << (settings.autodelete ? "T" : "F") << " alternateExchange:" << alternateExchange ); } return result; |