summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp286
1 files changed, 149 insertions, 137 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 08606516d4..49e80689fc 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -20,6 +20,8 @@
*/
#include "qpid/broker/Broker.h"
+
+#include "qpid/broker/AclModule.h"
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/ConfigAsyncContext.h"
#include "qpid/broker/ConfigHandle.h"
@@ -27,9 +29,7 @@
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
-//#include "qpid/broker/MessageStoreModule.h"
-//#include "qpid/broker/NullMessageStore.h"
-//#include "qpid/broker/RecoveryAsyncContext.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
#include "qpid/broker/SecureConnectionFactory.h"
@@ -43,6 +43,7 @@
#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h"
@@ -50,12 +51,12 @@
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogHiresTimestamp.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogHiresTimestamp.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qpid/amqp_0_10/Codecs.h"
@@ -75,6 +76,7 @@
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -110,6 +112,17 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
+const std::string qpid_management("qpid.management");
+const std::string knownHostsNone("none");
+
+// static
+Broker* Broker::thisBroker;
+
Broker::Options::Options(const std::string& name) :
qpid::Options(name),
noDataDir(0),
@@ -127,6 +140,7 @@ Broker::Options::Options(const std::string& name) :
queueLimit(100*1048576/*100M default limit*/),
tcpNoDelay(false),
requireEncrypted(false),
+ knownHosts(knownHostsNone),
qmf2Support(true),
qmf1Support(true),
queueFlowStopRatio(80),
@@ -136,7 +150,7 @@ Broker::Options::Options(const std::string& name) :
timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
linkMaintenanceInterval(2),
linkHeartbeatInterval(120),
- maxNegotiateTime(2000) // 2s
+ maxNegotiateTime(10000) // 10s
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -152,6 +166,7 @@ Broker::Options::Options(const std::string& name) :
("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker")
("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored")
("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
+ ("interface", optValue(listenInterfaces, "<interface name>|<interface address>"), "Which network interfaces to use to listen for incoming connections")
("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
@@ -175,23 +190,27 @@ Broker::Options::Options(const std::string& name) :
("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.")
- ("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS"))
- ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"))
- ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation")
+ ("link-maintenance-interval", optValue(linkMaintenanceInterval, "SECONDS"),
+ "Interval to check link health and re-connect if need be")
+ ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"),
+ "Heartbeat interval for a federation link")
+ ("max-negotiate-time", optValue(maxNegotiateTime, "MILLISECONDS"), "Maximum time a connection can take to send the initial protocol negotiation")
("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag")
;
}
-const std::string empty;
-const std::string amq_direct("amq.direct");
-const std::string amq_topic("amq.topic");
-const std::string amq_fanout("amq.fanout");
-const std::string amq_match("amq.match");
-const std::string qpid_management("qpid.management");
-const std::string knownHostsNone("none");
+namespace {
+// Arguments to declare a non-replicated exchange.
+framing::FieldTable noReplicateArgs() {
+ framing::FieldTable args;
+ args.setString("qpid.replicate", "none");
+ return args;
+}
+}
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
+ timer(new qpid::sys::Timer),
config(conf),
managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
conf.qmf2Support)
@@ -205,21 +224,18 @@ Broker::Broker(const Broker::Options& conf) :
exchanges(this),
links(this),
factory(new SecureConnectionFactory(*this)),
- dtxManager(timer),
+ dtxManager(*timer.get()),
sessionManager(
qpid::SessionState::Configuration(
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- mgmtObject(0),
- queueCleaner(queues, &timer),
- recovery(true),
- inCluster(false),
- clusterUpdatee(false),
+ queueCleaner(queues, timer.get()),
+ recoveryInProgress(false),
expiryPolicy(new ExpiryPolicy),
- getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
- deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
+ getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
+ thisBroker = this;
try {
if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
@@ -231,7 +247,7 @@ Broker::Broker(const Broker::Options& conf) :
System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
systemObject = System::shared_ptr(system);
- mgmtObject = new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker");
+ mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"));
mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
mgmtObject->set_port(conf.port);
mgmtObject->set_workerThreads(conf.workerThreads);
@@ -281,28 +297,26 @@ Broker::Broker(const Broker::Options& conf) :
// if (NullMessageStore::isNullStore(store.get()))
// setStore();
- exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+ framing::FieldTable args;
+
+ // Default exchnge is not replicated.
+ exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs());
// if (store.get() != 0) {
if (asyncStore.get() != 0) {
- // The cluster plug-in will setRecovery(false) on all but the first
- // broker to join a cluster.
- if (getRecovery()) {
QPID_LOG(info, "Store recovery starting")
- RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry);
RecoveryHandle rh = asyncStore->createRecoveryHandle();
- boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverComplete, &asyncResultQueue));
+ boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverCompleteCb, &asyncResultQueue));
asyncStore->submitRecover(rh, rac);
-// store->recover(recoverer);
- }
- else {
- QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down");
-// store->truncateInit(true); // save old files in subdir
- asyncStore->initialize(true, true);
- }
+// RecoveryManagerImpl recoverer(
+// queues, exchanges, links, dtxManager, protocolRegistry);
+// recoveryInProgress = true;
+// store->recover(recoverer);
+// recoveryInProgress = false;
}
// debug
- else QPID_LOG(info, ">>>> No store!!!!")
+// else QPID_LOG(info, ">>>> No store!!!!")
//ensure standard exchanges exist (done after recovery from store)
declareStandardExchange(amq_direct, DirectExchange::typeName);
@@ -311,7 +325,7 @@ Broker::Broker(const Broker::Options& conf) :
declareStandardExchange(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
- exchanges.declare(qpid_management, ManagementTopicExchange::typeName);
+ exchanges.declare(qpid_management, ManagementTopicExchange::typeName, false, noReplicateArgs());
Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
managementAgent->setExchange(mExchange, dExchange);
@@ -320,8 +334,10 @@ Broker::Broker(const Broker::Options& conf) :
std::string qmfTopic("qmf.default.topic");
std::string qmfDirect("qmf.default.direct");
- std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName));
- std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName));
+ std::pair<Exchange::shared_ptr, bool> topicPair(
+ exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, noReplicateArgs()));
+ std::pair<Exchange::shared_ptr, bool> directPair(
+ exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, noReplicateArgs()));
boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
@@ -349,19 +365,17 @@ Broker::Broker(const Broker::Options& conf) :
// Initialize plugins
Plugin::initializeAll(*this);
- if (managementAgent.get()) managementAgent->pluginsInitialized();
+ if(conf.enableMgmt) {
+ if (getAcl()) {
+ mgmtObject->set_maxConns(getAcl()->getMaxConnectTotal());
+ }
+ }
if (conf.queueCleanInterval) {
queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
}
- //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)):
- if (conf.knownHosts.empty()) {
- boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT);
- if (factory) {
- knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) );
- }
- } else if (conf.knownHosts != knownHostsNone) {
+ if (!conf.knownHosts.empty() && conf.knownHosts != knownHostsNone) {
knownBrokers.push_back(Url(conf.knownHosts));
}
@@ -375,11 +389,14 @@ void Broker::declareStandardExchange(const std::string& name, const std::string&
{
// bool storeEnabled = store.get() != NULL;
bool storeEnabled = asyncStore.get() != NULL;
- std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
+ framing::FieldTable args;
+ // Standard exchanges are not replicated.
+ std::pair<Exchange::shared_ptr, bool> status =
+ exchanges.declare(name, type, storeEnabled, noReplicateArgs());
if (status.second && storeEnabled) {
// store->create(*status.first, framing::FieldTable ());
ConfigHandle ch = asyncStore->createConfigHandle();
- boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue));
asyncStore->submitCreate(ch, status.first.get(), bc);
}
}
@@ -420,11 +437,20 @@ void Broker::setStore () {
}
// static
+void Broker::recoverCompleteCb(const AsyncResultHandle* const arh) {
+ thisBroker->recoverComplete(arh);
+}
+
void Broker::recoverComplete(const AsyncResultHandle* const arh) {
+ recoveryInProgress = false;
std::cout << "@@@@ Broker: Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
// static
+void Broker::configureCompleteCb(const AsyncResultHandle* const arh) {
+ thisBroker->configureComplete(arh);
+}
+
void Broker::configureComplete(const AsyncResultHandle* const arh) {
std::cout << "@@@@ Broker: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
@@ -464,13 +490,13 @@ Broker::~Broker() {
finalize(); // Finalize any plugins.
if (config.auth)
SaslAuthenticator::fini();
- timer.stop();
+ timer->stop();
QPID_LOG(notice, "Shut down");
}
-ManagementObject* Broker::GetManagementObject(void) const
+ManagementObject::shared_ptr Broker::GetManagementObject(void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
Manageable* Broker::GetVhostObject(void) const
@@ -536,7 +562,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
QPID_LOG (debug, "Broker::queueMoveMessages()");
- if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter))
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter) >= 0)
status = Manageable::STATUS_OK;
else
return Manageable::STATUS_PARAMETER_INVALID;
@@ -584,7 +610,22 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
break;
}
- default:
+
+ case _qmf::Broker::METHOD_GETLOGHIRESTIMESTAMP:
+ {
+ dynamic_cast<_qmf::ArgsBrokerGetLogHiresTimestamp&>(args).o_logHires = getLogHiresTimestamp();
+ QPID_LOG (debug, "Broker::getLogHiresTimestamp()");
+ status = Manageable::STATUS_OK;
+ break;
+ }
+ case _qmf::Broker::METHOD_SETLOGHIRESTIMESTAMP:
+ {
+ setLogHiresTimestamp(dynamic_cast<_qmf::ArgsBrokerSetLogHiresTimestamp&>(args).i_logHires);
+ QPID_LOG (debug, "Broker::setLogHiresTimestamp()");
+ status = Manageable::STATUS_OK;
+ break;
+ }
+ default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
@@ -750,7 +791,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
- amqp_0_10::translate(extensions, arguments);
+ qpid::amqp_0_10::translate(extensions, arguments);
try {
std::pair<boost::shared_ptr<Exchange>, bool> result =
@@ -772,7 +813,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
- amqp_0_10::translate(extensions, arguments);
+ qpid::amqp_0_10::translate(extensions, arguments);
bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
@@ -1008,6 +1049,18 @@ std::string Broker::getLogLevel()
return level;
}
+void Broker::setLogHiresTimestamp(bool enabled)
+{
+ QPID_LOG(notice, "Changing log hires timestamp to " << enabled);
+ qpid::log::Logger::instance().setHiresTimestamp(enabled);
+}
+
+bool Broker::getLogHiresTimestamp()
+{
+ return qpid::log::Logger::instance().getHiresTimestamp();
+}
+
+
boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const {
ProtocolFactoryMap::const_iterator i
= name.empty() ? protocolFactories.begin() : protocolFactories.find(name);
@@ -1036,39 +1089,29 @@ void Broker::accept() {
}
void Broker::connect(
+ const std::string& name,
const std::string& host, const std::string& port, const std::string& transport,
- boost::function2<void, int, std::string> failed,
- sys::ConnectionCodec::Factory* f)
+ boost::function2<void, int, std::string> failed)
{
boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport);
- if (pf) pf->connect(poller, host, port, f ? f : factory.get(), failed);
+ if (pf) pf->connect(poller, name, host, port, factory.get(), failed);
else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport));
}
-void Broker::connect(
- const Url& url,
- boost::function2<void, int, std::string> failed,
- sys::ConnectionCodec::Factory* f)
-{
- url.throwIfEmpty();
- const Address& addr=url[0];
- connect(addr.host, boost::lexical_cast<std::string>(addr.port), addr.protocol, failed, f);
-}
-
-uint32_t Broker::queueMoveMessages(
+int32_t Broker::queueMoveMessages(
const std::string& srcQueue,
const std::string& destQueue,
uint32_t qty,
const Variant::Map& filter)
{
- Queue::shared_ptr src_queue = queues.find(srcQueue);
- if (!src_queue)
- return 0;
- Queue::shared_ptr dest_queue = queues.find(destQueue);
- if (!dest_queue)
- return 0;
-
- return src_queue->move(dest_queue, qty, &filter);
+ Queue::shared_ptr src_queue = queues.find(srcQueue);
+ if (!src_queue)
+ return -1;
+ Queue::shared_ptr dest_queue = queues.find(destQueue);
+ if (!dest_queue)
+ return -1;
+
+ return (int32_t) src_queue->move(dest_queue, qty, &filter);
}
@@ -1083,12 +1126,6 @@ Broker::getKnownBrokersImpl()
bool Broker::deferDeliveryImpl(const std::string&, const Message&)
{ return false; }
-void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
- clusterTimer = t;
- queueCleaner.setTimer(clusterTimer.get());
- dtxManager.setTimer(*clusterTimer.get());
-}
-
const std::string Broker::TCP_TRANSPORT("tcp");
@@ -1109,9 +1146,14 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
params.insert(make_pair(acl::PROP_POLICYTYPE, settings.dropMessagesAtLimit ? "ring" : "reject"));
params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount())));
params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize())));
+ params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast<string>(settings.maxFileCount)));
+ params.insert(make_pair(acl::PROP_MAXFILESIZE, boost::lexical_cast<string>(settings.maxFileSize)));
if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
+
+ if (!acl->approveCreateQueue(userId,name) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
}
Exchange::shared_ptr alternate;
@@ -1120,21 +1162,12 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
}
- std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
+ std::pair<Queue::shared_ptr, bool> result =
+ queues.declare(name, settings, alternate, false/*recovering*/,
+ owner, connectionId, userId);
if (result.second) {
//add default binding:
result.first->bind(exchanges.getDefault(), name, qpid::framing::FieldTable());
-
- if (managementAgent.get()) {
- //TODO: debatable whether we should raise an event here for
- //create when this is a 'declare' event; ideally add a create
- //event instead?
- managementAgent->raiseEvent(
- _qmf::EventQueueDeclare(connectionId, userId, name,
- settings.durable, owner, settings.autodelete, alternateExchange,
- settings.asMap(),
- "created"));
- }
QPID_LOG_CAT(debug, model, "Create queue. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
@@ -1149,6 +1182,10 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
void Broker::deleteQueue(const std::string& name, const std::string& userId,
const std::string& connectionId, QueueFunctor check)
{
+ QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId
+ );
if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
}
@@ -1156,19 +1193,13 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId,
Queue::shared_ptr queue = queues.find(name);
if (queue) {
if (check) check(queue);
- queues.destroy(name);
+ if (acl)
+ acl->recordDestroyQueue(name);
+ queues.destroy(name, connectionId, userId);
queue->destroyed();
} else {
throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
}
-
- if (managementAgent.get())
- managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
- QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
- << " user:" << userId
- << " rhost:" << connectionId
- );
-
}
std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
@@ -1196,33 +1227,16 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
}
std::pair<Exchange::shared_ptr, bool> result;
- result = exchanges.declare(name, type, durable, arguments);
+ result = exchanges.declare(
+ name, type, durable, arguments, alternate, connectionId, userId);
if (result.second) {
- if (alternate) {
- result.first->setAlternate(alternate);
- alternate->incAlternateUsers();
- }
if (durable) {
// store->create(*result.first, arguments);
ConfigHandle ch = asyncStore->createConfigHandle();
result.first->setHandle(ch);
- boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue));
asyncStore->submitCreate(ch, result.first.get(), bc);
}
- if (managementAgent.get()) {
- //TODO: debatable whether we should raise an event here for
- //create when this is a 'declare' event; ideally add a create
- //event instead?
- managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
- userId,
- name,
- type,
- alternateExchange,
- durable,
- false,
- ManagementAgent::toMap(arguments),
- "created"));
- }
QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
@@ -1236,6 +1250,9 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
void Broker::deleteExchange(const std::string& name, const std::string& userId,
const std::string& connectionId)
{
+ QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId);
if (acl) {
if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
@@ -1246,21 +1263,15 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
}
Exchange::shared_ptr exchange(exchanges.get(name));
if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
- if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+ if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange."));
// if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->isDurable()) {
- boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue));
asyncStore->submitDestroy(exchange->getHandle(), bc);
exchange->resetHandle();
}
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
- exchanges.destroy(name);
-
- if (managementAgent.get())
- managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
- QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
- << " user:" << userId
- << " rhost:" << connectionId);
+ exchanges.destroy(name, connectionId, userId);
}
void Broker::bind(const std::string& queueName,
@@ -1298,6 +1309,7 @@ void Broker::bind(const std::string& queueName,
QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName
<< " queue:" << queueName
<< " key:" << key
+ << " arguments:" << arguments
<< " user:" << userId
<< " rhost:" << connectionId);
}