summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp44
1 files changed, 20 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index b763dd4119..2411e0520c 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/Link.h"
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
@@ -120,7 +121,6 @@ Broker::Options::Options(const std::string& name) :
queueLimit(100*1048576/*100M default limit*/),
tcpNoDelay(false),
requireEncrypted(false),
- asyncQueueEvents(false), // Must be false in a cluster.
qmf2Support(true),
qmf1Support(true),
queueFlowStopRatio(80),
@@ -164,7 +164,6 @@ Broker::Options::Options(const std::string& name) :
("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
("sasl-config", optValue(saslConfigPath, "DIR"), "gets sasl config info from nonstandard location")
- ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
@@ -206,7 +205,6 @@ Broker::Broker(const Broker::Options& conf) :
*this),
mgmtObject(0),
queueCleaner(queues, &timer),
- queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
inCluster(false),
clusterUpdatee(false),
@@ -265,8 +263,6 @@ Broker::Broker(const Broker::Options& conf) :
federationTag = conf.fedTag;
}
- QueuePolicy::setDefaultMaxSize(conf.queueLimit);
-
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
@@ -430,7 +426,6 @@ void Broker::shutdown() {
Broker::~Broker() {
shutdown();
- queueEvents.shutdown();
finalize(); // Finalize any plugins.
if (config.auth)
SaslAuthenticator::fini();
@@ -694,11 +689,15 @@ void Broker::createObject(const std::string& type, const std::string& name,
//treat everything else as extension properties
else extensions[i->first] = i->second;
}
- framing::FieldTable arguments;
- amqp_0_10::translate(extensions, arguments);
+ QueueSettings settings(durable, autodelete);
+ Variant::Map unused;
+ settings.populate(extensions, unused);
+ qpid::amqp_0_10::translate(unused, settings.storeSettings);
+ //TODO: unused doesn't take store settings into account... so can't yet implement strict
+ QPID_LOG(debug, "Broker did not use the following settings (store module may): " << unused);
std::pair<boost::shared_ptr<Queue>, bool> result =
- createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId);
+ createQueue(name, settings, 0, alternateExchange, userId, connectionId);
if (!result.second) {
throw ObjectAlreadyExists(name);
}
@@ -1046,8 +1045,7 @@ Broker::getKnownBrokersImpl()
return knownBrokers;
}
-bool Broker::deferDeliveryImpl(const std::string& ,
- const boost::intrusive_ptr<Message>& )
+bool Broker::deferDeliveryImpl(const std::string&, const Message&)
{ return false; }
void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
@@ -1061,23 +1059,21 @@ const std::string Broker::TCP_TRANSPORT("tcp");
std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
const std::string& name,
- bool durable,
- bool autodelete,
+ const QueueSettings& settings,
const OwnershipToken* owner,
const std::string& alternateExchange,
- const qpid::framing::FieldTable& arguments,
const std::string& userId,
const std::string& connectionId)
{
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_DURABLE, settings.durable ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE));
- params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE));
- params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
- params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
- params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+ params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, settings.dropMessagesAtLimit ? "ring" : "reject"));
+ params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount())));
+ params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize())));
if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
@@ -1089,7 +1085,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
}
- std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner, alternate, arguments);
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
if (result.second) {
//add default binding:
result.first->bind(exchanges.getDefault(), name);
@@ -1100,16 +1096,16 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
//event instead?
managementAgent->raiseEvent(
_qmf::EventQueueDeclare(connectionId, userId, name,
- durable, owner, autodelete, alternateExchange,
- ManagementAgent::toMap(arguments),
+ settings.durable, owner, settings.autodelete, alternateExchange,
+ settings.asMap(),
"created"));
}
QPID_LOG_CAT(debug, model, "Create queue. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
- << " durable:" << (durable ? "T" : "F")
+ << " durable:" << (settings.durable ? "T" : "F")
<< " owner:" << owner
- << " autodelete:" << (autodelete ? "T" : "F")
+ << " autodelete:" << (settings.autodelete ? "T" : "F")
<< " alternateExchange:" << alternateExchange );
}
return result;