diff options
author | Ted Ross <tross@apache.org> | 2008-06-30 19:00:49 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-06-30 19:00:49 +0000 |
commit | d2051d8e6910c4cbcd9c2ce2ef01089360f83e43 (patch) | |
tree | 14142fcee4c5aa5decfaf138f2d04e8d6f1b9651 /qpid/cpp/src/qpid | |
parent | 061d6a61e73c8d4e43a711e526d6586db9f54c01 (diff) | |
download | qpid-python-d2051d8e6910c4cbcd9c2ce2ef01089360f83e43.tar.gz |
QPID-1160 - Per-thread counters in management API to avoid locking
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@672864 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/System.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Vhost.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 0 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.h | 29 |
15 files changed, 72 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 18b2c52dad..9274de0555 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -38,13 +38,17 @@ namespace broker { Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const management::ArgsLinkBridge& _args) : link(_link), id(_id), args(_args), - mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest, - args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes)), listener(l), name(Uuid(true).str()), persistenceId(0) { - if (!args.i_durable) - management::ManagementAgent::getAgent()->addObject(mgmtObject); + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); + if (agent.get() != 0) { + mgmtObject = management::Bridge::shared_ptr + (new management::Bridge(agent.get(), this, link, id, args.i_durable, args.i_src, args.i_dest, + args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, + args.i_tag, args.i_excludes)); + if (!args.i_durable) + agent->addObject(mgmtObject); + } } Bridge::~Bridge() diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index a3dd93899a..0b7886b3ba 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -135,7 +135,7 @@ Broker::Broker(const Broker::Options& conf) : if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), - conf.mgmtPubInterval, this); + conf.mgmtPubInterval, this, conf.workerThreads + 3); managementAgent = management::ManagementAgent::getAgent (); ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval); qpid::management::PackageQpid packageInitializer (managementAgent); @@ -143,7 +143,7 @@ Broker::Broker(const Broker::Options& conf) : System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); systemObject = System::shared_ptr (system); - mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port)); + mgmtObject = management::Broker::shared_ptr (new management::Broker (managementAgent.get(), this, system, conf.port)); mgmtObject->set_workerThreads (conf.workerThreads); mgmtObject->set_maxConns (conf.maxConnections); mgmtObject->set_connBacklog (conf.connectionBacklog); diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index b6f6b9cee9..9e763f6775 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -65,7 +65,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); if (agent.get() != 0) - mgmtObject = management::Connection::shared_ptr(new management::Connection(this, parent, mgmtId, !isLink)); + mgmtObject = management::Connection::shared_ptr + (new management::Connection(agent.get(), this, parent, mgmtId, !isLink)); agent->addObject(mgmtObject); } } diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 30a93e338c..c72b148338 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -40,7 +40,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : if (agent.get () != 0) { mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (this, parent, _name, durable)); + (new management::Exchange (agent.get(), this, parent, _name, durable)); agent->addObject (mgmtExchange); } } @@ -56,7 +56,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel if (agent.get () != 0) { mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (this, parent, _name, durable)); + (new management::Exchange (agent.get(), this, parent, _name, durable)); if (!durable) { if (name == "") agent->addObject (mgmtExchange, 4, 1); // Special default exchange ID @@ -134,7 +134,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang { uint64_t queueId = mo->getObjectId(); mgmtBinding = management::Binding::shared_ptr - (new management::Binding (this, (Manageable*) parent, queueId, key, args)); + (new management::Binding (agent.get(), this, (Manageable*) parent, queueId, key, args)); agent->addObject (mgmtBinding); } } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 630ce68150..87c0020dcb 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -63,7 +63,7 @@ Link::Link(LinkRegistry* _links, if (agent.get() != 0) { mgmtObject = management::Link::shared_ptr - (new management::Link(this, parent, _host, _port, _useSsl, _durable)); + (new management::Link(agent.get(), this, parent, _host, _port, _useSsl, _durable)); if (!durable) agent->addObject(mgmtObject); } @@ -109,7 +109,8 @@ void Link::startConnectionLH () boost::bind (&Link::closed, this, _1, _2)); } catch(std::exception& e) { setStateLH(STATE_WAITING); - mgmtObject->set_lastError (e.what()); + if (mgmtObject.get() != 0) + mgmtObject->set_lastError (e.what()); } } @@ -141,7 +142,8 @@ void Link::closed (int, std::string text) if (state != STATE_FAILED) { setStateLH(STATE_WAITING); - mgmtObject->set_lastError (text); + if (mgmtObject.get() != 0) + mgmtObject->set_lastError (text); } if (closing) @@ -257,7 +259,8 @@ void Link::notifyConnectionForced(const string text) Mutex::ScopedLock mutex(lock); setStateLH(STATE_FAILED); - mgmtObject->set_lastError(text); + if (mgmtObject.get() != 0) + mgmtObject->set_lastError(text); } void Link::setPersistenceId(uint64_t id) const diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index becca8dfcf..40f249bc11 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -71,7 +71,7 @@ Queue::Queue(const string& _name, bool _autodelete, if (agent.get () != 0) { mgmtObject = management::Queue::shared_ptr - (new management::Queue (this, parent, _name, _store != 0, _autodelete, _owner != 0)); + (new management::Queue (agent.get(), this, parent, _name, _store != 0, _autodelete, _owner != 0)); // Add the object to the management agent only if this queue is not durable. // If it's durable, we will add it later when the queue is assigned a persistenceId. @@ -113,6 +113,7 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg) } void Queue::deliver(boost::intrusive_ptr<Message>& msg){ + if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -128,19 +129,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ // if no store then mark as enqueued if (!enqueue(0, msg)){ if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); } push(msg); msg->enqueueComplete(); }else { if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } @@ -155,12 +152,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); } if (store && !msg->isContentLoaded()) { @@ -173,12 +168,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); if (msg->isPersistent ()) { mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); @@ -362,10 +355,8 @@ void Queue::consume(Consumer& c, bool requestExclusive){ } consumerCount++; - if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); + if (mgmtObject.get() != 0) mgmtObject->inc_consumerCount (); - } } void Queue::cancel(Consumer& c){ @@ -373,10 +364,8 @@ void Queue::cancel(Consumer& c){ Mutex::ScopedLock locker(consumerLock); consumerCount--; if(exclusive) exclusive = 0; - if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); + if (mgmtObject.get() != 0) mgmtObject->dec_consumerCount (); - } } QueuedMessage Queue::dequeue(){ @@ -413,10 +402,8 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); - mgmtObject->dec_msgDepth (); if (msg.payload->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index d7089424a5..95145e5d0e 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -64,7 +64,7 @@ SessionState::SessionState( ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); if (agent.get () != 0) { mgmtObject = management::Session::shared_ptr - (new management::Session (this, parent, getId().getName())); + (new management::Session (agent.get(), this, parent, getId().getName())); mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); agent->addObject (mgmtObject); diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp index da886710ac..107942fab5 100644 --- a/qpid/cpp/src/qpid/broker/System.cpp +++ b/qpid/cpp/src/qpid/broker/System.cpp @@ -63,7 +63,7 @@ System::System (string _dataDir) } mgmtObject = management::System::shared_ptr - (new management::System (this, systemId)); + (new management::System (agent.get(), this, systemId)); struct utsname _uname; if (uname (&_uname) == 0) { diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp index a809679d57..cfe497c788 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.cpp +++ b/qpid/cpp/src/qpid/broker/Vhost.cpp @@ -32,7 +32,7 @@ Vhost::Vhost (management::Manageable* parentBroker) if (agent.get () != 0) { mgmtObject = management::Vhost::shared_ptr - (new management::Vhost (this, parentBroker, "/")); + (new management::Vhost (agent.get(), this, parentBroker, "/")); agent->addObject (mgmtObject, 3, 1); } } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp deleted file mode 100644 index e69de29bb2..0000000000 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ /dev/null diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index c38e273c49..c8a1b37823 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -37,6 +37,8 @@ class ManagementAgent static shared_ptr getAgent (void); + virtual int getMaxThreads() = 0; + virtual void RegisterClass (std::string packageName, std::string className, uint8_t* md5Sum, diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index 24d18875b6..271a2ec73c 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -47,8 +47,8 @@ ManagementBroker::RemoteAgent::~RemoteAgent () mgmtObject->resourceDestroy (); } -ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker) : - dataDir (_dataDir), interval (_interval), broker (_broker) +ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) : + threadPoolSize(_threads), dataDir(_dataDir), interval(_interval), broker(_broker) { timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); localBank = 5; @@ -105,11 +105,11 @@ void ManagementBroker::writeData () } } -void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker) +void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize) { enabled = 1; if (agent.get () == 0) - agent = shared_ptr (new ManagementBroker (dataDir, interval, broker)); + agent = shared_ptr (new ManagementBroker (dataDir, interval, broker, threadPoolSize)); } ManagementAgent::shared_ptr ManagementAgent::getAgent (void) @@ -634,7 +634,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; agent->mgmtObject = management::Agent::shared_ptr - (new management::Agent (agent)); + (new management::Agent (this, agent)); agent->mgmtObject->set_sessionId (sessionId); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h index 7548773960..18d30096e5 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.h +++ b/qpid/cpp/src/qpid/management/ManagementBroker.h @@ -41,19 +41,21 @@ class ManagementBroker : public ManagementAgent { private: - ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker); + ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); + int threadPoolSize; public: virtual ~ManagementBroker (); - static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker); + static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); static shared_ptr getAgent (void); static void shutdown (void); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, broker::Exchange::shared_ptr directExchange); + int getMaxThreads () { return threadPoolSize; } void RegisterClass (std::string packageName, std::string className, uint8_t* md5Sum, diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 68d7e5c886..2528ed4284 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -21,12 +21,15 @@ #include "Manageable.h" #include "ManagementObject.h" +#include "ManagementAgent.h" #include "qpid/framing/FieldTable.h" using namespace qpid::framing; using namespace qpid::management; using namespace qpid::sys; +int ManagementObject::nextThreadIndex = 0; + void ManagementObject::writeTimestamps (Buffer& buf) { buf.putShortString (getPackageName ()); @@ -40,3 +43,14 @@ void ManagementObject::writeTimestamps (Buffer& buf) void ManagementObject::setReference(uint64_t) {} +int ManagementObject::getThreadIndex() { + static __thread int thisIndex = -1; + if (thisIndex == -1) { + sys::Mutex::ScopedLock mutex(accessLock); + thisIndex = nextThreadIndex; + if (nextThreadIndex < agent->getMaxThreads() - 1) + nextThreadIndex++; + } + return thisIndex; +} + diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index cf2da13b09..732dd14a24 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -32,19 +32,22 @@ namespace qpid { namespace management { class Manageable; +class ManagementAgent; class ManagementObject { protected: - uint64_t createTime; - uint64_t destroyTime; - uint64_t objectId; - bool configChanged; - bool instChanged; - bool deleted; - Manageable* coreObject; - sys::Mutex accessLock; + uint64_t createTime; + uint64_t destroyTime; + uint64_t objectId; + bool configChanged; + bool instChanged; + bool deleted; + Manageable* coreObject; + sys::Mutex accessLock; + ManagementAgent* agent; + int maxThreads; static const uint8_t TYPE_U8 = 1; static const uint8_t TYPE_U16 = 2; @@ -73,15 +76,18 @@ class ManagementObject static const uint8_t FLAG_INDEX = 0x02; static const uint8_t FLAG_END = 0x80; + static int nextThreadIndex; + + int getThreadIndex(); void writeTimestamps (qpid::framing::Buffer& buf); public: typedef boost::shared_ptr<ManagementObject> shared_ptr; typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); - ManagementObject (Manageable* _core) : + ManagementObject (ManagementAgent* _agent, Manageable* _core) : destroyTime(0), objectId (0), configChanged(true), - instChanged(true), deleted(false), coreObject(_core) + instChanged(true), deleted(false), coreObject(_core), agent(_agent) { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } virtual ~ManagementObject () {} @@ -102,8 +108,7 @@ class ManagementObject uint64_t getObjectId (void) { return objectId; } inline bool getConfigChanged (void) { return configChanged; } virtual bool getInstChanged (void) { return instChanged; } - inline void setAllChanged (void) - { + inline void setAllChanged (void) { configChanged = true; instChanged = true; } |