diff options
author | Ted Ross <tross@apache.org> | 2008-07-16 20:25:12 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-07-16 20:25:12 +0000 |
commit | 0742abc5e6831675aba58949d763fb4f819cde69 (patch) | |
tree | 806d768e89b6fec8d301396ec889a12f6c9e33e1 /cpp/src | |
parent | 1848880ade63a0d2e2e9472a3fc231c52962dd78 (diff) | |
download | qpid-python-0742abc5e6831675aba58949d763fb4f819cde69.tar.gz |
QPID-1170 - Provide a better factory for creation and deletion of the management agent
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@677408 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgent.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 34 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/System.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Vhost.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 109 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 7 |
15 files changed, 132 insertions, 84 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h index 97fc827f2a..e7379e6c94 100644 --- a/cpp/src/qpid/agent/ManagementAgent.h +++ b/cpp/src/qpid/agent/ManagementAgent.h @@ -22,6 +22,7 @@ #include "qpid/management/ManagementObject.h" #include "qpid/management/Manageable.h" +#include "qpid/sys/Mutex.h" namespace qpid { namespace management { @@ -30,11 +31,21 @@ class ManagementAgent { public: + class Singleton { + public: + Singleton(bool disableManagement = false); + ~Singleton(); + static ManagementAgent* getInstance(); + private: + static sys::Mutex lock; + static bool disabled; + static int refCount; + static ManagementAgent* agent; + }; + ManagementAgent () {} virtual ~ManagementAgent () {} - static ManagementAgent* getAgent(); - virtual int getMaxThreads() = 0; // Connect to a management broker diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 3c079a5a0a..5894aad404 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -33,14 +33,36 @@ using std::string; using std::cout; using std::endl; -ManagementAgent* ManagementAgent::getAgent() +Mutex ManagementAgent::Singleton::lock; +bool ManagementAgent::Singleton::disabled = false; +ManagementAgent* ManagementAgent::Singleton::agent = 0; +int ManagementAgent::Singleton::refCount = 0; + +ManagementAgent::Singleton::Singleton(bool disableManagement) { - //static ManagementAgent* agent = 0; + Mutex::ScopedLock _lock(lock); + if (disableManagement && !disabled) { + disabled = true; + assert(refCount == 0); // can't disable after agent has been allocated + } + if (refCount == 0 && !disabled) + agent = new ManagementAgentImpl(); + refCount++; +} - //if (agent == 0) - // agent = new ManagementAgentImpl(); - //return agent; - return 0; +ManagementAgent::Singleton::~Singleton() +{ + Mutex::ScopedLock _lock(lock); + refCount--; + if (refCount == 0 && !disabled) { + delete agent; + agent = 0; + } +} + +ManagementAgent* ManagementAgent::Singleton::getInstance() +{ + return agent; } ManagementAgentImpl::ManagementAgentImpl() : diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index b7572fe833..2ecf63cd5d 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -41,8 +41,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen { public: - ManagementAgentImpl (); - virtual ~ManagementAgentImpl (); + ManagementAgentImpl(); + virtual ~ManagementAgentImpl(); int getMaxThreads() { return 1; } void init(std::string brokerHost = "localhost", diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 9e49404bae..53bed020e2 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -40,7 +40,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, link(_link), id(_id), args(_args), mgmtObject(0), listener(l), name(Uuid(true).str()), persistenceId(0) { - ManagementAgent* agent = ManagementAgent::getAgent(); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { mgmtObject = new management::Bridge(agent, this, link, id, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, @@ -106,9 +106,8 @@ void Bridge::destroy() void Bridge::setPersistenceId(uint64_t id) const { - if (mgmtObject != 0 && persistenceId == 0) - { - ManagementAgent* agent = ManagementAgent::getAgent (); + if (mgmtObject != 0 && persistenceId == 0) { + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); agent->addObject (mgmtObject, id); } persistenceId = id; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 47d9dad537..bffca94f95 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -122,6 +122,7 @@ const std::string qpid_management("qpid.management"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), config(conf), + managementAgentSingleton(!config.enableMgmt), store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), links(this), @@ -134,10 +135,10 @@ Broker::Broker(const Broker::Options& conf) : { if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); - ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), - conf.mgmtPubInterval, this, conf.workerThreads + 3); - managementAgent = management::ManagementAgent::getAgent (); - ((ManagementBroker*) managementAgent)->setInterval (conf.mgmtPubInterval); + managementAgent = managementAgentSingleton.getInstance(); + ((ManagementBroker*) managementAgent)->configure + (dataDir.isEnabled () ? dataDir.getPath () : string (), + conf.mgmtPubInterval, this, conf.workerThreads + 3); qpid::management::PackageQpid packageInitializer (managementAgent); System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); @@ -294,7 +295,6 @@ Broker::~Broker() { sasl_done(); #endif } - ManagementBroker::shutdown(); QPID_LOG(notice, "Shut down"); } diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index a8e5300718..ce0858f49b 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -149,6 +149,7 @@ class Broker : public sys::Runnable, public Plugin::Target, private: boost::shared_ptr<sys::Poller> poller; Options config; + management::ManagementAgent::Singleton managementAgentSingleton; std::vector< boost::shared_ptr<sys::ProtocolFactory> > protocolFactories; MessageStore* store; DataDir dataDir; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 61384638b3..5e85d3c89c 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -65,7 +65,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (parent != 0) { - ManagementAgent* agent = ManagementAgent::getAgent(); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index e7de5615ff..fbfcaede82 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -36,7 +36,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : { if (parent != 0) { - ManagementAgent* agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { mgmtExchange = new management::Exchange (agent, this, parent, _name, durable); @@ -51,7 +51,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel { if (parent != 0) { - ManagementAgent* agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { mgmtExchange = new management::Exchange (agent, this, parent, _name, durable); @@ -77,7 +77,7 @@ void Exchange::setPersistenceId(uint64_t id) const { if (mgmtExchange != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); agent->addObject (mgmtExchange, id, 2); } persistenceId = id; @@ -124,7 +124,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang { if (parent != 0) { - ManagementAgent* agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { ManagementObject* mo = queue->GetManagementObject(); diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 5c470040e2..9cbf86ebd3 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -59,7 +59,7 @@ Link::Link(LinkRegistry* _links, { if (parent != 0) { - ManagementAgent* agent = ManagementAgent::getAgent(); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { mgmtObject = new management::Link(agent, this, parent, _host, _port, _useSsl, _durable); @@ -264,10 +264,9 @@ void Link::notifyConnectionForced(const string text) void Link::setPersistenceId(uint64_t id) const { - if (mgmtObject != 0 && persistenceId == 0) - { - ManagementAgent* agent = ManagementAgent::getAgent (); - agent->addObject (mgmtObject, id); + if (mgmtObject != 0 && persistenceId == 0) { + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + agent->addObject(mgmtObject, id); } persistenceId = id; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 6d60f98505..bf64760fc7 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -67,7 +67,7 @@ Queue::Queue(const string& _name, bool _autodelete, { if (parent != 0) { - ManagementAgent* agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { @@ -576,7 +576,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const { if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); agent->addObject (mgmtObject, _persistenceId, 3); if (externalQueueStore) { diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 0a122fcae8..3cc509c904 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -62,7 +62,7 @@ SessionState::SessionState( { Manageable* parent = broker.GetVhostObject (); if (parent != 0) { - ManagementAgent* agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { mgmtObject = new management::Session (agent, this, parent, getId().getName()); mgmtObject->set_attached (0); diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp index 5e51beac35..6c58339432 100644 --- a/cpp/src/qpid/broker/System.cpp +++ b/cpp/src/qpid/broker/System.cpp @@ -30,7 +30,7 @@ using namespace std; System::System (string _dataDir) : mgmtObject(0) { - ManagementAgent* agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp index 6c31628c5f..23203ec13e 100644 --- a/cpp/src/qpid/broker/Vhost.cpp +++ b/cpp/src/qpid/broker/Vhost.cpp @@ -27,7 +27,7 @@ Vhost::Vhost (management::Manageable* parentBroker) : mgmtObject(0) { if (parentBroker != 0) { - ManagementAgent* agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 84e0c650f2..f66b34c43c 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -38,25 +38,83 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace std; -ManagementAgent* ManagementBroker::agent; -bool ManagementBroker::enabled = 0; +Mutex ManagementAgent::Singleton::lock; +bool ManagementAgent::Singleton::disabled = false; +ManagementAgent* ManagementAgent::Singleton::agent = 0; +int ManagementAgent::Singleton::refCount = 0; + +ManagementAgent::Singleton::Singleton(bool disableManagement) +{ + Mutex::ScopedLock _lock(lock); + if (disableManagement && !disabled) { + disabled = true; + assert(refCount == 0); // can't disable after agent has been allocated + } + if (refCount == 0 && !disabled) + agent = new ManagementBroker(); + refCount++; +} + +ManagementAgent::Singleton::~Singleton() +{ + Mutex::ScopedLock _lock(lock); + refCount--; + if (refCount == 0 && !disabled) { + delete agent; + agent = 0; + } +} + +ManagementAgent* ManagementAgent::Singleton::getInstance() +{ + return agent; +} ManagementBroker::RemoteAgent::~RemoteAgent () { if (mgmtObject != 0) - mgmtObject->resourceDestroy (); + mgmtObject->resourceDestroy(); } -ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) : - threadPoolSize(_threads), dataDir(_dataDir), interval(_interval), broker(_broker) +ManagementBroker::ManagementBroker () : + threadPoolSize(1), interval(10), broker(0) { - timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); localBank = 5; nextObjectId = 1; bootSequence = 1; nextRemoteBank = 10; nextRequestSequence = 1; clientWasAdded = false; +} + +ManagementBroker::~ManagementBroker () +{ + Mutex::ScopedLock lock (userLock); + + // Reset the shared pointers to exchanges. If this is not done now, the exchanges + // will stick around until dExchange and mExchange are implicitely destroyed (long + // after this destructor completes). Those exchanges hold references to management + // objects that will be invalid. + dExchange.reset(); + mExchange.reset(); + + moveNewObjectsLH(); + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + ManagementObject* object = iter->second; + delete object; + } + managementObjects.clear(); +} + +void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) +{ + dataDir = _dataDir; + interval = _interval; + broker = _broker; + threadPoolSize = _threads; + timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); // Get from file or generate and save to file. if (dataDir.empty ()) @@ -92,20 +150,6 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea } } -ManagementBroker::~ManagementBroker () -{ - Mutex::ScopedLock lock (userLock); - - moveNewObjectsLH(); - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) { - ManagementObject* object = iter->second; - delete object; - } - managementObjects.clear(); -} - void ManagementBroker::writeData () { string filename (dataDir + "/.mbrokerdata"); @@ -118,31 +162,6 @@ void ManagementBroker::writeData () } } -void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize) -{ - enabled = 1; - if (agent == 0) - agent = new ManagementBroker (dataDir, interval, broker, threadPoolSize); -} - -ManagementAgent* ManagementAgent::getAgent (void) -{ - return ManagementBroker::agent; -} - -void ManagementBroker::shutdown (void) -{ - if (agent != 0) - { - ManagementBroker* broker = (ManagementBroker*) agent; - - broker->mExchange.reset (); - broker->dExchange.reset (); - delete agent; - agent = 0; - } -} - void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange, broker::Exchange::shared_ptr _dexchange) { diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 685b7db977..447720fb5e 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -39,17 +39,14 @@ class ManagementBroker : public ManagementAgent { private: - ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); int threadPoolSize; public: + ManagementBroker (); virtual ~ManagementBroker (); - static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); - static ManagementAgent* getAgent (void); - static void shutdown (void); - + void configure (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, broker::Exchange::shared_ptr directExchange); |