summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-06-30 19:00:49 +0000
committerTed Ross <tross@apache.org>2008-06-30 19:00:49 +0000
commitd2051d8e6910c4cbcd9c2ce2ef01089360f83e43 (patch)
tree14142fcee4c5aa5decfaf138f2d04e8d6f1b9651 /qpid/cpp/src/qpid
parent061d6a61e73c8d4e43a711e526d6586db9f54c01 (diff)
downloadqpid-python-d2051d8e6910c4cbcd9c2ce2ef01089360f83e43.tar.gz
QPID-1160 - Per-thread counters in management API to avoid locking
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@672864 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp21
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/System.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Vhost.cpp2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp0
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.cpp10
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.h6
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp14
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.h29
15 files changed, 72 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 18b2c52dad..9274de0555 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -38,13 +38,17 @@ namespace broker {
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
const management::ArgsLinkBridge& _args) :
link(_link), id(_id), args(_args),
- mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest,
- args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
- args.i_tag, args.i_excludes)),
listener(l), name(Uuid(true).str()), persistenceId(0)
{
- if (!args.i_durable)
- management::ManagementAgent::getAgent()->addObject(mgmtObject);
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
+ if (agent.get() != 0) {
+ mgmtObject = management::Bridge::shared_ptr
+ (new management::Bridge(agent.get(), this, link, id, args.i_durable, args.i_src, args.i_dest,
+ args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
+ args.i_tag, args.i_excludes));
+ if (!args.i_durable)
+ agent->addObject(mgmtObject);
+ }
}
Bridge::~Bridge()
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index a3dd93899a..0b7886b3ba 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -135,7 +135,7 @@ Broker::Broker(const Broker::Options& conf) :
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
- conf.mgmtPubInterval, this);
+ conf.mgmtPubInterval, this, conf.workerThreads + 3);
managementAgent = management::ManagementAgent::getAgent ();
((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval);
qpid::management::PackageQpid packageInitializer (managementAgent);
@@ -143,7 +143,7 @@ Broker::Broker(const Broker::Options& conf) :
System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ());
systemObject = System::shared_ptr (system);
- mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port));
+ mgmtObject = management::Broker::shared_ptr (new management::Broker (managementAgent.get(), this, system, conf.port));
mgmtObject->set_workerThreads (conf.workerThreads);
mgmtObject->set_maxConns (conf.maxConnections);
mgmtObject->set_connBacklog (conf.connectionBacklog);
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index b6f6b9cee9..9e763f6775 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -65,7 +65,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
if (agent.get() != 0)
- mgmtObject = management::Connection::shared_ptr(new management::Connection(this, parent, mgmtId, !isLink));
+ mgmtObject = management::Connection::shared_ptr
+ (new management::Connection(agent.get(), this, parent, mgmtId, !isLink));
agent->addObject(mgmtObject);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 30a93e338c..c72b148338 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -40,7 +40,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
if (agent.get () != 0)
{
mgmtExchange = management::Exchange::shared_ptr
- (new management::Exchange (this, parent, _name, durable));
+ (new management::Exchange (agent.get(), this, parent, _name, durable));
agent->addObject (mgmtExchange);
}
}
@@ -56,7 +56,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
if (agent.get () != 0)
{
mgmtExchange = management::Exchange::shared_ptr
- (new management::Exchange (this, parent, _name, durable));
+ (new management::Exchange (agent.get(), this, parent, _name, durable));
if (!durable) {
if (name == "")
agent->addObject (mgmtExchange, 4, 1); // Special default exchange ID
@@ -134,7 +134,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang
{
uint64_t queueId = mo->getObjectId();
mgmtBinding = management::Binding::shared_ptr
- (new management::Binding (this, (Manageable*) parent, queueId, key, args));
+ (new management::Binding (agent.get(), this, (Manageable*) parent, queueId, key, args));
agent->addObject (mgmtBinding);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 630ce68150..87c0020dcb 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -63,7 +63,7 @@ Link::Link(LinkRegistry* _links,
if (agent.get() != 0)
{
mgmtObject = management::Link::shared_ptr
- (new management::Link(this, parent, _host, _port, _useSsl, _durable));
+ (new management::Link(agent.get(), this, parent, _host, _port, _useSsl, _durable));
if (!durable)
agent->addObject(mgmtObject);
}
@@ -109,7 +109,8 @@ void Link::startConnectionLH ()
boost::bind (&Link::closed, this, _1, _2));
} catch(std::exception& e) {
setStateLH(STATE_WAITING);
- mgmtObject->set_lastError (e.what());
+ if (mgmtObject.get() != 0)
+ mgmtObject->set_lastError (e.what());
}
}
@@ -141,7 +142,8 @@ void Link::closed (int, std::string text)
if (state != STATE_FAILED)
{
setStateLH(STATE_WAITING);
- mgmtObject->set_lastError (text);
+ if (mgmtObject.get() != 0)
+ mgmtObject->set_lastError (text);
}
if (closing)
@@ -257,7 +259,8 @@ void Link::notifyConnectionForced(const string text)
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_FAILED);
- mgmtObject->set_lastError(text);
+ if (mgmtObject.get() != 0)
+ mgmtObject->set_lastError(text);
}
void Link::setPersistenceId(uint64_t id) const
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index becca8dfcf..40f249bc11 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -71,7 +71,7 @@ Queue::Queue(const string& _name, bool _autodelete,
if (agent.get () != 0)
{
mgmtObject = management::Queue::shared_ptr
- (new management::Queue (this, parent, _name, _store != 0, _autodelete, _owner != 0));
+ (new management::Queue (agent.get(), this, parent, _name, _store != 0, _autodelete, _owner != 0));
// Add the object to the management agent only if this queue is not durable.
// If it's durable, we will add it later when the queue is assigned a persistenceId.
@@ -113,6 +113,7 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
+
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -128,19 +129,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
// if no store then mark as enqueued
if (!enqueue(0, msg)){
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
}
push(msg);
msg->enqueueComplete();
}else {
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
}
@@ -155,12 +152,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
}
if (store && !msg->isContentLoaded()) {
@@ -173,12 +168,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
if (msg->isPersistent ()) {
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
@@ -362,10 +355,8 @@ void Queue::consume(Consumer& c, bool requestExclusive){
}
consumerCount++;
- if (mgmtObject.get() != 0){
- Mutex::ScopedLock mutex(mgmtObject->getLock());
+ if (mgmtObject.get() != 0)
mgmtObject->inc_consumerCount ();
- }
}
void Queue::cancel(Consumer& c){
@@ -373,10 +364,8 @@ void Queue::cancel(Consumer& c){
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
if(exclusive) exclusive = 0;
- if (mgmtObject.get() != 0){
- Mutex::ScopedLock mutex(mgmtObject->getLock());
+ if (mgmtObject.get() != 0)
mgmtObject->dec_consumerCount ();
- }
}
QueuedMessage Queue::dequeue(){
@@ -413,10 +402,8 @@ void Queue::pop(){
if (policy.get()) policy->dequeued(msg.payload->contentSize());
if (mgmtObject.get() != 0){
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
- mgmtObject->dec_msgDepth ();
if (msg.payload->isPersistent ()){
mgmtObject->inc_msgPersistDequeues ();
mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index d7089424a5..95145e5d0e 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -64,7 +64,7 @@ SessionState::SessionState(
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
if (agent.get () != 0) {
mgmtObject = management::Session::shared_ptr
- (new management::Session (this, parent, getId().getName()));
+ (new management::Session (agent.get(), this, parent, getId().getName()));
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
agent->addObject (mgmtObject);
diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp
index da886710ac..107942fab5 100644
--- a/qpid/cpp/src/qpid/broker/System.cpp
+++ b/qpid/cpp/src/qpid/broker/System.cpp
@@ -63,7 +63,7 @@ System::System (string _dataDir)
}
mgmtObject = management::System::shared_ptr
- (new management::System (this, systemId));
+ (new management::System (agent.get(), this, systemId));
struct utsname _uname;
if (uname (&_uname) == 0)
{
diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp
index a809679d57..cfe497c788 100644
--- a/qpid/cpp/src/qpid/broker/Vhost.cpp
+++ b/qpid/cpp/src/qpid/broker/Vhost.cpp
@@ -32,7 +32,7 @@ Vhost::Vhost (management::Manageable* parentBroker)
if (agent.get () != 0)
{
mgmtObject = management::Vhost::shared_ptr
- (new management::Vhost (this, parentBroker, "/"));
+ (new management::Vhost (agent.get(), this, parentBroker, "/"));
agent->addObject (mgmtObject, 3, 1);
}
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
deleted file mode 100644
index e69de29bb2..0000000000
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ /dev/null
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index c38e273c49..c8a1b37823 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -37,6 +37,8 @@ class ManagementAgent
static shared_ptr getAgent (void);
+ virtual int getMaxThreads() = 0;
+
virtual void RegisterClass (std::string packageName,
std::string className,
uint8_t* md5Sum,
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
index 24d18875b6..271a2ec73c 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
@@ -47,8 +47,8 @@ ManagementBroker::RemoteAgent::~RemoteAgent ()
mgmtObject->resourceDestroy ();
}
-ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker) :
- dataDir (_dataDir), interval (_interval), broker (_broker)
+ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) :
+ threadPoolSize(_threads), dataDir(_dataDir), interval(_interval), broker(_broker)
{
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
localBank = 5;
@@ -105,11 +105,11 @@ void ManagementBroker::writeData ()
}
}
-void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker)
+void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize)
{
enabled = 1;
if (agent.get () == 0)
- agent = shared_ptr (new ManagementBroker (dataDir, interval, broker));
+ agent = shared_ptr (new ManagementBroker (dataDir, interval, broker, threadPoolSize));
}
ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
@@ -634,7 +634,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
agent->mgmtObject = management::Agent::shared_ptr
- (new management::Agent (agent));
+ (new management::Agent (this, agent));
agent->mgmtObject->set_sessionId (sessionId);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h
index 7548773960..18d30096e5 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.h
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.h
@@ -41,19 +41,21 @@ class ManagementBroker : public ManagementAgent
{
private:
- ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker);
+ ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize);
+ int threadPoolSize;
public:
virtual ~ManagementBroker ();
- static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker);
+ static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize);
static shared_ptr getAgent (void);
static void shutdown (void);
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (broker::Exchange::shared_ptr mgmtExchange,
broker::Exchange::shared_ptr directExchange);
+ int getMaxThreads () { return threadPoolSize; }
void RegisterClass (std::string packageName,
std::string className,
uint8_t* md5Sum,
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index 68d7e5c886..2528ed4284 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -21,12 +21,15 @@
#include "Manageable.h"
#include "ManagementObject.h"
+#include "ManagementAgent.h"
#include "qpid/framing/FieldTable.h"
using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::sys;
+int ManagementObject::nextThreadIndex = 0;
+
void ManagementObject::writeTimestamps (Buffer& buf)
{
buf.putShortString (getPackageName ());
@@ -40,3 +43,14 @@ void ManagementObject::writeTimestamps (Buffer& buf)
void ManagementObject::setReference(uint64_t) {}
+int ManagementObject::getThreadIndex() {
+ static __thread int thisIndex = -1;
+ if (thisIndex == -1) {
+ sys::Mutex::ScopedLock mutex(accessLock);
+ thisIndex = nextThreadIndex;
+ if (nextThreadIndex < agent->getMaxThreads() - 1)
+ nextThreadIndex++;
+ }
+ return thisIndex;
+}
+
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h
index cf2da13b09..732dd14a24 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.h
+++ b/qpid/cpp/src/qpid/management/ManagementObject.h
@@ -32,19 +32,22 @@ namespace qpid {
namespace management {
class Manageable;
+class ManagementAgent;
class ManagementObject
{
protected:
- uint64_t createTime;
- uint64_t destroyTime;
- uint64_t objectId;
- bool configChanged;
- bool instChanged;
- bool deleted;
- Manageable* coreObject;
- sys::Mutex accessLock;
+ uint64_t createTime;
+ uint64_t destroyTime;
+ uint64_t objectId;
+ bool configChanged;
+ bool instChanged;
+ bool deleted;
+ Manageable* coreObject;
+ sys::Mutex accessLock;
+ ManagementAgent* agent;
+ int maxThreads;
static const uint8_t TYPE_U8 = 1;
static const uint8_t TYPE_U16 = 2;
@@ -73,15 +76,18 @@ class ManagementObject
static const uint8_t FLAG_INDEX = 0x02;
static const uint8_t FLAG_END = 0x80;
+ static int nextThreadIndex;
+
+ int getThreadIndex();
void writeTimestamps (qpid::framing::Buffer& buf);
public:
typedef boost::shared_ptr<ManagementObject> shared_ptr;
typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
- ManagementObject (Manageable* _core) :
+ ManagementObject (ManagementAgent* _agent, Manageable* _core) :
destroyTime(0), objectId (0), configChanged(true),
- instChanged(true), deleted(false), coreObject(_core)
+ instChanged(true), deleted(false), coreObject(_core), agent(_agent)
{ createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
virtual ~ManagementObject () {}
@@ -102,8 +108,7 @@ class ManagementObject
uint64_t getObjectId (void) { return objectId; }
inline bool getConfigChanged (void) { return configChanged; }
virtual bool getInstChanged (void) { return instChanged; }
- inline void setAllChanged (void)
- {
+ inline void setAllChanged (void) {
configChanged = true;
instChanged = true;
}