summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2008-04-04 18:14:42 +0000
committerKim van der Riet <kpvdr@apache.org>2008-04-04 18:14:42 +0000
commita2ea9d432dc5713dadd4c710a982cc466de3ea8b (patch)
treed7c955359c88b80b24c4f70146309a806511adda /cpp/src
parent2193d76646028d97b7bfff69335d4239954adbe5 (diff)
downloadqpid-python-a2ea9d432dc5713dadd4c710a982cc466de3ea8b.tar.gz
Patch from Ted Ross (see QPID-902): This patch contains the following improvements for management:\n1) Schema display cleaned up in the python mgmt-cli\n2) Locking added automatically to management object accessors (manual locking removed from broker/Queue.cpp)\n3) Schemas are now pre-registered with the management agent using a package initializer. This allows management consoles to get schema information for a class even if no instances of the class exist.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@644806 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp32
-rw-r--r--cpp/src/qpid/broker/Queue.cpp7
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp31
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h8
-rw-r--r--cpp/src/qpid/management/ManagementObject.h3
5 files changed, 43 insertions, 38 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index a183ce9d02..03036fb825 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -29,6 +29,7 @@
#include "NullMessageStore.h"
#include "RecoveryManagerImpl.h"
#include "TopicExchange.h"
+#include "qpid/management/PackageQpid.h"
#include "qpid/management/ManagementExchange.h"
#include "qpid/management/ArgsBrokerEcho.h"
@@ -112,25 +113,13 @@ Broker::Broker(const Broker::Options& conf) :
sessionManager(conf.ack),
previewSessionManager(conf.ack)
{
- // Early-Initialize plugins
- const Plugin::Plugins& plugins=Plugin::getPlugins();
- for (Plugin::Plugins::const_iterator i = plugins.begin();
- i != plugins.end();
- i++)
- (*i)->earlyInitialize(*this);
-
- // If no plugin store module registered itself, set up the null store.
- if (store == 0)
- setStore (new NullMessageStore (false));
-
- queues.setStore (store);
- dtxManager.setStore (store);
-
if(conf.enableMgmt){
+ QPID_LOG(info, "Management enabled");
ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
conf.mgmtPubInterval);
managementAgent = ManagementAgent::getAgent ();
managementAgent->setInterval (conf.mgmtPubInterval);
+ qpid::management::PackageQpid packageInitializer (managementAgent);
System* system = new System ();
systemObject = System::shared_ptr (system);
@@ -157,6 +146,20 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.setParent (vhost);
}
+ // Early-Initialize plugins
+ const Plugin::Plugins& plugins=Plugin::getPlugins();
+ for (Plugin::Plugins::const_iterator i = plugins.begin();
+ i != plugins.end();
+ i++)
+ (*i)->earlyInitialize(*this);
+
+ // If no plugin store module registered itself, set up the null store.
+ if (store == 0)
+ setStore (new NullMessageStore (false));
+
+ queues.setStore (store);
+ dtxManager.setStore (store);
+
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
if (store != 0) {
@@ -172,7 +175,6 @@ Broker::Broker(const Broker::Options& conf) :
declareStandardExchange(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
- QPID_LOG(info, "Management enabled");
exchanges.declare(qpid_management, ManagementExchange::typeName);
Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index a405971805..24ed6825b4 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -111,7 +111,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete();
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -119,7 +118,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
}
}else {
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -138,7 +136,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
@@ -157,7 +154,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
@@ -348,7 +344,6 @@ void Queue::consume(Consumer&, bool requestExclusive){
consumerCount++;
if (mgmtObject.get() != 0){
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_consumers ();
}
}
@@ -359,7 +354,6 @@ void Queue::cancel(Consumer& c){
consumerCount--;
if(exclusive) exclusive = false;
if (mgmtObject.get() != 0){
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->dec_consumers ();
}
}
@@ -390,7 +384,6 @@ void Queue::pop(){
if (policy.get()) policy->dequeued(msg.payload->contentSize());
if (mgmtObject.get() != 0){
- Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
mgmtObject->dec_msgDepth ();
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index a5ed84fb32..d6a91ab2c1 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -114,6 +114,16 @@ void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
dExchange = _dexchange;
}
+void ManagementAgent::RegisterClass (string packageName,
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ RWlock::ScopedWlock writeLock (userLock);
+ PackageMap::iterator pIter = FindOrAddPackage (packageName);
+ AddClassLocal (pIter, className, md5Sum, schemaCall);
+}
+
void ManagementAgent::addObject (ManagementObject::shared_ptr object,
uint64_t /*persistenceId*/,
uint64_t /*idOffset*/)
@@ -128,15 +138,6 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object,
object->setObjectId (objectId);
managementObjects[objectId] = object;
-
- // If we've already seen instances of this object type, we're done.
- if (!object->firstInstance ())
- return;
-
- // This is the first object of this type that we've seen, update the schema
- // inventory.
- PackageMap::iterator pIter = FindOrAddPackage (object->getPackageName ());
- AddClassLocal (pIter, object);
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
@@ -623,14 +624,16 @@ ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::st
return result.first;
}
-void ManagementAgent::AddClassLocal (PackageMap::iterator pIter,
- ManagementObject::shared_ptr object)
+void ManagementAgent::AddClassLocal (PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
- key.name = object->getClassName ();
- memcpy (&key.hash, object->getMd5Sum (), 16);
+ key.name = className;
+ memcpy (&key.hash, md5Sum, 16);
ClassMap::iterator cIter = cMap.find (key);
if (cIter != cMap.end ())
@@ -641,7 +644,7 @@ void ManagementAgent::AddClassLocal (PackageMap::iterator pIter,
key.name);
SchemaClass classInfo;
- classInfo.writeSchemaCall = object->getWriteSchemaCall ();
+ classInfo.writeSchemaCall = schemaCall;
cMap[key] = classInfo;
// TODO: Publish a class-indication message
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index f2cd0373c0..ac665eb50d 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -53,6 +53,10 @@ class ManagementAgent
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (broker::Exchange::shared_ptr mgmtExchange,
broker::Exchange::shared_ptr directExchange);
+ void RegisterClass (std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
void addObject (ManagementObject::shared_ptr object,
uint64_t persistenceId = 0,
uint64_t idOffset = 10);
@@ -160,7 +164,9 @@ class ManagementAgent
PackageMap::iterator FindOrAddPackage (std::string name);
void AddClassLocal (PackageMap::iterator pIter,
- ManagementObject::shared_ptr object);
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
void EncodePackageIndication (qpid::framing::Buffer& buf,
PackageMap::iterator pIter);
void EncodeClassIndication (qpid::framing::Buffer& buf,
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 23042ad988..ca8a0fd558 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -24,6 +24,7 @@
#include "Manageable.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Mutex.h"
#include <qpid/framing/Buffer.h>
#include <boost/shared_ptr.hpp>
#include <map>
@@ -44,6 +45,7 @@ class ManagementObject
bool instChanged;
bool deleted;
Manageable* coreObject;
+ sys::RWlock accessLock;
static const uint8_t TYPE_U8 = 1;
static const uint8_t TYPE_U16 = 2;
@@ -84,7 +86,6 @@ class ManagementObject
virtual ~ManagementObject () {}
virtual writeSchemaCall_t getWriteSchemaCall (void) = 0;
- virtual bool firstInstance (void) = 0;
virtual void writeConfig (qpid::framing::Buffer& buf) = 0;
virtual void writeInstrumentation (qpid::framing::Buffer& buf,
bool skipHeaders = false) = 0;