diff options
author | Ted Ross <tross@apache.org> | 2009-05-11 14:16:52 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-05-11 14:16:52 +0000 |
commit | 690e5d617c9aad8550905cf30a8380778c0f9e7a (patch) | |
tree | 855a30b9bee8760b083813a6166c8d1e981ad738 /qpid | |
parent | 44e07189ca7b56b4f051035fddfffc948beeb162 (diff) | |
download | qpid-python-690e5d617c9aad8550905cf30a8380778c0f9e7a.tar.gz |
QPID-1843 - Cleaned up the interface to the broker's internal management agent.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@773570 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
47 files changed, 304 insertions, 288 deletions
diff --git a/qpid/cpp/managementgen/qmf-gen b/qpid/cpp/managementgen/qmf-gen index c6cfca5f83..ebc07137ae 100755 --- a/qpid/cpp/managementgen/qmf-gen +++ b/qpid/cpp/managementgen/qmf-gen @@ -45,6 +45,8 @@ parser.add_option("-p", "--gen-prefix", dest="genprefix", default="", help="Prefix for generated files in make dependencies") parser.add_option("-q", "--qpid-broker", dest="qpidbroker", default=False, action="store_true", help="Generate makefile for Qpid broker") +parser.add_option("-b", "--broker-plugin", dest="brokerplugin", default=False, action="store_true", + help="Generate code for use in a qpid broker plugin") (opts, args) = parser.parse_args() @@ -57,17 +59,23 @@ if len(args) == 0: print "no input files" parser.exit() +vargs = {} +if opts.brokerplugin: + vargs["agentHeaderDir"] = "management" +else: + vargs["agentHeaderDir"] = "agent" + for schemafile in args: package = SchemaPackage(typefile, schemafile, opts) gen.setPackage (package.packageName) - gen.makeClassFiles ("Class.h", package) - gen.makeClassFiles ("Class.cpp", package) - gen.makeMethodFiles ("Args.h", package) - gen.makeEventFiles ("Event.h", package) - gen.makeEventFiles ("Event.cpp", package) - gen.makePackageFile ("Package.h", package) - gen.makePackageFile ("Package.cpp", package) + gen.makeClassFiles ("Class.h", package, vars=vargs) + gen.makeClassFiles ("Class.cpp", package, vars=vargs) + gen.makeMethodFiles ("Args.h", package, vars=vargs) + gen.makeEventFiles ("Event.h", package, vars=vargs) + gen.makeEventFiles ("Event.cpp", package, vars=vargs) + gen.makePackageFile ("Package.h", package, vars=vargs) + gen.makePackageFile ("Package.cpp", package, vars=vargs) if opts.makefile != None: args = {} diff --git a/qpid/cpp/managementgen/qmfgen/generate.py b/qpid/cpp/managementgen/qmfgen/generate.py index 255d41ea0e..7173c2faa1 100755 --- a/qpid/cpp/managementgen/qmfgen/generate.py +++ b/qpid/cpp/managementgen/qmfgen/generate.py @@ -388,30 +388,39 @@ class Generator: def setVariable (self, key, value): self.variables[key] = value - def makeClassFiles (self, templateFile, schema, force=False): + def makeClassFiles (self, templateFile, schema, force=False, vars=None): """ Generate an expanded template per schema class """ classes = schema.getClasses () template = Template (self.input + templateFile, self) + if vars: + for arg in vars: + self.setVariable(arg, vars[arg]) self.templateFiles.append (templateFile) for _class in classes: target = self.targetClassFile (_class, templateFile) stream = template.expand (_class) self.writeIfChanged (stream, target, force) - def makeEventFiles (self, templateFile, schema, force=False): + def makeEventFiles (self, templateFile, schema, force=False, vars=None): """ Generate an expanded template per schema event """ events = schema.getEvents() template = Template (self.input + templateFile, self) + if vars: + for arg in vars: + self.setVariable(arg, vars[arg]) self.templateFiles.append (templateFile) for event in events: target = self.targetEventFile(event, templateFile) stream = template.expand(event) self.writeIfChanged(stream, target, force) - def makeMethodFiles (self, templateFile, schema, force=False): + def makeMethodFiles (self, templateFile, schema, force=False, vars=None): """ Generate an expanded template per method-with-arguments """ classes = schema.getClasses () template = Template (self.input + templateFile, self) + if vars: + for arg in vars: + self.setVariable(arg, vars[arg]) self.templateFiles.append (templateFile) for _class in classes: methods = _class.getMethods () @@ -421,9 +430,12 @@ class Generator: stream = template.expand (method) self.writeIfChanged (stream, target, force) - def makePackageFile (self, templateFile, schema, force=False): + def makePackageFile (self, templateFile, schema, force=False, vars=None): """ Generate a package-specific file """ template = Template (self.input + templateFile, self) + if vars: + for arg in vars: + self.setVariable(arg, vars[arg]) self.templateFiles.append (templateFile) target = self.targetPackageFile (schema, templateFile) stream = template.expand (schema) diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py index 69823d6de0..3b53830c69 100755 --- a/qpid/cpp/managementgen/qmfgen/schema.py +++ b/qpid/cpp/managementgen/qmfgen/schema.py @@ -754,6 +754,9 @@ class SchemaEvent: def getFullName (self): return capitalize(self.package + capitalize(self.name)) + def genAgentHeaderLocation (self, stream, variables): + stream.write(variables["agentHeaderDir"]) + def getArgCount (self): return len (self.args) @@ -954,6 +957,9 @@ class SchemaClass: if inst.assign == None: inst.genAccessor (stream) + def genAgentHeaderLocation (self, stream, variables): + stream.write(variables["agentHeaderDir"]) + def genCloseNamespaces (self, stream, variables): for item in self.packageName.split("."): stream.write ("}") @@ -1258,6 +1264,9 @@ class SchemaPackage: def getEvents(self): return self.events + def genAgentHeaderLocation (self, stream, variables): + stream.write(variables["agentHeaderDir"]) + def genCloseNamespaces (self, stream, variables): for item in self.packageName.split("."): stream.write ("}") diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp index 247e1090ff..973d92586a 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp @@ -23,7 +23,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid//*MGEN:Class.AgentHeaderLocation*//ManagementAgent.h" #include "/*MGEN:Class.NameCap*/.h" /*MGEN:Class.MethodArgIncludes*/ @@ -40,8 +40,8 @@ string /*MGEN:Class.NameCap*/::className = string ("/*MGEN:Class.NameLower*/ uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] = {/*MGEN:Class.SchemaMD5*/}; -/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (ManagementAgent* _agent, Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) : - ManagementObject(_agent, _core)/*MGEN:Class.ConstructorInits*/ +/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (ManagementAgent*, Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) : + ManagementObject(_core)/*MGEN:Class.ConstructorInits*/ { /*MGEN:Class.ParentRefAssignment*/ /*MGEN:Class.InitializeElements*/ @@ -51,7 +51,6 @@ uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] = presenceMask[idx] = 0; /*MGEN:ENDIF*/ /*MGEN:IF(Class.ExistPerThreadStats)*/ - maxThreads = agent->getMaxThreads(); perThreadStatsArray = new struct PerThreadStats*[maxThreads]; for (int idx = 0; idx < maxThreads; idx++) perThreadStatsArray[idx] = 0; diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h index 0bf9911895..225090f0a9 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h @@ -27,6 +27,12 @@ #include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" +namespace qpid { + namespace management { + class ManagementAgent; + } +} + namespace qmf { /*MGEN:Class.OpenNamespaces*/ diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp index cdb40c6d79..2ffec8bcdf 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp +++ b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp @@ -23,7 +23,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid//*MGEN:Event.AgentHeaderLocation*//ManagementAgent.h" #include "Event/*MGEN:Event.NameCap*/.h" using namespace qmf::/*MGEN:Event.Namespace*/; diff --git a/qpid/cpp/managementgen/qmfgen/templates/Package.h b/qpid/cpp/managementgen/qmfgen/templates/Package.h index 0ad7060b9e..569c7cfb33 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Package.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Package.h @@ -23,7 +23,7 @@ /*MGEN:Root.Disclaimer*/ -#include "qpid/agent/ManagementAgent.h" +#include "qpid//*MGEN:Class.AgentHeaderLocation*//ManagementAgent.h" namespace qmf { /*MGEN:Class.OpenNamespaces*/ diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 4d2d375802..63ca7009d9 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -87,7 +87,7 @@ $(rgen_generator): # Management generator. mgen_dir=$(top_srcdir)/managementgen -mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk -q -o gen/qmf \ +mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk -q -b -o gen/qmf \ $(top_srcdir)/../specs/management-schema.xml \ $(srcdir)/qpid/acl/management-schema.xml \ $(srcdir)/qpid/cluster/management-schema.xml @@ -427,7 +427,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/TxBuffer.cpp \ qpid/broker/TxPublish.cpp \ qpid/broker/Vhost.cpp \ - qpid/management/ManagementBroker.cpp \ + qpid/management/ManagementAgent.cpp \ qpid/management/ManagementExchange.cpp \ qpid/sys/TCPIOPlugin.cpp @@ -679,7 +679,7 @@ nobase_include_HEADERS = \ qpid/management/Args.h \ qpid/management/IdAllocator.h \ qpid/management/Manageable.h \ - qpid/management/ManagementBroker.h \ + qpid/management/ManagementAgent.h \ qpid/management/ManagementEvent.h \ qpid/management/ManagementExchange.h \ qpid/management/ManagementObject.h \ diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp index 8c128e7bb9..fe2644c136 100644 --- a/qpid/cpp/src/qpid/acl/Acl.cpp +++ b/qpid/cpp/src/qpid/acl/Acl.cpp @@ -46,7 +46,7 @@ namespace _qmf = qmf::org::apache::qpid::acl; Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false) { - agent = ManagementAgent::Singleton::getInstance(); + agent = broker->getManagementAgent(); if (agent != 0){ _qmf::Package packageInit(agent); diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h index 7770843e87..e153187b3d 100644 --- a/qpid/cpp/src/qpid/acl/Acl.h +++ b/qpid/cpp/src/qpid/acl/Acl.h @@ -26,7 +26,7 @@ #include "qpid/RefCounted.h" #include "qpid/broker/AclModule.h" #include "qpid/management/Manageable.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/acl/Acl.h" #include <map> diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 4d275b958f..e629a20e87 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -24,7 +24,7 @@ #include "LinkRegistry.h" #include "SessionState.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" @@ -64,7 +64,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, std::stringstream title; title << id << "_" << link->getBroker()->getFederationTag(); queueName += title.str(); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Bridge (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, @@ -181,7 +181,7 @@ void Bridge::destroy() void Bridge::setPersistenceId(uint64_t pId) const { if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = link->getBroker()->getManagementAgent(); agent->addObject (mgmtObject, pId); } persistenceId = pId; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index c43eca6e5b..749489fbfd 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -65,7 +65,7 @@ using qpid::sys::Dispatcher; using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; -using qpid::management::ManagementBroker; +using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -136,10 +136,11 @@ const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), config(conf), - managementAgentSingleton(!config.enableMgmt), store(0), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), + queues(this), + exchanges(this), links(this), factory(new SecureConnectionFactory(*this)), dtxManager(timer), @@ -148,6 +149,7 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), + managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), queueCleaner(queues, timer), queueEvents(poller), recovery(true), @@ -156,13 +158,11 @@ Broker::Broker(const Broker::Options& conf) : { if (conf.enableMgmt) { QPID_LOG(info, "Management enabled"); - managementAgent = managementAgentSingleton.getInstance(); - ((ManagementBroker*) managementAgent)->configure - (dataDir.isEnabled() ? dataDir.getPath() : string(), - conf.mgmtPubInterval, this, conf.workerThreads + 3); + managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), + conf.mgmtPubInterval, this, conf.workerThreads + 3); _qmf::Package packageInitializer(managementAgent); - System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string()); + System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); systemObject = System::shared_ptr(system); mgmtObject = new _qmf::Broker(managementAgent, this, system, conf.port); @@ -182,9 +182,9 @@ Broker::Broker(const Broker::Options& conf) : // Since there is currently no support for virtual hosts, a placeholder object // representing the implied single virtual host is added here to keep the // management schema correct. - Vhost* vhost = new Vhost(this); + Vhost* vhost = new Vhost(this, this); vhostObject = Vhost::shared_ptr(vhost); - framing::Uuid uuid(((ManagementBroker*) managementAgent)->getUuid()); + framing::Uuid uuid(managementAgent->getUuid()); federationTag = uuid.str(); vhostObject->setFederationTag(federationTag); @@ -238,9 +238,8 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); - ((ManagementBroker*) managementAgent)->setExchange (mExchange, dExchange); - boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent - ((ManagementBroker*) managementAgent); + managementAgent->setExchange(mExchange, dExchange); + boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent(managementAgent); } else QPID_LOG(info, "Management not enabled"); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 5a1529a3ba..8f4621bb39 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -39,7 +39,7 @@ #include "Timer.h" #include "ExpiryPolicy.h" #include "qpid/management/Manageable.h" -#include "qpid/management/ManagementBroker.h" +#include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Broker.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h" #include "qpid/Options.h" @@ -120,7 +120,6 @@ public: boost::shared_ptr<sys::Poller> poller; Options config; - management::ManagementAgent::Singleton managementAgentSingleton; ProtocolFactoryMap protocolFactories; std::auto_ptr<MessageStore> store; AclModule* acl; @@ -235,6 +234,8 @@ public: void setRecovery(bool set) { recovery = set; } bool getRecovery() const { return recovery; } + + management::ManagementAgent* getManagementAgent() { return managementAgent; } }; }} diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 365b3ccbeb..22188054a6 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -67,7 +67,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (parent != 0) { - agent = ManagementAgent::Singleton::getInstance(); + agent = broker_.getManagementAgent(); // TODO set last bool true if system connection diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index e67cdce681..770bf2184f 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -40,7 +40,7 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/ProtocolVersion.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/management/Manageable.h" #include "qpid/ptr_map.h" #include "qpid/sys/AggregateOutput.h" diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index d1d9ad07e4..deb9699c96 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -41,15 +41,15 @@ const std::string fedOpReorigin("R"); const std::string fedOpHello("H"); } -DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +DirectExchange::DirectExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type(typeName); } DirectExchange::DirectExchange(const string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) + const FieldTable& _args, Manageable* _parent, Broker* b) : + Exchange(_name, _durable, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type(typeName); diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.h b/qpid/cpp/src/qpid/broker/DirectExchange.h index 27d101c4fe..9081c319c0 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.h +++ b/qpid/cpp/src/qpid/broker/DirectExchange.h @@ -46,11 +46,11 @@ public: static const std::string typeName; QPID_BROKER_EXTERN DirectExchange(const std::string& name, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN DirectExchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index dd1fe98b2c..acedd1f91a 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -21,8 +21,8 @@ #include "Exchange.h" #include "ExchangeRegistry.h" -#include "qpid/agent/ManagementAgent.h" -#include "qpid/management/ManagementBroker.h" +#include "Broker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" #include "DeliverableMessage.h" @@ -33,7 +33,6 @@ using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::sys::Mutex; using qpid::management::ManagementAgent; -using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -83,13 +82,13 @@ void Exchange::routeIVE(){ } -Exchange::Exchange (const string& _name, Manageable* parent) : - name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0) +Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : + name(_name), durable(false), persistenceId(0), sequence(false), + sequenceNo(0), ive(false), mgmtExchange(0), broker(b) { - if (parent != 0) + if (parent != 0 && broker != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable); @@ -101,13 +100,13 @@ Exchange::Exchange (const string& _name, Manageable* parent) : static const std::string QPID_MANAGEMENT("qpid.management"); Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - Manageable* parent) + Manageable* parent, Broker* b) : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), - args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) + args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b) { - if (parent != 0) + if (parent != 0 && broker != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable); @@ -118,8 +117,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel } else if (name == QPID_MANAGEMENT) { agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID } else { - ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); - agent->addObject (mgmtExchange, mb ? mb->allocateId(this) : 0); + agent->addObject (mgmtExchange, agent->allocateId(this)); } } } @@ -145,7 +143,7 @@ void Exchange::setPersistenceId(uint64_t id) const { if (mgmtExchange != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); agent->addObject (mgmtExchange, 0x2000000000000000LL + id); } persistenceId = id; @@ -240,20 +238,22 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang { if (parent != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); - if (agent != 0) - { - ManagementObject* mo = queue->GetManagementObject(); - if (mo != 0) - { - management::ObjectId queueId = mo->getObjectId(); - mgmtBinding = new _qmf::Binding - (agent, this, (Manageable*) parent, queueId, key, args); - if (!origin.empty()) - mgmtBinding->set_origin(origin); - ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); - agent->addObject (mgmtBinding, mb ? mb->allocateId(this) : 0); - } + Broker* broker = parent->getBroker(); + if (broker != 0) { + ManagementAgent* agent = broker->getManagementAgent(); + if (agent != 0) + { + ManagementObject* mo = queue->GetManagementObject(); + if (mo != 0) + { + management::ObjectId queueId = mo->getObjectId(); + mgmtBinding = new _qmf::Binding + (agent, this, (Manageable*) parent, queueId, key, args); + if (!origin.empty()) + mgmtBinding->set_origin(origin); + agent->addObject (mgmtBinding, agent->allocateId(this)); + } + } } } } diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 47c0bdb3af..e33c0c6bbc 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -121,9 +121,10 @@ protected: public: typedef boost::shared_ptr<Exchange> shared_ptr; - QPID_BROKER_EXTERN explicit Exchange(const std::string& name, management::Manageable* parent = 0); + QPID_BROKER_EXTERN explicit Exchange(const std::string& name, management::Manageable* parent = 0, + Broker* broker = 0); QPID_BROKER_EXTERN Exchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN virtual ~Exchange(); const std::string& getName() const { return name; } @@ -167,10 +168,12 @@ public: void registerDynamicBridge(DynamicBridge* db); void removeDynamicBridge(DynamicBridge* db); virtual bool supportsDynamicBinding() { return false; } + Broker* getBroker() const { return broker; } protected: qpid::sys::Mutex bridgeLock; std::vector<DynamicBridge*> bridgeVector; + Broker* broker; QPID_BROKER_EXTERN virtual void handleHelloRequest(); void propagateFedOp(const std::string& routingKey, const std::string& tags, diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index bb0eec34ba..85bd65e456 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -45,15 +45,15 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c Exchange::shared_ptr exchange; if(type == TopicExchange::typeName){ - exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent)); + exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker)); }else if(type == DirectExchange::typeName){ - exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent)); + exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker)); }else if(type == FanOutExchange::typeName){ - exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent)); + exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker)); }else if (type == HeadersExchange::typeName) { - exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent)); + exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker)); }else if (type == ManagementExchange::typeName) { - exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent)); + exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent, broker)); } else{ FunctionMap::iterator i = factory.find(type); diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h index 9edd54f025..34ee173a91 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h @@ -45,7 +45,7 @@ class ExchangeRegistry{ typedef boost::function4<Exchange::shared_ptr, const std::string&, bool, const qpid::framing::FieldTable&, qpid::management::Manageable*> FactoryFunction; - ExchangeRegistry () : parent(0) {} + ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {} QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare (const std::string& name, const std::string& type); QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare @@ -84,7 +84,7 @@ class ExchangeRegistry{ FunctionMap factory; mutable qpid::sys::RWlock lock; management::Manageable* parent; - + Broker* broker; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index aa1f7ff30a..dc3bda4262 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -38,16 +38,16 @@ const std::string fedOpReorigin("R"); const std::string fedOpHello("H"); } -FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) : - Exchange(_name, _parent) +FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent, Broker* b) : + Exchange(_name, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) + const FieldTable& _args, Manageable* _parent, Broker* b) : + Exchange(_name, _durable, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.h b/qpid/cpp/src/qpid/broker/FanOutExchange.h index edfc4395f4..32da9fe5b5 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.h +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.h @@ -40,11 +40,11 @@ class FanOutExchange : public virtual Exchange { static const std::string typeName; QPID_BROKER_EXTERN FanOutExchange(const std::string& name, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN FanOutExchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 09fb2d9bef..4b1176d560 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -43,16 +43,16 @@ namespace { const std::string empty; } -HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) : - Exchange(_name, _parent) +HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broker* b) : + Exchange(_name, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) + const FieldTable& _args, Manageable* _parent, Broker* b) : + Exchange(_name, _durable, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h index 2b01f9ecae..87633c0f0e 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.h +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h @@ -61,11 +61,11 @@ class HeadersExchange : public virtual Exchange { static const std::string typeName; QPID_BROKER_EXTERN HeadersExchange(const string& name, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN HeadersExchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index dd1a1fa0b4..a2717bfd4c 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -68,9 +68,9 @@ Link::Link(LinkRegistry* _links, connection(0), agent(0) { - if (parent != 0) + if (parent != 0 && broker != 0) { - agent = ManagementAgent::Singleton::getInstance(); + agent = broker->getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable); @@ -347,7 +347,7 @@ void Link::notifyConnectionForced(const string text) void Link::setPersistenceId(uint64_t id) const { if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); agent->addObject(mgmtObject, id); } persistenceId = id; diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 39014b0ec0..0b504651eb 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -30,7 +30,7 @@ #include "qpid/sys/Mutex.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Link.h" #include <boost/ptr_container/ptr_vector.hpp> diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index aa0cd8ca31..6930275361 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -30,7 +30,7 @@ #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" -#include "qpid/management/ManagementBroker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" @@ -48,7 +48,6 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; using qpid::management::ManagementAgent; -using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -80,7 +79,8 @@ const int ENQUEUE_AND_DEQUEUE=2; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, - Manageable* parent) : + Manageable* parent, + Broker* b) : name(_name), autodelete(_autodelete), @@ -98,11 +98,12 @@ Queue::Queue(const string& _name, bool _autodelete, mgmtObject(0), eventMode(0), eventMgr(0), - insertSeqNo(0) + insertSeqNo(0), + broker(b) { - if (parent != 0) + if (parent != 0 && broker != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { @@ -111,8 +112,7 @@ Queue::Queue(const string& _name, bool _autodelete, // Add the object to the management agent only if this queue is not durable. // If it's durable, we will add it later when the queue is assigned a persistenceId. if (store == 0) { - ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); - agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); + agent->addObject (mgmtObject, agent->allocateId(this)); } } } @@ -838,7 +838,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const { if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); agent->addObject (mgmtObject, 0x3000000000000000LL + _persistenceId); if (externalQueueStore) { diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index c5ef9a9307..0d5f2043d1 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -105,6 +105,7 @@ namespace qpid { QueueEvents* eventMgr; bool insertSeqNo; std::string seqNoKey; + Broker* broker; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -158,7 +159,8 @@ namespace qpid { bool autodelete = false, MessageStore* const store = 0, const OwnershipToken* const owner = 0, - management::Manageable* parent = 0); + management::Manageable* parent = 0, + Broker* broker = 0); QPID_BROKER_EXTERN ~Queue(); QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr); diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index d079e543c4..60182e1ead 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -27,8 +27,8 @@ using namespace qpid::broker; using namespace qpid::sys; -QueueRegistry::QueueRegistry() : - counter(1), store(0), events(0), parent(0), lastNode(false) {} +QueueRegistry::QueueRegistry(Broker* b) : + counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {} QueueRegistry::~QueueRegistry(){} @@ -42,7 +42,7 @@ QueueRegistry::declare(const string& declareName, bool durable, QueueMap::iterator i = queues.find(name); if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent)); + Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); queues[name] = queue; if (lastNode) queue->setLastNodeFailure(); if (events) queue->setQueueEventManager(*events); diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index 3c02afedc4..a4ea65f18c 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -43,7 +43,7 @@ class QueueEvents; */ class QueueRegistry { public: - QPID_BROKER_EXTERN QueueRegistry(); + QPID_BROKER_EXTERN QueueRegistry(Broker* b = 0); QPID_BROKER_EXTERN ~QueueRegistry(); /** @@ -131,6 +131,7 @@ private: QueueEvents* events; management::Manageable* parent; bool lastNode; //used to set mode on queue declare + Broker* broker; //destroy impl that assumes lock is already held: void destroyLH (const string& name); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 96c47085f0..0ddd546a68 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -23,7 +23,7 @@ #include "qpid/framing/enum.h" #include "qpid/log/Statement.h" #include "qpid/framing/SequenceSet.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" @@ -98,7 +98,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const checkAlternate(response.first, alternate); } - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type, alternateExchange, durable, false, args, @@ -140,7 +140,7 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); getBroker().getExchanges().destroy(name); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name)); } @@ -181,7 +181,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, getBroker().getStore().bind(*exchange, *queue, routingKey, arguments); } - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments)); } @@ -214,7 +214,7 @@ void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, if (exchange->isDurable() && queue->isDurable()) getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable()); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey)); } @@ -372,7 +372,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } } - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), name, durable, exclusive, autoDelete, arguments, @@ -422,7 +422,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse getBroker().getQueues().destroy(queue); q->unbind(getBroker().getExchanges(), q); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue)); } @@ -484,7 +484,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, acceptMode == 0, acquireMode == 0, exclusive, resumeId, resumeTtl, arguments); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(), queueName, destination, exclusive, arguments)); @@ -495,7 +495,7 @@ SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) { state.cancel(destination); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination)); } diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 7e5f605753..26a35f4a4f 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -32,7 +32,7 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" -#include "qpid/management/ManagementBroker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/framing/AMQP_ClientProxy.h" #include <boost/bind.hpp> @@ -45,7 +45,6 @@ using namespace framing; using sys::Mutex; using boost::intrusive_ptr; using qpid::management::ManagementAgent; -using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -73,7 +72,7 @@ SessionState::SessionState( } Manageable* parent = broker.GetVhostObject (); if (parent != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Session (agent, this, parent, getId().getName()); @@ -81,8 +80,7 @@ SessionState::SessionState( mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate); - ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); - agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); + agent->addObject (mgmtObject, agent->allocateId(this)); } } attach(h); diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp index a11ad25bbe..86933109a1 100644 --- a/qpid/cpp/src/qpid/broker/System.cpp +++ b/qpid/cpp/src/qpid/broker/System.cpp @@ -18,7 +18,8 @@ // #include "System.h" -#include "qpid/agent/ManagementAgent.h" +#include "Broker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/SystemInfo.h" #include <iostream> @@ -29,9 +30,9 @@ using namespace qpid::broker; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; -System::System (string _dataDir) : mgmtObject(0) +System::System (string _dataDir, Broker* broker) : mgmtObject(0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker ? broker->getManagementAgent() : 0; if (agent != 0) { diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h index 42a816e095..0fc2c2bd88 100644 --- a/qpid/cpp/src/qpid/broker/System.h +++ b/qpid/cpp/src/qpid/broker/System.h @@ -28,6 +28,8 @@ namespace qpid { namespace broker { +class Broker; + class System : public management::Manageable { private: @@ -38,7 +40,7 @@ class System : public management::Manageable typedef boost::shared_ptr<System> shared_ptr; - System (std::string _dataDir); + System (std::string _dataDir, Broker* broker = 0); management::ManagementObject* GetManagementObject (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index d4f9721162..a465c35790 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -137,15 +137,15 @@ bool TopicPattern::match(const Tokens& target) const return do_match(begin(), end(), target.begin(), target.end()); } -TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } TopicExchange::TopicExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) + const FieldTable& _args, Manageable* _parent, Broker* b) : + Exchange(_name, _durable, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h index 24bf5f7bca..b3ee1ea66d 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.h +++ b/qpid/cpp/src/qpid/broker/TopicExchange.h @@ -86,11 +86,11 @@ class TopicExchange : public virtual Exchange { static const std::string typeName; QPID_BROKER_EXTERN TopicExchange(const string& name, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN TopicExchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp index c5bb6c5104..aa7683d318 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.cpp +++ b/qpid/cpp/src/qpid/broker/Vhost.cpp @@ -18,7 +18,8 @@ // #include "Vhost.h" -#include "qpid/agent/ManagementAgent.h" +#include "Broker.h" +#include "qpid/management/ManagementAgent.h" using namespace qpid::broker; using qpid::management::ManagementAgent; @@ -28,11 +29,11 @@ namespace qpid { namespace management { class Manageable; }} -Vhost::Vhost (qpid::management::Manageable* parentBroker) : mgmtObject(0) +Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmtObject(0) { - if (parentBroker != 0) + if (parentBroker != 0 && broker != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { diff --git a/qpid/cpp/src/qpid/broker/Vhost.h b/qpid/cpp/src/qpid/broker/Vhost.h index ef59362e4d..9554d641c2 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.h +++ b/qpid/cpp/src/qpid/broker/Vhost.h @@ -27,6 +27,7 @@ namespace qpid { namespace broker { +class Broker; class Vhost : public management::Manageable { private: @@ -37,7 +38,7 @@ class Vhost : public management::Manageable typedef boost::shared_ptr<Vhost> shared_ptr; - Vhost (management::Manageable* parentBroker); + Vhost (management::Manageable* parentBroker, Broker* broker = 0); management::ManagementObject* GetManagementObject (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 677bd2b722..1f39fe9ae9 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -44,7 +44,7 @@ #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" #include "qpid/management/IdAllocator.h" -#include "qpid/management/ManagementBroker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/memory.h" #include "qpid/sys/Thread.h" #include "qpid/sys/LatencyTracker.h" @@ -116,7 +116,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : lastBroker(false), error(*this) { - mAgent = ManagementAgent::Singleton::getInstance(); + mAgent = broker.getManagementAgent(); if (mAgent != 0){ _qmf::Package packageInit(mAgent); mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index 66d15fa56b..56c50eafae 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -31,7 +31,7 @@ #include "qpid/sys/AtomicValue.h" #include "qpid/log/Statement.h" -#include "qpid/management/ManagementBroker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/management/IdAllocator.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Queue.h" @@ -49,7 +49,6 @@ using namespace std; using broker::Broker; using management::IdAllocator; using management::ManagementAgent; -using management::ManagementBroker; /** Note separating options from settings to work around boost version differences. @@ -140,7 +139,7 @@ struct ClusterPlugin : public Plugin { broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); - ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance()); + ManagementAgent* mgmt = broker->getManagementAgent(); if (mgmt) { std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator()); mgmt->setAllocator(allocator); diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 19300ef1af..77277070d9 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -19,7 +19,8 @@ * */ -#include "ManagementBroker.h" +#include "ManagementAgent.h" +#include "ManagementObject.h" #include "IdAllocator.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" @@ -41,45 +42,13 @@ using namespace qpid::sys; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; -Mutex ManagementAgent::Singleton::lock; -bool ManagementAgent::Singleton::disabled = false; -ManagementAgent* ManagementAgent::Singleton::agent = 0; -int ManagementAgent::Singleton::refCount = 0; - -ManagementAgent::Singleton::Singleton(bool disableManagement) -{ - Mutex::ScopedLock _lock(lock); - if (disableManagement && !disabled) { - disabled = true; - assert(refCount == 0); // can't disable after agent has been allocated - } - if (refCount == 0 && !disabled) - agent = new ManagementBroker(); - refCount++; -} - -ManagementAgent::Singleton::~Singleton() -{ - Mutex::ScopedLock _lock(lock); - refCount--; - if (refCount == 0 && !disabled) { - delete agent; - agent = 0; - } -} - -ManagementAgent* ManagementAgent::Singleton::getInstance() -{ - return agent; -} - -ManagementBroker::RemoteAgent::~RemoteAgent () +ManagementAgent::RemoteAgent::~RemoteAgent () { if (mgmtObject != 0) mgmtObject->resourceDestroy(); } -ManagementBroker::ManagementBroker () : +ManagementAgent::ManagementAgent () : threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now()))) { nextObjectId = 1; @@ -90,7 +59,7 @@ ManagementBroker::ManagementBroker () : clientWasAdded = false; } -ManagementBroker::~ManagementBroker () +ManagementAgent::~ManagementAgent () { timer.stop(); { @@ -114,20 +83,21 @@ ManagementBroker::~ManagementBroker () } } -void ManagementBroker::configure(const string& _dataDir, uint16_t _interval, +void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, qpid::broker::Broker* _broker, int _threads) { dataDir = _dataDir; interval = _interval; broker = _broker; threadPoolSize = _threads; + ManagementObject::maxThreads = threadPoolSize; timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); // Get from file or generate and save to file. if (dataDir.empty()) { uuid.generate(); - QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " + QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " << uuid); } else @@ -141,7 +111,7 @@ void ManagementBroker::configure(const string& _dataDir, uint16_t _interval, inFile >> bootSequence; inFile >> nextRemoteBank; inFile.close(); - QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); + QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. bootSequence++; @@ -152,15 +122,15 @@ void ManagementBroker::configure(const string& _dataDir, uint16_t _interval, else { uuid.generate(); - QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); + QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid); writeData(); } - QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); + QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence); } } -void ManagementBroker::writeData () +void ManagementAgent::writeData () { string filename (dataDir + "/.mbrokerdata"); ofstream outFile (filename.c_str ()); @@ -172,14 +142,14 @@ void ManagementBroker::writeData () } } -void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchange, +void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange, qpid::broker::Exchange::shared_ptr _dexchange) { mExchange = _mexchange; dExchange = _dexchange; } -void ManagementBroker::registerClass (const string& packageName, +void ManagementAgent::registerClass (const string& packageName, const string& className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) @@ -189,7 +159,7 @@ void ManagementBroker::registerClass (const string& packageName, addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); } -void ManagementBroker::registerEvent (const string& packageName, +void ManagementAgent::registerEvent (const string& packageName, const string& eventName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) @@ -199,7 +169,7 @@ void ManagementBroker::registerEvent (const string& packageName, addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } -ObjectId ManagementBroker::addObject (ManagementObject* object, +ObjectId ManagementAgent::addObject (ManagementObject* object, uint64_t persistId) { Mutex::ScopedLock lock (addLock); @@ -221,7 +191,7 @@ ObjectId ManagementBroker::addObject (ManagementObject* object, return objId; } -void ManagementBroker::raiseEvent(const ManagementEvent& event, severity_t severity) +void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) { Mutex::ScopedLock lock (userLock); Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); @@ -241,18 +211,18 @@ void ManagementBroker::raiseEvent(const ManagementEvent& event, severity_t sever "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); } -ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) - : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {} +ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) + : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), agent(_agent) {} -ManagementBroker::Periodic::~Periodic () {} +ManagementAgent::Periodic::~Periodic () {} -void ManagementBroker::Periodic::fire () +void ManagementAgent::Periodic::fire () { - broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval))); - broker.periodicProcessing (); + agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval))); + agent.periodicProcessing (); } -void ManagementBroker::clientAdded (const std::string& routingKey) +void ManagementAgent::clientAdded (const std::string& routingKey) { if (routingKey.find("console") != 0) return; @@ -272,7 +242,7 @@ void ManagementBroker::clientAdded (const std::string& routingKey) } } -void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet ('A'); buf.putOctet ('M'); @@ -281,7 +251,7 @@ void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) buf.putLong (seq); } -bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) { uint8_t h1 = buf.getOctet(); uint8_t h2 = buf.getOctet(); @@ -293,7 +263,7 @@ bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) return h1 == 'A' && h2 == 'M' && h3 == '2'; } -void ManagementBroker::sendBuffer(Buffer& buf, +void ManagementAgent::sendBuffer(Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, string routingKey) @@ -327,7 +297,7 @@ void ManagementBroker::sendBuffer(Buffer& buf, } catch(exception&) {} } -void ManagementBroker::moveNewObjectsLH() +void ManagementAgent::moveNewObjectsLH() { Mutex::ScopedLock lock (addLock); for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); @@ -337,7 +307,7 @@ void ManagementBroker::moveNewObjectsLH() newManagementObjects.clear(); } -void ManagementBroker::periodicProcessing (void) +void ManagementAgent::periodicProcessing (void) { #define BUFSIZE 65536 Mutex::ScopedLock lock (userLock); @@ -421,7 +391,7 @@ void ManagementBroker::periodicProcessing (void) } } -void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence, +void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, uint32_t code, string text) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -435,7 +405,7 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence sendBuffer (outBuffer, outLen, dExchange, replyToKey); } -bool ManagementBroker::dispatchCommand (Deliverable& deliverable, +bool ManagementAgent::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) { @@ -471,7 +441,7 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, return true; } -void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, +void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string methodName; @@ -532,7 +502,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe sendBuffer(outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) +void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -545,7 +515,7 @@ void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32 sendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) +void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) { for (PackageMap::iterator pIter = packages.begin (); pIter != packages.end (); @@ -564,7 +534,7 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_ sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) +void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) { string packageName; @@ -572,7 +542,7 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey findOrAddPackageLH(packageName); } -void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; @@ -601,7 +571,7 @@ void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, u sendCommandComplete(replyToKey, sequence); } -void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) +void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) { string packageName; SchemaClassKey key; @@ -633,7 +603,7 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui } } -void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) +void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) { // If the management package is attached locally (embedded in the broker or // linked in via plug-in), call the schema handler directly. If the package @@ -645,7 +615,7 @@ void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) buf.putRawData(buffer, bufferLen); } -void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -680,7 +650,7 @@ void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey sendCommandComplete(replyToKey, sequence, 1, "Package not found"); } -void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -699,7 +669,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { size_t length = validateSchema(inBuffer, cIter->second.kind); if (length == 0) { - QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name); + QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); } else { cIter->second.buffer = (uint8_t*) malloc(length); @@ -720,7 +690,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo } } -bool ManagementBroker::bankInUse (uint32_t bank) +bool ManagementAgent::bankInUse (uint32_t bank) { for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); @@ -730,7 +700,7 @@ bool ManagementBroker::bankInUse (uint32_t bank) return false; } -uint32_t ManagementBroker::allocateNewBank () +uint32_t ManagementAgent::allocateNewBank () { while (bankInUse (nextRemoteBank)) nextRemoteBank++; @@ -740,14 +710,14 @@ uint32_t ManagementBroker::allocateNewBank () return allocated; } -uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) +uint32_t ManagementAgent::assignBankLH (uint32_t requestedBank) { if (requestedBank == 0 || bankInUse (requestedBank)) return allocateNewBank (); return requestedBank; } -void ManagementBroker::deleteOrphanedAgentsLH() +void ManagementAgent::deleteOrphanedAgentsLH() { vector<ObjectId> deleteList; @@ -776,7 +746,7 @@ void ManagementBroker::deleteOrphanedAgentsLH() deleteList.clear(); } -void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) +void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; uint32_t requestedBrokerBank, requestedAgentBank; @@ -827,7 +797,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe sendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) { FieldTable ft; FieldTable::ValuePtr value; @@ -887,7 +857,7 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui sendCommandComplete(replyToKey, sequence); } -bool ManagementBroker::authorizeAgentMessageLH(Message& msg) +bool ManagementAgent::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; @@ -951,7 +921,7 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg) return true; } -void ManagementBroker::dispatchAgentCommandLH(Message& msg) +void ManagementAgent::dispatchAgentCommandLH(Message& msg) { Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; @@ -968,7 +938,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg) return; if (msg.encodedSize() > MA_BUFFER_SIZE) { - QPID_LOG(debug, "ManagementBroker::dispatchAgentCommandLH: Message too large: " << + QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << msg.encodedSize()); return; } @@ -994,7 +964,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg) } } -ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(string name) +ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string name) { PackageMap::iterator pIter = packages.find (name); if (pIter != packages.end ()) @@ -1003,7 +973,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(stri // No such package found, create a new map entry. pair<PackageMap::iterator, bool> result = packages.insert(pair<string, ClassMap>(name, ClassMap())); - QPID_LOG (debug, "ManagementBroker added package " << name); + QPID_LOG (debug, "ManagementAgent added package " << name); // Publish a package-indication message Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -1018,7 +988,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(stri return result.first; } -void ManagementBroker::addClassLH(uint8_t kind, +void ManagementAgent::addClassLH(uint8_t kind, PackageMap::iterator pIter, const string& className, uint8_t* md5Sum, @@ -1035,20 +1005,20 @@ void ManagementBroker::addClassLH(uint8_t kind, return; // No such class found, create a new class with local information. - QPID_LOG (debug, "ManagementBroker added class " << pIter->first << ":" << + QPID_LOG (debug, "ManagementAgent added class " << pIter->first << ":" << key.name); cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall))); cIter = cMap.find(key); } -void ManagementBroker::encodePackageIndication(Buffer& buf, +void ManagementAgent::encodePackageIndication(Buffer& buf, PackageMap::iterator pIter) { buf.putShortString((*pIter).first); } -void ManagementBroker::encodeClassIndication(Buffer& buf, +void ManagementAgent::encodeClassIndication(Buffer& buf, PackageMap::iterator pIter, ClassMap::iterator cIter) { @@ -1060,7 +1030,7 @@ void ManagementBroker::encodeClassIndication(Buffer& buf, buf.putBin128(key.hash); } -size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind) +size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind) { if (kind == ManagementItem::CLASS_KIND_TABLE) return validateTableSchema(inBuffer); @@ -1069,7 +1039,7 @@ size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind) return 0; } -size_t ManagementBroker::validateTableSchema(Buffer& inBuffer) +size_t ManagementAgent::validateTableSchema(Buffer& inBuffer) { uint32_t start = inBuffer.getPosition(); uint32_t end; @@ -1115,7 +1085,7 @@ size_t ManagementBroker::validateTableSchema(Buffer& inBuffer) return end - start; } -size_t ManagementBroker::validateEventSchema(Buffer& inBuffer) +size_t ManagementAgent::validateEventSchema(Buffer& inBuffer) { uint32_t start = inBuffer.getPosition(); uint32_t end; @@ -1147,13 +1117,13 @@ size_t ManagementBroker::validateEventSchema(Buffer& inBuffer) return end - start; } -void ManagementBroker::setAllocator(std::auto_ptr<IdAllocator> a) +void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a) { Mutex::ScopedLock lock (addLock); allocator = a; } -uint64_t ManagementBroker::allocateId(Manageable* object) +uint64_t ManagementAgent::allocateId(Manageable* object) { Mutex::ScopedLock lock (addLock); if (allocator.get()) return allocator->getIdFor(object); diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index a57f73be15..2411e6c277 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -1,5 +1,5 @@ -#ifndef _ManagementBroker_ -#define _ManagementBroker_ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ /* * @@ -21,14 +21,15 @@ * under the License. * */ +#include "qpid/broker/BrokerImportExport.h" #include "qpid/Options.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Timer.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" #include "qpid/broker/ConnectionToken.h" -#include "qpid/agent/ManagementAgent.h" #include "ManagementObject.h" +#include "ManagementEvent.h" #include "Manageable.h" #include "qmf/org/apache/qpid/broker/Agent.h" #include <qpid/framing/AMQFrame.h> @@ -39,15 +40,27 @@ namespace management { struct IdAllocator; -class ManagementBroker : public ManagementAgent +class ManagementAgent { private: int threadPoolSize; public: - ManagementBroker (); - virtual ~ManagementBroker (); + typedef enum { + SEV_EMERG = 0, + SEV_ALERT = 1, + SEV_CRIT = 2, + SEV_ERROR = 3, + SEV_WARN = 4, + SEV_NOTE = 5, + SEV_INFO = 6, + SEV_DEBUG = 7, + SEV_DEFAULT = 8 + } severity_t; + + ManagementAgent (); + virtual ~ManagementAgent (); void configure (const std::string& dataDir, uint16_t interval, qpid::broker::Broker* broker, int threadPoolSize); @@ -55,41 +68,34 @@ public: void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); int getMaxThreads () { return threadPoolSize; } - void registerClass (const std::string& packageName, - const std::string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void registerEvent (const std::string& packageName, - const std::string& eventName, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - ObjectId addObject (ManagementObject* object, - uint64_t persistId = 0); - void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); - void clientAdded (const std::string& routingKey); + QPID_BROKER_EXTERN void registerClass (const std::string& packageName, + const std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN void registerEvent (const std::string& packageName, + const std::string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, + uint64_t persistId = 0); + QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, + severity_t severity = SEV_DEFAULT); + QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); + bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); - const framing::Uuid& getUuid() const { return uuid; } - // Stubs for remote management agent calls - void init(const std::string&, uint16_t, uint16_t, bool, - const std::string&, const std::string&, const std::string&, - const std::string&, const std::string&) { assert(0); } - void init(const client::ConnectionSettings&, uint16_t, bool, const std::string&) { assert(0); } - uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } - int getSignalFd () { assert(0); return -1; } + const framing::Uuid& getUuid() const { return uuid; } void setAllocator(std::auto_ptr<IdAllocator> allocator); uint64_t allocateId(Manageable* object); private: - friend class ManagementAgent; - struct Periodic : public qpid::broker::TimerTask { - ManagementBroker& broker; + ManagementAgent& agent; - Periodic (ManagementBroker& broker, uint32_t seconds); + Periodic (ManagementAgent& agent, uint32_t seconds); virtual ~Periodic (); void fire (); }; @@ -239,4 +245,4 @@ private: }} -#endif /*!_ManagementBroker_*/ +#endif /*!_ManagementAgent_*/ diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.cpp b/qpid/cpp/src/qpid/management/ManagementExchange.cpp index 4dcafbfcdd..0793b2d18c 100644 --- a/qpid/cpp/src/qpid/management/ManagementExchange.cpp +++ b/qpid/cpp/src/qpid/management/ManagementExchange.cpp @@ -27,14 +27,14 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent) : - Exchange (_name, _parent), TopicExchange(_name, _parent) {} +ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent, Broker* b) : + Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {} ManagementExchange::ManagementExchange (const std::string& _name, bool _durable, const FieldTable& _args, - Manageable* _parent) : - Exchange (_name, _durable, _args, _parent), - TopicExchange(_name, _durable, _args, _parent) {} + Manageable* _parent, Broker* b) : + Exchange (_name, _durable, _args, _parent, b), + TopicExchange(_name, _durable, _args, _parent, b) {} void ManagementExchange::route (Deliverable& msg, const string& routingKey, @@ -60,7 +60,7 @@ bool ManagementExchange::bind (Queue::shared_ptr queue, return TopicExchange::bind(queue, routingKey, args); } -void ManagementExchange::setManagmentAgent (ManagementBroker* agent) +void ManagementExchange::setManagmentAgent (ManagementAgent* agent) { managementAgent = agent; } diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.h b/qpid/cpp/src/qpid/management/ManagementExchange.h index d54db1a74e..5e51683515 100644 --- a/qpid/cpp/src/qpid/management/ManagementExchange.h +++ b/qpid/cpp/src/qpid/management/ManagementExchange.h @@ -22,7 +22,7 @@ #define _ManagementExchange_ #include "qpid/broker/TopicExchange.h" -#include "ManagementBroker.h" +#include "ManagementAgent.h" namespace qpid { namespace broker { @@ -30,15 +30,15 @@ namespace broker { class ManagementExchange : public virtual TopicExchange { private: - management::ManagementBroker* managementAgent; + management::ManagementAgent* managementAgent; public: static const std::string typeName; - ManagementExchange (const string& name, Manageable* _parent = 0); + ManagementExchange (const string& name, Manageable* _parent = 0, Broker* broker = 0); ManagementExchange (const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - Manageable* _parent = 0); + Manageable* _parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } @@ -50,7 +50,7 @@ class ManagementExchange : public virtual TopicExchange const string& routingKey, const qpid::framing::FieldTable* args); - void setManagmentAgent (management::ManagementBroker* agent); + void setManagmentAgent (management::ManagementAgent* agent); virtual ~ManagementExchange(); }; diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index f4c45de126..08008b3d79 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -21,7 +21,6 @@ #include "Manageable.h" #include "ManagementObject.h" -#include "qpid/agent/ManagementAgent.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Thread.h" @@ -156,6 +155,7 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) }} +int ManagementObject::maxThreads = 1; int ManagementObject::nextThreadIndex = 0; void ManagementObject::writeTimestamps (framing::Buffer& buf) @@ -176,7 +176,7 @@ int ManagementObject::getThreadIndex() { if (thisIndex == -1) { sys::Mutex::ScopedLock mutex(accessLock); thisIndex = nextThreadIndex; - if (nextThreadIndex < agent->getMaxThreads() - 1) + if (nextThreadIndex < maxThreads - 1) nextThreadIndex++; } return thisIndex; diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index 498169318d..15c2307886 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -32,7 +32,6 @@ namespace qpid { namespace management { class Manageable; -class ManagementAgent; class ObjectId; @@ -111,7 +110,7 @@ public: class ManagementObject : public ManagementItem { - protected: +protected: uint64_t createTime; uint64_t destroyTime; @@ -122,8 +121,6 @@ class ManagementObject : public ManagementItem bool deleted; Manageable* coreObject; sys::Mutex accessLock; - ManagementAgent* agent; - int maxThreads; uint32_t flags; static int nextThreadIndex; @@ -133,13 +130,14 @@ class ManagementObject : public ManagementItem QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf); public: + static int maxThreads; typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); - ManagementObject(ManagementAgent* _agent, Manageable* _core) : + ManagementObject(Manageable* _core) : createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))), destroyTime(0), updateTime(createTime), configChanged(true), instChanged(true), deleted(false), - coreObject(_core), agent(_agent), forcePublish(false) {} + coreObject(_core), forcePublish(false) {} virtual ~ManagementObject() {} virtual writeSchemaCall_t getWriteSchemaCall() = 0; |