summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-05-11 14:16:52 +0000
committerTed Ross <tross@apache.org>2009-05-11 14:16:52 +0000
commita1b440e5393206ec5833e2d6c2617c2aca71701f (patch)
treeedbe7aad7a01122986380860c4cedd95086a282a
parentec0e348d1d14679f72ce704555dd2605880bddfa (diff)
downloadqpid-python-a1b440e5393206ec5833e2d6c2617c2aca71701f.tar.gz
QPID-1843 - Cleaned up the interface to the broker's internal management agent.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@773570 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xcpp/managementgen/qmf-gen22
-rwxr-xr-xcpp/managementgen/qmfgen/generate.py20
-rwxr-xr-xcpp/managementgen/qmfgen/schema.py9
-rw-r--r--cpp/managementgen/qmfgen/templates/Class.cpp7
-rw-r--r--cpp/managementgen/qmfgen/templates/Class.h6
-rw-r--r--cpp/managementgen/qmfgen/templates/Event.cpp2
-rw-r--r--cpp/managementgen/qmfgen/templates/Package.h2
-rw-r--r--cpp/src/Makefile.am6
-rw-r--r--cpp/src/qpid/acl/Acl.cpp2
-rw-r--r--cpp/src/qpid/acl/Acl.h2
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp6
-rw-r--r--cpp/src/qpid/broker/Broker.cpp23
-rw-r--r--cpp/src/qpid/broker/Broker.h5
-rw-r--r--cpp/src/qpid/broker/Connection.cpp2
-rw-r--r--cpp/src/qpid/broker/Connection.h2
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h4
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp58
-rw-r--r--cpp/src/qpid/broker/Exchange.h7
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp10
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h4
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h4
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h4
-rw-r--r--cpp/src/qpid/broker/Link.cpp6
-rw-r--r--cpp/src/qpid/broker/Link.h2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp18
-rw-r--r--cpp/src/qpid/broker/Queue.h4
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp6
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h3
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp18
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp8
-rw-r--r--cpp/src/qpid/broker/System.cpp7
-rw-r--r--cpp/src/qpid/broker/System.h4
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h4
-rw-r--r--cpp/src/qpid/broker/Vhost.cpp9
-rw-r--r--cpp/src/qpid/broker/Vhost.h3
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp4
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp5
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp (renamed from cpp/src/qpid/management/ManagementBroker.cpp)152
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h (renamed from cpp/src/qpid/management/ManagementBroker.h)68
-rw-r--r--cpp/src/qpid/management/ManagementExchange.cpp12
-rw-r--r--cpp/src/qpid/management/ManagementExchange.h10
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp4
-rw-r--r--cpp/src/qpid/management/ManagementObject.h10
47 files changed, 304 insertions, 288 deletions
diff --git a/cpp/managementgen/qmf-gen b/cpp/managementgen/qmf-gen
index c6cfca5f83..ebc07137ae 100755
--- a/cpp/managementgen/qmf-gen
+++ b/cpp/managementgen/qmf-gen
@@ -45,6 +45,8 @@ parser.add_option("-p", "--gen-prefix", dest="genprefix", default="",
help="Prefix for generated files in make dependencies")
parser.add_option("-q", "--qpid-broker", dest="qpidbroker", default=False, action="store_true",
help="Generate makefile for Qpid broker")
+parser.add_option("-b", "--broker-plugin", dest="brokerplugin", default=False, action="store_true",
+ help="Generate code for use in a qpid broker plugin")
(opts, args) = parser.parse_args()
@@ -57,17 +59,23 @@ if len(args) == 0:
print "no input files"
parser.exit()
+vargs = {}
+if opts.brokerplugin:
+ vargs["agentHeaderDir"] = "management"
+else:
+ vargs["agentHeaderDir"] = "agent"
+
for schemafile in args:
package = SchemaPackage(typefile, schemafile, opts)
gen.setPackage (package.packageName)
- gen.makeClassFiles ("Class.h", package)
- gen.makeClassFiles ("Class.cpp", package)
- gen.makeMethodFiles ("Args.h", package)
- gen.makeEventFiles ("Event.h", package)
- gen.makeEventFiles ("Event.cpp", package)
- gen.makePackageFile ("Package.h", package)
- gen.makePackageFile ("Package.cpp", package)
+ gen.makeClassFiles ("Class.h", package, vars=vargs)
+ gen.makeClassFiles ("Class.cpp", package, vars=vargs)
+ gen.makeMethodFiles ("Args.h", package, vars=vargs)
+ gen.makeEventFiles ("Event.h", package, vars=vargs)
+ gen.makeEventFiles ("Event.cpp", package, vars=vargs)
+ gen.makePackageFile ("Package.h", package, vars=vargs)
+ gen.makePackageFile ("Package.cpp", package, vars=vargs)
if opts.makefile != None:
args = {}
diff --git a/cpp/managementgen/qmfgen/generate.py b/cpp/managementgen/qmfgen/generate.py
index 255d41ea0e..7173c2faa1 100755
--- a/cpp/managementgen/qmfgen/generate.py
+++ b/cpp/managementgen/qmfgen/generate.py
@@ -388,30 +388,39 @@ class Generator:
def setVariable (self, key, value):
self.variables[key] = value
- def makeClassFiles (self, templateFile, schema, force=False):
+ def makeClassFiles (self, templateFile, schema, force=False, vars=None):
""" Generate an expanded template per schema class """
classes = schema.getClasses ()
template = Template (self.input + templateFile, self)
+ if vars:
+ for arg in vars:
+ self.setVariable(arg, vars[arg])
self.templateFiles.append (templateFile)
for _class in classes:
target = self.targetClassFile (_class, templateFile)
stream = template.expand (_class)
self.writeIfChanged (stream, target, force)
- def makeEventFiles (self, templateFile, schema, force=False):
+ def makeEventFiles (self, templateFile, schema, force=False, vars=None):
""" Generate an expanded template per schema event """
events = schema.getEvents()
template = Template (self.input + templateFile, self)
+ if vars:
+ for arg in vars:
+ self.setVariable(arg, vars[arg])
self.templateFiles.append (templateFile)
for event in events:
target = self.targetEventFile(event, templateFile)
stream = template.expand(event)
self.writeIfChanged(stream, target, force)
- def makeMethodFiles (self, templateFile, schema, force=False):
+ def makeMethodFiles (self, templateFile, schema, force=False, vars=None):
""" Generate an expanded template per method-with-arguments """
classes = schema.getClasses ()
template = Template (self.input + templateFile, self)
+ if vars:
+ for arg in vars:
+ self.setVariable(arg, vars[arg])
self.templateFiles.append (templateFile)
for _class in classes:
methods = _class.getMethods ()
@@ -421,9 +430,12 @@ class Generator:
stream = template.expand (method)
self.writeIfChanged (stream, target, force)
- def makePackageFile (self, templateFile, schema, force=False):
+ def makePackageFile (self, templateFile, schema, force=False, vars=None):
""" Generate a package-specific file """
template = Template (self.input + templateFile, self)
+ if vars:
+ for arg in vars:
+ self.setVariable(arg, vars[arg])
self.templateFiles.append (templateFile)
target = self.targetPackageFile (schema, templateFile)
stream = template.expand (schema)
diff --git a/cpp/managementgen/qmfgen/schema.py b/cpp/managementgen/qmfgen/schema.py
index 69823d6de0..3b53830c69 100755
--- a/cpp/managementgen/qmfgen/schema.py
+++ b/cpp/managementgen/qmfgen/schema.py
@@ -754,6 +754,9 @@ class SchemaEvent:
def getFullName (self):
return capitalize(self.package + capitalize(self.name))
+ def genAgentHeaderLocation (self, stream, variables):
+ stream.write(variables["agentHeaderDir"])
+
def getArgCount (self):
return len (self.args)
@@ -954,6 +957,9 @@ class SchemaClass:
if inst.assign == None:
inst.genAccessor (stream)
+ def genAgentHeaderLocation (self, stream, variables):
+ stream.write(variables["agentHeaderDir"])
+
def genCloseNamespaces (self, stream, variables):
for item in self.packageName.split("."):
stream.write ("}")
@@ -1258,6 +1264,9 @@ class SchemaPackage:
def getEvents(self):
return self.events
+ def genAgentHeaderLocation (self, stream, variables):
+ stream.write(variables["agentHeaderDir"])
+
def genCloseNamespaces (self, stream, variables):
for item in self.packageName.split("."):
stream.write ("}")
diff --git a/cpp/managementgen/qmfgen/templates/Class.cpp b/cpp/managementgen/qmfgen/templates/Class.cpp
index 247e1090ff..973d92586a 100644
--- a/cpp/managementgen/qmfgen/templates/Class.cpp
+++ b/cpp/managementgen/qmfgen/templates/Class.cpp
@@ -23,7 +23,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
-#include "qpid/agent/ManagementAgent.h"
+#include "qpid//*MGEN:Class.AgentHeaderLocation*//ManagementAgent.h"
#include "/*MGEN:Class.NameCap*/.h"
/*MGEN:Class.MethodArgIncludes*/
@@ -40,8 +40,8 @@ string /*MGEN:Class.NameCap*/::className = string ("/*MGEN:Class.NameLower*/
uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] =
{/*MGEN:Class.SchemaMD5*/};
-/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (ManagementAgent* _agent, Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) :
- ManagementObject(_agent, _core)/*MGEN:Class.ConstructorInits*/
+/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (ManagementAgent*, Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) :
+ ManagementObject(_core)/*MGEN:Class.ConstructorInits*/
{
/*MGEN:Class.ParentRefAssignment*/
/*MGEN:Class.InitializeElements*/
@@ -51,7 +51,6 @@ uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] =
presenceMask[idx] = 0;
/*MGEN:ENDIF*/
/*MGEN:IF(Class.ExistPerThreadStats)*/
- maxThreads = agent->getMaxThreads();
perThreadStatsArray = new struct PerThreadStats*[maxThreads];
for (int idx = 0; idx < maxThreads; idx++)
perThreadStatsArray[idx] = 0;
diff --git a/cpp/managementgen/qmfgen/templates/Class.h b/cpp/managementgen/qmfgen/templates/Class.h
index 0bf9911895..225090f0a9 100644
--- a/cpp/managementgen/qmfgen/templates/Class.h
+++ b/cpp/managementgen/qmfgen/templates/Class.h
@@ -27,6 +27,12 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
+namespace qpid {
+ namespace management {
+ class ManagementAgent;
+ }
+}
+
namespace qmf {
/*MGEN:Class.OpenNamespaces*/
diff --git a/cpp/managementgen/qmfgen/templates/Event.cpp b/cpp/managementgen/qmfgen/templates/Event.cpp
index cdb40c6d79..2ffec8bcdf 100644
--- a/cpp/managementgen/qmfgen/templates/Event.cpp
+++ b/cpp/managementgen/qmfgen/templates/Event.cpp
@@ -23,7 +23,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
-#include "qpid/agent/ManagementAgent.h"
+#include "qpid//*MGEN:Event.AgentHeaderLocation*//ManagementAgent.h"
#include "Event/*MGEN:Event.NameCap*/.h"
using namespace qmf::/*MGEN:Event.Namespace*/;
diff --git a/cpp/managementgen/qmfgen/templates/Package.h b/cpp/managementgen/qmfgen/templates/Package.h
index 0ad7060b9e..569c7cfb33 100644
--- a/cpp/managementgen/qmfgen/templates/Package.h
+++ b/cpp/managementgen/qmfgen/templates/Package.h
@@ -23,7 +23,7 @@
/*MGEN:Root.Disclaimer*/
-#include "qpid/agent/ManagementAgent.h"
+#include "qpid//*MGEN:Class.AgentHeaderLocation*//ManagementAgent.h"
namespace qmf {
/*MGEN:Class.OpenNamespaces*/
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 4d2d375802..63ca7009d9 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -87,7 +87,7 @@ $(rgen_generator):
# Management generator.
mgen_dir=$(top_srcdir)/managementgen
-mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk -q -o gen/qmf \
+mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk -q -b -o gen/qmf \
$(top_srcdir)/../specs/management-schema.xml \
$(srcdir)/qpid/acl/management-schema.xml \
$(srcdir)/qpid/cluster/management-schema.xml
@@ -427,7 +427,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/TxBuffer.cpp \
qpid/broker/TxPublish.cpp \
qpid/broker/Vhost.cpp \
- qpid/management/ManagementBroker.cpp \
+ qpid/management/ManagementAgent.cpp \
qpid/management/ManagementExchange.cpp \
qpid/sys/TCPIOPlugin.cpp
@@ -679,7 +679,7 @@ nobase_include_HEADERS = \
qpid/management/Args.h \
qpid/management/IdAllocator.h \
qpid/management/Manageable.h \
- qpid/management/ManagementBroker.h \
+ qpid/management/ManagementAgent.h \
qpid/management/ManagementEvent.h \
qpid/management/ManagementExchange.h \
qpid/management/ManagementObject.h \
diff --git a/cpp/src/qpid/acl/Acl.cpp b/cpp/src/qpid/acl/Acl.cpp
index 8c128e7bb9..fe2644c136 100644
--- a/cpp/src/qpid/acl/Acl.cpp
+++ b/cpp/src/qpid/acl/Acl.cpp
@@ -46,7 +46,7 @@ namespace _qmf = qmf::org::apache::qpid::acl;
Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false)
{
- agent = ManagementAgent::Singleton::getInstance();
+ agent = broker->getManagementAgent();
if (agent != 0){
_qmf::Package packageInit(agent);
diff --git a/cpp/src/qpid/acl/Acl.h b/cpp/src/qpid/acl/Acl.h
index 7770843e87..e153187b3d 100644
--- a/cpp/src/qpid/acl/Acl.h
+++ b/cpp/src/qpid/acl/Acl.h
@@ -26,7 +26,7 @@
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
#include "qpid/management/Manageable.h"
-#include "qpid/agent/ManagementAgent.h"
+#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/acl/Acl.h"
#include <map>
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 4d275b958f..e629a20e87 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -24,7 +24,7 @@
#include "LinkRegistry.h"
#include "SessionState.h"
-#include "qpid/agent/ManagementAgent.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
@@ -64,7 +64,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
std::stringstream title;
title << id << "_" << link->getBroker()->getFederationTag();
queueName += title.str();
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
(agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
@@ -181,7 +181,7 @@ void Bridge::destroy()
void Bridge::setPersistenceId(uint64_t pId) const
{
if (mgmtObject != 0 && persistenceId == 0) {
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = link->getBroker()->getManagementAgent();
agent->addObject (mgmtObject, pId);
}
persistenceId = pId;
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index c43eca6e5b..749489fbfd 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -65,7 +65,7 @@ using qpid::sys::Dispatcher;
using qpid::sys::Thread;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
-using qpid::management::ManagementBroker;
+using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -136,10 +136,11 @@ const std::string knownHostsNone("none");
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
config(conf),
- managementAgentSingleton(!config.enableMgmt),
store(0),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
+ queues(this),
+ exchanges(this),
links(this),
factory(new SecureConnectionFactory(*this)),
dtxManager(timer),
@@ -148,6 +149,7 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
+ managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
queueCleaner(queues, timer),
queueEvents(poller),
recovery(true),
@@ -156,13 +158,11 @@ Broker::Broker(const Broker::Options& conf) :
{
if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
- managementAgent = managementAgentSingleton.getInstance();
- ((ManagementBroker*) managementAgent)->configure
- (dataDir.isEnabled() ? dataDir.getPath() : string(),
- conf.mgmtPubInterval, this, conf.workerThreads + 3);
+ managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(),
+ conf.mgmtPubInterval, this, conf.workerThreads + 3);
_qmf::Package packageInitializer(managementAgent);
- System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string());
+ System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
systemObject = System::shared_ptr(system);
mgmtObject = new _qmf::Broker(managementAgent, this, system, conf.port);
@@ -182,9 +182,9 @@ Broker::Broker(const Broker::Options& conf) :
// Since there is currently no support for virtual hosts, a placeholder object
// representing the implied single virtual host is added here to keep the
// management schema correct.
- Vhost* vhost = new Vhost(this);
+ Vhost* vhost = new Vhost(this, this);
vhostObject = Vhost::shared_ptr(vhost);
- framing::Uuid uuid(((ManagementBroker*) managementAgent)->getUuid());
+ framing::Uuid uuid(managementAgent->getUuid());
federationTag = uuid.str();
vhostObject->setFederationTag(federationTag);
@@ -238,9 +238,8 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.declare(qpid_management, ManagementExchange::typeName);
Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
- ((ManagementBroker*) managementAgent)->setExchange (mExchange, dExchange);
- boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent
- ((ManagementBroker*) managementAgent);
+ managementAgent->setExchange(mExchange, dExchange);
+ boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent(managementAgent);
}
else
QPID_LOG(info, "Management not enabled");
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 5a1529a3ba..8f4621bb39 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -39,7 +39,7 @@
#include "Timer.h"
#include "ExpiryPolicy.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementBroker.h"
+#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
#include "qpid/Options.h"
@@ -120,7 +120,6 @@ public:
boost::shared_ptr<sys::Poller> poller;
Options config;
- management::ManagementAgent::Singleton managementAgentSingleton;
ProtocolFactoryMap protocolFactories;
std::auto_ptr<MessageStore> store;
AclModule* acl;
@@ -235,6 +234,8 @@ public:
void setRecovery(bool set) { recovery = set; }
bool getRecovery() const { return recovery; }
+
+ management::ManagementAgent* getManagementAgent() { return managementAgent; }
};
}}
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 365b3ccbeb..22188054a6 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -67,7 +67,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
if (parent != 0)
{
- agent = ManagementAgent::Singleton::getInstance();
+ agent = broker_.getManagementAgent();
// TODO set last bool true if system connection
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index e67cdce681..770bf2184f 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -40,7 +40,7 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/agent/ManagementAgent.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/management/Manageable.h"
#include "qpid/ptr_map.h"
#include "qpid/sys/AggregateOutput.h"
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index d1d9ad07e4..deb9699c96 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -41,15 +41,15 @@ const std::string fedOpReorigin("R");
const std::string fedOpHello("H");
}
-DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+DirectExchange::DirectExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type(typeName);
}
DirectExchange::DirectExchange(const string& _name, bool _durable,
- const FieldTable& _args, Manageable* _parent) :
- Exchange(_name, _durable, _args, _parent)
+ const FieldTable& _args, Manageable* _parent, Broker* b) :
+ Exchange(_name, _durable, _args, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type(typeName);
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 27d101c4fe..9081c319c0 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -46,11 +46,11 @@ public:
static const std::string typeName;
QPID_BROKER_EXTERN DirectExchange(const std::string& name,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN DirectExchange(const string& _name,
bool _durable,
const qpid::framing::FieldTable& _args,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index dd1fe98b2c..acedd1f91a 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -21,8 +21,8 @@
#include "Exchange.h"
#include "ExchangeRegistry.h"
-#include "qpid/agent/ManagementAgent.h"
-#include "qpid/management/ManagementBroker.h"
+#include "Broker.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
#include "DeliverableMessage.h"
@@ -33,7 +33,6 @@ using qpid::framing::Buffer;
using qpid::framing::FieldTable;
using qpid::sys::Mutex;
using qpid::management::ManagementAgent;
-using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -83,13 +82,13 @@ void Exchange::routeIVE(){
}
-Exchange::Exchange (const string& _name, Manageable* parent) :
- name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), ive(false), mgmtExchange(0)
+Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
+ name(_name), durable(false), persistenceId(0), sequence(false),
+ sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
{
- if (parent != 0)
+ if (parent != 0 && broker != 0)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0)
{
mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable);
@@ -101,13 +100,13 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
static const std::string QPID_MANAGEMENT("qpid.management");
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
- Manageable* parent)
+ Manageable* parent, Broker* b)
: name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
- args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
+ args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
{
- if (parent != 0)
+ if (parent != 0 && broker != 0)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0)
{
mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable);
@@ -118,8 +117,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
} else if (name == QPID_MANAGEMENT) {
agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID
} else {
- ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
- agent->addObject (mgmtExchange, mb ? mb->allocateId(this) : 0);
+ agent->addObject (mgmtExchange, agent->allocateId(this));
}
}
}
@@ -145,7 +143,7 @@ void Exchange::setPersistenceId(uint64_t id) const
{
if (mgmtExchange != 0 && persistenceId == 0)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = broker->getManagementAgent();
agent->addObject (mgmtExchange, 0x2000000000000000LL + id);
}
persistenceId = id;
@@ -240,20 +238,22 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang
{
if (parent != 0)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- if (agent != 0)
- {
- ManagementObject* mo = queue->GetManagementObject();
- if (mo != 0)
- {
- management::ObjectId queueId = mo->getObjectId();
- mgmtBinding = new _qmf::Binding
- (agent, this, (Manageable*) parent, queueId, key, args);
- if (!origin.empty())
- mgmtBinding->set_origin(origin);
- ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
- agent->addObject (mgmtBinding, mb ? mb->allocateId(this) : 0);
- }
+ Broker* broker = parent->getBroker();
+ if (broker != 0) {
+ ManagementAgent* agent = broker->getManagementAgent();
+ if (agent != 0)
+ {
+ ManagementObject* mo = queue->GetManagementObject();
+ if (mo != 0)
+ {
+ management::ObjectId queueId = mo->getObjectId();
+ mgmtBinding = new _qmf::Binding
+ (agent, this, (Manageable*) parent, queueId, key, args);
+ if (!origin.empty())
+ mgmtBinding->set_origin(origin);
+ agent->addObject (mgmtBinding, agent->allocateId(this));
+ }
+ }
}
}
}
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 47c0bdb3af..e33c0c6bbc 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -121,9 +121,10 @@ protected:
public:
typedef boost::shared_ptr<Exchange> shared_ptr;
- QPID_BROKER_EXTERN explicit Exchange(const std::string& name, management::Manageable* parent = 0);
+ QPID_BROKER_EXTERN explicit Exchange(const std::string& name, management::Manageable* parent = 0,
+ Broker* broker = 0);
QPID_BROKER_EXTERN Exchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN virtual ~Exchange();
const std::string& getName() const { return name; }
@@ -167,10 +168,12 @@ public:
void registerDynamicBridge(DynamicBridge* db);
void removeDynamicBridge(DynamicBridge* db);
virtual bool supportsDynamicBinding() { return false; }
+ Broker* getBroker() const { return broker; }
protected:
qpid::sys::Mutex bridgeLock;
std::vector<DynamicBridge*> bridgeVector;
+ Broker* broker;
QPID_BROKER_EXTERN virtual void handleHelloRequest();
void propagateFedOp(const std::string& routingKey, const std::string& tags,
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index bb0eec34ba..85bd65e456 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -45,15 +45,15 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
Exchange::shared_ptr exchange;
if(type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent));
+ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
}else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent));
+ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
}else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent));
+ exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
}else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent));
+ exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
}else if (type == ManagementExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent));
+ exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent, broker));
}
else{
FunctionMap::iterator i = factory.find(type);
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index 9edd54f025..34ee173a91 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -45,7 +45,7 @@ class ExchangeRegistry{
typedef boost::function4<Exchange::shared_ptr, const std::string&,
bool, const qpid::framing::FieldTable&, qpid::management::Manageable*> FactoryFunction;
- ExchangeRegistry () : parent(0) {}
+ ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {}
QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
(const std::string& name, const std::string& type);
QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
@@ -84,7 +84,7 @@ class ExchangeRegistry{
FunctionMap factory;
mutable qpid::sys::RWlock lock;
management::Manageable* parent;
-
+ Broker* broker;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index aa1f7ff30a..dc3bda4262 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -38,16 +38,16 @@ const std::string fedOpReorigin("R");
const std::string fedOpHello("H");
}
-FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) :
- Exchange(_name, _parent)
+FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent, Broker* b) :
+ Exchange(_name, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
}
FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
- const FieldTable& _args, Manageable* _parent) :
- Exchange(_name, _durable, _args, _parent)
+ const FieldTable& _args, Manageable* _parent, Broker* b) :
+ Exchange(_name, _durable, _args, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index edfc4395f4..32da9fe5b5 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -40,11 +40,11 @@ class FanOutExchange : public virtual Exchange {
static const std::string typeName;
QPID_BROKER_EXTERN FanOutExchange(const std::string& name,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN FanOutExchange(const string& _name,
bool _durable,
const qpid::framing::FieldTable& _args,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 09fb2d9bef..4b1176d560 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -43,16 +43,16 @@ namespace {
const std::string empty;
}
-HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) :
- Exchange(_name, _parent)
+HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broker* b) :
+ Exchange(_name, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
}
HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
- const FieldTable& _args, Manageable* _parent) :
- Exchange(_name, _durable, _args, _parent)
+ const FieldTable& _args, Manageable* _parent, Broker* b) :
+ Exchange(_name, _durable, _args, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 2b01f9ecae..87633c0f0e 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -61,11 +61,11 @@ class HeadersExchange : public virtual Exchange {
static const std::string typeName;
QPID_BROKER_EXTERN HeadersExchange(const string& name,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN HeadersExchange(const string& _name,
bool _durable,
const qpid::framing::FieldTable& _args,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index dd1a1fa0b4..a2717bfd4c 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -68,9 +68,9 @@ Link::Link(LinkRegistry* _links,
connection(0),
agent(0)
{
- if (parent != 0)
+ if (parent != 0 && broker != 0)
{
- agent = ManagementAgent::Singleton::getInstance();
+ agent = broker->getManagementAgent();
if (agent != 0)
{
mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
@@ -347,7 +347,7 @@ void Link::notifyConnectionForced(const string text)
void Link::setPersistenceId(uint64_t id) const
{
if (mgmtObject != 0 && persistenceId == 0) {
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = broker->getManagementAgent();
agent->addObject(mgmtObject, id);
}
persistenceId = id;
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index 39014b0ec0..0b504651eb 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -30,7 +30,7 @@
#include "qpid/sys/Mutex.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
-#include "qpid/agent/ManagementAgent.h"
+#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Link.h"
#include <boost/ptr_container/ptr_vector.hpp>
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index aa0cd8ca31..6930275361 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -30,7 +30,7 @@
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
-#include "qpid/management/ManagementBroker.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
@@ -48,7 +48,6 @@ using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
using qpid::management::ManagementAgent;
-using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -80,7 +79,8 @@ const int ENQUEUE_AND_DEQUEUE=2;
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
- Manageable* parent) :
+ Manageable* parent,
+ Broker* b) :
name(_name),
autodelete(_autodelete),
@@ -98,11 +98,12 @@ Queue::Queue(const string& _name, bool _autodelete,
mgmtObject(0),
eventMode(0),
eventMgr(0),
- insertSeqNo(0)
+ insertSeqNo(0),
+ broker(b)
{
- if (parent != 0)
+ if (parent != 0 && broker != 0)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0)
{
@@ -111,8 +112,7 @@ Queue::Queue(const string& _name, bool _autodelete,
// Add the object to the management agent only if this queue is not durable.
// If it's durable, we will add it later when the queue is assigned a persistenceId.
if (store == 0) {
- ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
- agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
+ agent->addObject (mgmtObject, agent->allocateId(this));
}
}
}
@@ -838,7 +838,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
{
if (mgmtObject != 0 && persistenceId == 0)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = broker->getManagementAgent();
agent->addObject (mgmtObject, 0x3000000000000000LL + _persistenceId);
if (externalQueueStore) {
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index c5ef9a9307..0d5f2043d1 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -105,6 +105,7 @@ namespace qpid {
QueueEvents* eventMgr;
bool insertSeqNo;
std::string seqNoKey;
+ Broker* broker;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -158,7 +159,8 @@ namespace qpid {
bool autodelete = false,
MessageStore* const store = 0,
const OwnershipToken* const owner = 0,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0,
+ Broker* broker = 0);
QPID_BROKER_EXTERN ~Queue();
QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index d079e543c4..60182e1ead 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -27,8 +27,8 @@
using namespace qpid::broker;
using namespace qpid::sys;
-QueueRegistry::QueueRegistry() :
- counter(1), store(0), events(0), parent(0), lastNode(false) {}
+QueueRegistry::QueueRegistry(Broker* b) :
+ counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {}
QueueRegistry::~QueueRegistry(){}
@@ -42,7 +42,7 @@ QueueRegistry::declare(const string& declareName, bool durable,
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent));
+ Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
queues[name] = queue;
if (lastNode) queue->setLastNodeFailure();
if (events) queue->setQueueEventManager(*events);
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 3c02afedc4..a4ea65f18c 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -43,7 +43,7 @@ class QueueEvents;
*/
class QueueRegistry {
public:
- QPID_BROKER_EXTERN QueueRegistry();
+ QPID_BROKER_EXTERN QueueRegistry(Broker* b = 0);
QPID_BROKER_EXTERN ~QueueRegistry();
/**
@@ -131,6 +131,7 @@ private:
QueueEvents* events;
management::Manageable* parent;
bool lastNode; //used to set mode on queue declare
+ Broker* broker;
//destroy impl that assumes lock is already held:
void destroyLH (const string& name);
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 96c47085f0..0ddd546a68 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -23,7 +23,7 @@
#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/SequenceSet.h"
-#include "qpid/agent/ManagementAgent.h"
+#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -98,7 +98,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
checkAlternate(response.first, alternate);
}
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
alternateExchange, durable, false, args,
@@ -140,7 +140,7 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
getBroker().getExchanges().destroy(name);
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name));
}
@@ -181,7 +181,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
}
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments));
}
@@ -214,7 +214,7 @@ void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
if (exchange->isDurable() && queue->isDurable())
getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey));
}
@@ -372,7 +372,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
}
}
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
name, durable, exclusive, autoDelete, arguments,
@@ -422,7 +422,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
getBroker().getQueues().destroy(queue);
q->unbind(getBroker().getExchanges(), q);
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
}
@@ -484,7 +484,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
acceptMode == 0, acquireMode == 0, exclusive,
resumeId, resumeTtl, arguments);
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
queueName, destination, exclusive, arguments));
@@ -495,7 +495,7 @@ SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
{
state.cancel(destination);
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination));
}
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 7e5f605753..26a35f4a4f 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -32,7 +32,7 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
-#include "qpid/management/ManagementBroker.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include <boost/bind.hpp>
@@ -45,7 +45,6 @@ using namespace framing;
using sys::Mutex;
using boost::intrusive_ptr;
using qpid::management::ManagementAgent;
-using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -73,7 +72,7 @@ SessionState::SessionState(
}
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = getBroker().getManagementAgent();
if (agent != 0) {
mgmtObject = new _qmf::Session
(agent, this, parent, getId().getName());
@@ -81,8 +80,7 @@ SessionState::SessionState(
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate);
- ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
- agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
+ agent->addObject (mgmtObject, agent->allocateId(this));
}
}
attach(h);
diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp
index a11ad25bbe..86933109a1 100644
--- a/cpp/src/qpid/broker/System.cpp
+++ b/cpp/src/qpid/broker/System.cpp
@@ -18,7 +18,8 @@
//
#include "System.h"
-#include "qpid/agent/ManagementAgent.h"
+#include "Broker.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/SystemInfo.h"
#include <iostream>
@@ -29,9 +30,9 @@ using namespace qpid::broker;
using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
-System::System (string _dataDir) : mgmtObject(0)
+System::System (string _dataDir, Broker* broker) : mgmtObject(0)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = broker ? broker->getManagementAgent() : 0;
if (agent != 0)
{
diff --git a/cpp/src/qpid/broker/System.h b/cpp/src/qpid/broker/System.h
index 42a816e095..0fc2c2bd88 100644
--- a/cpp/src/qpid/broker/System.h
+++ b/cpp/src/qpid/broker/System.h
@@ -28,6 +28,8 @@
namespace qpid {
namespace broker {
+class Broker;
+
class System : public management::Manageable
{
private:
@@ -38,7 +40,7 @@ class System : public management::Manageable
typedef boost::shared_ptr<System> shared_ptr;
- System (std::string _dataDir);
+ System (std::string _dataDir, Broker* broker = 0);
management::ManagementObject* GetManagementObject (void) const
{ return mgmtObject; }
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index d4f9721162..a465c35790 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -137,15 +137,15 @@ bool TopicPattern::match(const Tokens& target) const
return do_match(begin(), end(), target.begin(), target.end());
}
-TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
}
TopicExchange::TopicExchange(const std::string& _name, bool _durable,
- const FieldTable& _args, Manageable* _parent) :
- Exchange(_name, _durable, _args, _parent)
+ const FieldTable& _args, Manageable* _parent, Broker* b) :
+ Exchange(_name, _durable, _args, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 24bf5f7bca..b3ee1ea66d 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -86,11 +86,11 @@ class TopicExchange : public virtual Exchange {
static const std::string typeName;
QPID_BROKER_EXTERN TopicExchange(const string& name,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN TopicExchange(const string& _name,
bool _durable,
const qpid::framing::FieldTable& _args,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp
index c5bb6c5104..aa7683d318 100644
--- a/cpp/src/qpid/broker/Vhost.cpp
+++ b/cpp/src/qpid/broker/Vhost.cpp
@@ -18,7 +18,8 @@
//
#include "Vhost.h"
-#include "qpid/agent/ManagementAgent.h"
+#include "Broker.h"
+#include "qpid/management/ManagementAgent.h"
using namespace qpid::broker;
using qpid::management::ManagementAgent;
@@ -28,11 +29,11 @@ namespace qpid { namespace management {
class Manageable;
}}
-Vhost::Vhost (qpid::management::Manageable* parentBroker) : mgmtObject(0)
+Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmtObject(0)
{
- if (parentBroker != 0)
+ if (parentBroker != 0 && broker != 0)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0)
{
diff --git a/cpp/src/qpid/broker/Vhost.h b/cpp/src/qpid/broker/Vhost.h
index ef59362e4d..9554d641c2 100644
--- a/cpp/src/qpid/broker/Vhost.h
+++ b/cpp/src/qpid/broker/Vhost.h
@@ -27,6 +27,7 @@
namespace qpid {
namespace broker {
+class Broker;
class Vhost : public management::Manageable
{
private:
@@ -37,7 +38,7 @@ class Vhost : public management::Manageable
typedef boost::shared_ptr<Vhost> shared_ptr;
- Vhost (management::Manageable* parentBroker);
+ Vhost (management::Manageable* parentBroker, Broker* broker = 0);
management::ManagementObject* GetManagementObject (void) const
{ return mgmtObject; }
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 677bd2b722..1f39fe9ae9 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -44,7 +44,7 @@
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
#include "qpid/management/IdAllocator.h"
-#include "qpid/management/ManagementBroker.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/memory.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/LatencyTracker.h"
@@ -116,7 +116,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
lastBroker(false),
error(*this)
{
- mAgent = ManagementAgent::Singleton::getInstance();
+ mAgent = broker.getManagementAgent();
if (mAgent != 0){
_qmf::Package packageInit(mAgent);
mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 66d15fa56b..56c50eafae 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -31,7 +31,7 @@
#include "qpid/sys/AtomicValue.h"
#include "qpid/log/Statement.h"
-#include "qpid/management/ManagementBroker.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/management/IdAllocator.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Queue.h"
@@ -49,7 +49,6 @@ using namespace std;
using broker::Broker;
using management::IdAllocator;
using management::ManagementAgent;
-using management::ManagementBroker;
/** Note separating options from settings to work around boost version differences.
@@ -140,7 +139,7 @@ struct ClusterPlugin : public Plugin {
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
- ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
+ ManagementAgent* mgmt = broker->getManagementAgent();
if (mgmt) {
std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator());
mgmt->setAllocator(allocator);
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 19300ef1af..77277070d9 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -19,7 +19,8 @@
*
*/
-#include "ManagementBroker.h"
+#include "ManagementAgent.h"
+#include "ManagementObject.h"
#include "IdAllocator.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/log/Statement.h"
@@ -41,45 +42,13 @@ using namespace qpid::sys;
using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
-Mutex ManagementAgent::Singleton::lock;
-bool ManagementAgent::Singleton::disabled = false;
-ManagementAgent* ManagementAgent::Singleton::agent = 0;
-int ManagementAgent::Singleton::refCount = 0;
-
-ManagementAgent::Singleton::Singleton(bool disableManagement)
-{
- Mutex::ScopedLock _lock(lock);
- if (disableManagement && !disabled) {
- disabled = true;
- assert(refCount == 0); // can't disable after agent has been allocated
- }
- if (refCount == 0 && !disabled)
- agent = new ManagementBroker();
- refCount++;
-}
-
-ManagementAgent::Singleton::~Singleton()
-{
- Mutex::ScopedLock _lock(lock);
- refCount--;
- if (refCount == 0 && !disabled) {
- delete agent;
- agent = 0;
- }
-}
-
-ManagementAgent* ManagementAgent::Singleton::getInstance()
-{
- return agent;
-}
-
-ManagementBroker::RemoteAgent::~RemoteAgent ()
+ManagementAgent::RemoteAgent::~RemoteAgent ()
{
if (mgmtObject != 0)
mgmtObject->resourceDestroy();
}
-ManagementBroker::ManagementBroker () :
+ManagementAgent::ManagementAgent () :
threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now())))
{
nextObjectId = 1;
@@ -90,7 +59,7 @@ ManagementBroker::ManagementBroker () :
clientWasAdded = false;
}
-ManagementBroker::~ManagementBroker ()
+ManagementAgent::~ManagementAgent ()
{
timer.stop();
{
@@ -114,20 +83,21 @@ ManagementBroker::~ManagementBroker ()
}
}
-void ManagementBroker::configure(const string& _dataDir, uint16_t _interval,
+void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
qpid::broker::Broker* _broker, int _threads)
{
dataDir = _dataDir;
interval = _interval;
broker = _broker;
threadPoolSize = _threads;
+ ManagementObject::maxThreads = threadPoolSize;
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
// Get from file or generate and save to file.
if (dataDir.empty())
{
uuid.generate();
- QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: "
+ QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: "
<< uuid);
}
else
@@ -141,7 +111,7 @@ void ManagementBroker::configure(const string& _dataDir, uint16_t _interval,
inFile >> bootSequence;
inFile >> nextRemoteBank;
inFile.close();
- QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
+ QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid);
// if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
bootSequence++;
@@ -152,15 +122,15 @@ void ManagementBroker::configure(const string& _dataDir, uint16_t _interval,
else
{
uuid.generate();
- QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid);
+ QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid);
writeData();
}
- QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence);
+ QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence);
}
}
-void ManagementBroker::writeData ()
+void ManagementAgent::writeData ()
{
string filename (dataDir + "/.mbrokerdata");
ofstream outFile (filename.c_str ());
@@ -172,14 +142,14 @@ void ManagementBroker::writeData ()
}
}
-void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
+void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
qpid::broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
}
-void ManagementBroker::registerClass (const string& packageName,
+void ManagementAgent::registerClass (const string& packageName,
const string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
@@ -189,7 +159,7 @@ void ManagementBroker::registerClass (const string& packageName,
addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
}
-void ManagementBroker::registerEvent (const string& packageName,
+void ManagementAgent::registerEvent (const string& packageName,
const string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
@@ -199,7 +169,7 @@ void ManagementBroker::registerEvent (const string& packageName,
addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
-ObjectId ManagementBroker::addObject (ManagementObject* object,
+ObjectId ManagementAgent::addObject (ManagementObject* object,
uint64_t persistId)
{
Mutex::ScopedLock lock (addLock);
@@ -221,7 +191,7 @@ ObjectId ManagementBroker::addObject (ManagementObject* object,
return objId;
}
-void ManagementBroker::raiseEvent(const ManagementEvent& event, severity_t severity)
+void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
{
Mutex::ScopedLock lock (userLock);
Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
@@ -241,18 +211,18 @@ void ManagementBroker::raiseEvent(const ManagementEvent& event, severity_t sever
"console.event.1.0." + event.getPackageName() + "." + event.getEventName());
}
-ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
- : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {}
+ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
+ : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), agent(_agent) {}
-ManagementBroker::Periodic::~Periodic () {}
+ManagementAgent::Periodic::~Periodic () {}
-void ManagementBroker::Periodic::fire ()
+void ManagementAgent::Periodic::fire ()
{
- broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval)));
- broker.periodicProcessing ();
+ agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
+ agent.periodicProcessing ();
}
-void ManagementBroker::clientAdded (const std::string& routingKey)
+void ManagementAgent::clientAdded (const std::string& routingKey)
{
if (routingKey.find("console") != 0)
return;
@@ -272,7 +242,7 @@ void ManagementBroker::clientAdded (const std::string& routingKey)
}
}
-void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet ('A');
buf.putOctet ('M');
@@ -281,7 +251,7 @@ void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
buf.putLong (seq);
}
-bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
uint8_t h1 = buf.getOctet();
uint8_t h2 = buf.getOctet();
@@ -293,7 +263,7 @@ bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-void ManagementBroker::sendBuffer(Buffer& buf,
+void ManagementAgent::sendBuffer(Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
string routingKey)
@@ -327,7 +297,7 @@ void ManagementBroker::sendBuffer(Buffer& buf,
} catch(exception&) {}
}
-void ManagementBroker::moveNewObjectsLH()
+void ManagementAgent::moveNewObjectsLH()
{
Mutex::ScopedLock lock (addLock);
for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
@@ -337,7 +307,7 @@ void ManagementBroker::moveNewObjectsLH()
newManagementObjects.clear();
}
-void ManagementBroker::periodicProcessing (void)
+void ManagementAgent::periodicProcessing (void)
{
#define BUFSIZE 65536
Mutex::ScopedLock lock (userLock);
@@ -421,7 +391,7 @@ void ManagementBroker::periodicProcessing (void)
}
}
-void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence,
+void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
uint32_t code, string text)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -435,7 +405,7 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
+bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
const string& routingKey,
const FieldTable* /*args*/)
{
@@ -471,7 +441,7 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
return true;
}
-void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
+void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
uint32_t sequence, const ConnectionToken* connToken)
{
string methodName;
@@ -532,7 +502,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
+void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -545,7 +515,7 @@ void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
+void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
{
for (PackageMap::iterator pIter = packages.begin ();
pIter != packages.end ();
@@ -564,7 +534,7 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
+void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
{
string packageName;
@@ -572,7 +542,7 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey
findOrAddPackageLH(packageName);
}
-void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string packageName;
@@ -601,7 +571,7 @@ void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, u
sendCommandComplete(replyToKey, sequence);
}
-void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
+void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
{
string packageName;
SchemaClassKey key;
@@ -633,7 +603,7 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui
}
}
-void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
+void ManagementAgent::SchemaClass::appendSchema(Buffer& buf)
{
// If the management package is attached locally (embedded in the broker or
// linked in via plug-in), call the schema handler directly. If the package
@@ -645,7 +615,7 @@ void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
buf.putRawData(buffer, bufferLen);
}
-void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -680,7 +650,7 @@ void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey
sendCommandComplete(replyToKey, sequence, 1, "Package not found");
}
-void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -699,7 +669,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo
if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
size_t length = validateSchema(inBuffer, cIter->second.kind);
if (length == 0) {
- QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name);
+ QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name);
cMap.erase(key);
} else {
cIter->second.buffer = (uint8_t*) malloc(length);
@@ -720,7 +690,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo
}
}
-bool ManagementBroker::bankInUse (uint32_t bank)
+bool ManagementAgent::bankInUse (uint32_t bank)
{
for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
aIter != remoteAgents.end();
@@ -730,7 +700,7 @@ bool ManagementBroker::bankInUse (uint32_t bank)
return false;
}
-uint32_t ManagementBroker::allocateNewBank ()
+uint32_t ManagementAgent::allocateNewBank ()
{
while (bankInUse (nextRemoteBank))
nextRemoteBank++;
@@ -740,14 +710,14 @@ uint32_t ManagementBroker::allocateNewBank ()
return allocated;
}
-uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank)
+uint32_t ManagementAgent::assignBankLH (uint32_t requestedBank)
{
if (requestedBank == 0 || bankInUse (requestedBank))
return allocateNewBank ();
return requestedBank;
}
-void ManagementBroker::deleteOrphanedAgentsLH()
+void ManagementAgent::deleteOrphanedAgentsLH()
{
vector<ObjectId> deleteList;
@@ -776,7 +746,7 @@ void ManagementBroker::deleteOrphanedAgentsLH()
deleteList.clear();
}
-void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
+void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
string label;
uint32_t requestedBrokerBank, requestedAgentBank;
@@ -827,7 +797,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
FieldTable ft;
FieldTable::ValuePtr value;
@@ -887,7 +857,7 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui
sendCommandComplete(replyToKey, sequence);
}
-bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
+bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
uint8_t opcode;
@@ -951,7 +921,7 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
return true;
}
-void ManagementBroker::dispatchAgentCommandLH(Message& msg)
+void ManagementAgent::dispatchAgentCommandLH(Message& msg)
{
Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
uint8_t opcode;
@@ -968,7 +938,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg)
return;
if (msg.encodedSize() > MA_BUFFER_SIZE) {
- QPID_LOG(debug, "ManagementBroker::dispatchAgentCommandLH: Message too large: " <<
+ QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
msg.encodedSize());
return;
}
@@ -994,7 +964,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg)
}
}
-ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(string name)
+ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string name)
{
PackageMap::iterator pIter = packages.find (name);
if (pIter != packages.end ())
@@ -1003,7 +973,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(stri
// No such package found, create a new map entry.
pair<PackageMap::iterator, bool> result =
packages.insert(pair<string, ClassMap>(name, ClassMap()));
- QPID_LOG (debug, "ManagementBroker added package " << name);
+ QPID_LOG (debug, "ManagementAgent added package " << name);
// Publish a package-indication message
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -1018,7 +988,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(stri
return result.first;
}
-void ManagementBroker::addClassLH(uint8_t kind,
+void ManagementAgent::addClassLH(uint8_t kind,
PackageMap::iterator pIter,
const string& className,
uint8_t* md5Sum,
@@ -1035,20 +1005,20 @@ void ManagementBroker::addClassLH(uint8_t kind,
return;
// No such class found, create a new class with local information.
- QPID_LOG (debug, "ManagementBroker added class " << pIter->first << ":" <<
+ QPID_LOG (debug, "ManagementAgent added class " << pIter->first << ":" <<
key.name);
cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));
cIter = cMap.find(key);
}
-void ManagementBroker::encodePackageIndication(Buffer& buf,
+void ManagementAgent::encodePackageIndication(Buffer& buf,
PackageMap::iterator pIter)
{
buf.putShortString((*pIter).first);
}
-void ManagementBroker::encodeClassIndication(Buffer& buf,
+void ManagementAgent::encodeClassIndication(Buffer& buf,
PackageMap::iterator pIter,
ClassMap::iterator cIter)
{
@@ -1060,7 +1030,7 @@ void ManagementBroker::encodeClassIndication(Buffer& buf,
buf.putBin128(key.hash);
}
-size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind)
+size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind)
{
if (kind == ManagementItem::CLASS_KIND_TABLE)
return validateTableSchema(inBuffer);
@@ -1069,7 +1039,7 @@ size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind)
return 0;
}
-size_t ManagementBroker::validateTableSchema(Buffer& inBuffer)
+size_t ManagementAgent::validateTableSchema(Buffer& inBuffer)
{
uint32_t start = inBuffer.getPosition();
uint32_t end;
@@ -1115,7 +1085,7 @@ size_t ManagementBroker::validateTableSchema(Buffer& inBuffer)
return end - start;
}
-size_t ManagementBroker::validateEventSchema(Buffer& inBuffer)
+size_t ManagementAgent::validateEventSchema(Buffer& inBuffer)
{
uint32_t start = inBuffer.getPosition();
uint32_t end;
@@ -1147,13 +1117,13 @@ size_t ManagementBroker::validateEventSchema(Buffer& inBuffer)
return end - start;
}
-void ManagementBroker::setAllocator(std::auto_ptr<IdAllocator> a)
+void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
{
Mutex::ScopedLock lock (addLock);
allocator = a;
}
-uint64_t ManagementBroker::allocateId(Manageable* object)
+uint64_t ManagementAgent::allocateId(Manageable* object)
{
Mutex::ScopedLock lock (addLock);
if (allocator.get()) return allocator->getIdFor(object);
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementAgent.h
index a57f73be15..2411e6c277 100644
--- a/cpp/src/qpid/management/ManagementBroker.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -1,5 +1,5 @@
-#ifndef _ManagementBroker_
-#define _ManagementBroker_
+#ifndef _ManagementAgent_
+#define _ManagementAgent_
/*
*
@@ -21,14 +21,15 @@
* under the License.
*
*/
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Timer.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/ConnectionToken.h"
-#include "qpid/agent/ManagementAgent.h"
#include "ManagementObject.h"
+#include "ManagementEvent.h"
#include "Manageable.h"
#include "qmf/org/apache/qpid/broker/Agent.h"
#include <qpid/framing/AMQFrame.h>
@@ -39,15 +40,27 @@ namespace management {
struct IdAllocator;
-class ManagementBroker : public ManagementAgent
+class ManagementAgent
{
private:
int threadPoolSize;
public:
- ManagementBroker ();
- virtual ~ManagementBroker ();
+ typedef enum {
+ SEV_EMERG = 0,
+ SEV_ALERT = 1,
+ SEV_CRIT = 2,
+ SEV_ERROR = 3,
+ SEV_WARN = 4,
+ SEV_NOTE = 5,
+ SEV_INFO = 6,
+ SEV_DEBUG = 7,
+ SEV_DEFAULT = 8
+ } severity_t;
+
+ ManagementAgent ();
+ virtual ~ManagementAgent ();
void configure (const std::string& dataDir, uint16_t interval,
qpid::broker::Broker* broker, int threadPoolSize);
@@ -55,41 +68,34 @@ public:
void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
int getMaxThreads () { return threadPoolSize; }
- void registerClass (const std::string& packageName,
- const std::string& className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- void registerEvent (const std::string& packageName,
- const std::string& eventName,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- ObjectId addObject (ManagementObject* object,
- uint64_t persistId = 0);
- void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT);
- void clientAdded (const std::string& routingKey);
+ QPID_BROKER_EXTERN void registerClass (const std::string& packageName,
+ const std::string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ QPID_BROKER_EXTERN void registerEvent (const std::string& packageName,
+ const std::string& eventName,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
+ uint64_t persistId = 0);
+ QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
+ severity_t severity = SEV_DEFAULT);
+ QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
+
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
- const framing::Uuid& getUuid() const { return uuid; }
- // Stubs for remote management agent calls
- void init(const std::string&, uint16_t, uint16_t, bool,
- const std::string&, const std::string&, const std::string&,
- const std::string&, const std::string&) { assert(0); }
- void init(const client::ConnectionSettings&, uint16_t, bool, const std::string&) { assert(0); }
- uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
- int getSignalFd () { assert(0); return -1; }
+ const framing::Uuid& getUuid() const { return uuid; }
void setAllocator(std::auto_ptr<IdAllocator> allocator);
uint64_t allocateId(Manageable* object);
private:
- friend class ManagementAgent;
-
struct Periodic : public qpid::broker::TimerTask
{
- ManagementBroker& broker;
+ ManagementAgent& agent;
- Periodic (ManagementBroker& broker, uint32_t seconds);
+ Periodic (ManagementAgent& agent, uint32_t seconds);
virtual ~Periodic ();
void fire ();
};
@@ -239,4 +245,4 @@ private:
}}
-#endif /*!_ManagementBroker_*/
+#endif /*!_ManagementAgent_*/
diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp
index 4dcafbfcdd..0793b2d18c 100644
--- a/cpp/src/qpid/management/ManagementExchange.cpp
+++ b/cpp/src/qpid/management/ManagementExchange.cpp
@@ -27,14 +27,14 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent) :
- Exchange (_name, _parent), TopicExchange(_name, _parent) {}
+ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent, Broker* b) :
+ Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {}
ManagementExchange::ManagementExchange (const std::string& _name,
bool _durable,
const FieldTable& _args,
- Manageable* _parent) :
- Exchange (_name, _durable, _args, _parent),
- TopicExchange(_name, _durable, _args, _parent) {}
+ Manageable* _parent, Broker* b) :
+ Exchange (_name, _durable, _args, _parent, b),
+ TopicExchange(_name, _durable, _args, _parent, b) {}
void ManagementExchange::route (Deliverable& msg,
const string& routingKey,
@@ -60,7 +60,7 @@ bool ManagementExchange::bind (Queue::shared_ptr queue,
return TopicExchange::bind(queue, routingKey, args);
}
-void ManagementExchange::setManagmentAgent (ManagementBroker* agent)
+void ManagementExchange::setManagmentAgent (ManagementAgent* agent)
{
managementAgent = agent;
}
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h
index d54db1a74e..5e51683515 100644
--- a/cpp/src/qpid/management/ManagementExchange.h
+++ b/cpp/src/qpid/management/ManagementExchange.h
@@ -22,7 +22,7 @@
#define _ManagementExchange_
#include "qpid/broker/TopicExchange.h"
-#include "ManagementBroker.h"
+#include "ManagementAgent.h"
namespace qpid {
namespace broker {
@@ -30,15 +30,15 @@ namespace broker {
class ManagementExchange : public virtual TopicExchange
{
private:
- management::ManagementBroker* managementAgent;
+ management::ManagementAgent* managementAgent;
public:
static const std::string typeName;
- ManagementExchange (const string& name, Manageable* _parent = 0);
+ ManagementExchange (const string& name, Manageable* _parent = 0, Broker* broker = 0);
ManagementExchange (const string& _name, bool _durable,
const qpid::framing::FieldTable& _args,
- Manageable* _parent = 0);
+ Manageable* _parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
@@ -50,7 +50,7 @@ class ManagementExchange : public virtual TopicExchange
const string& routingKey,
const qpid::framing::FieldTable* args);
- void setManagmentAgent (management::ManagementBroker* agent);
+ void setManagmentAgent (management::ManagementAgent* agent);
virtual ~ManagementExchange();
};
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
index f4c45de126..08008b3d79 100644
--- a/cpp/src/qpid/management/ManagementObject.cpp
+++ b/cpp/src/qpid/management/ManagementObject.cpp
@@ -21,7 +21,6 @@
#include "Manageable.h"
#include "ManagementObject.h"
-#include "qpid/agent/ManagementAgent.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Thread.h"
@@ -156,6 +155,7 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i)
}}
+int ManagementObject::maxThreads = 1;
int ManagementObject::nextThreadIndex = 0;
void ManagementObject::writeTimestamps (framing::Buffer& buf)
@@ -176,7 +176,7 @@ int ManagementObject::getThreadIndex() {
if (thisIndex == -1) {
sys::Mutex::ScopedLock mutex(accessLock);
thisIndex = nextThreadIndex;
- if (nextThreadIndex < agent->getMaxThreads() - 1)
+ if (nextThreadIndex < maxThreads - 1)
nextThreadIndex++;
}
return thisIndex;
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 498169318d..15c2307886 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -32,7 +32,6 @@ namespace qpid {
namespace management {
class Manageable;
-class ManagementAgent;
class ObjectId;
@@ -111,7 +110,7 @@ public:
class ManagementObject : public ManagementItem
{
- protected:
+protected:
uint64_t createTime;
uint64_t destroyTime;
@@ -122,8 +121,6 @@ class ManagementObject : public ManagementItem
bool deleted;
Manageable* coreObject;
sys::Mutex accessLock;
- ManagementAgent* agent;
- int maxThreads;
uint32_t flags;
static int nextThreadIndex;
@@ -133,13 +130,14 @@ class ManagementObject : public ManagementItem
QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf);
public:
+ static int maxThreads;
typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
- ManagementObject(ManagementAgent* _agent, Manageable* _core) :
+ ManagementObject(Manageable* _core) :
createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))),
destroyTime(0), updateTime(createTime), configChanged(true),
instChanged(true), deleted(false),
- coreObject(_core), agent(_agent), forcePublish(false) {}
+ coreObject(_core), forcePublish(false) {}
virtual ~ManagementObject() {}
virtual writeSchemaCall_t getWriteSchemaCall() = 0;