diff options
26 files changed, 535 insertions, 140 deletions
diff --git a/cpp/managementgen/schema.py b/cpp/managementgen/schema.py index c3db4eaf53..34121a2544 100755 --- a/cpp/managementgen/schema.py +++ b/cpp/managementgen/schema.py @@ -397,6 +397,24 @@ class SchemaArg: def getDir (self): return self.dir + def genSchema (self, stream): + stream.write (" ft = FieldTable ();\n") + stream.write (" ft.setString (NAME, \"" + self.name + "\");\n") + stream.write (" ft.setInt (TYPE, TYPE_" + self.type.type.base +");\n") + stream.write (" ft.setString (DIR, \"" + self.dir + "\");\n") + if self.unit != None: + stream.write (" ft.setString (UNIT, \"" + self.unit + "\");\n") + if self.min != None: + stream.write (" ft.setInt (MIN, " + self.min + ");\n") + if self.max != None: + stream.write (" ft.setInt (MAX, " + self.max + ");\n") + if self.maxLen != None: + stream.write (" ft.setInt (MAXLEN, " + self.maxLen + ");\n") + if self.desc != None: + stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n") + if self.default != None: + stream.write (" ft.setString (DEFAULT, \"" + self.default + "\");\n") + stream.write (" buf.put (ft);\n\n") #===================================================================================== # @@ -455,6 +473,16 @@ class SchemaMethod: dirTag = arg.dir.lower() + "_" stream.write (" " + ctype + " " + dirTag + arg.getName () + ";\n") + def genSchema (self, stream): + stream.write (" ft = FieldTable ();\n") + stream.write (" ft.setString (NAME, \"" + self.name + "\");\n") + stream.write (" ft.setInt (ARGCOUNT, " + str (len (self.args)) + ");\n") + if self.desc != None: + stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n") + stream.write (" buf.put (ft);\n\n") + for arg in self.args: + arg.genSchema (stream) + #===================================================================================== # #===================================================================================== @@ -550,15 +578,6 @@ class SchemaClass: for inst in self.instElements: inst.genAccessor (stream) - def genArgDeclaration (self, stream): - argsFound = 0 - for method in self.methods: - argsFound = argsFound + len (method.args) - for event in self.events: - argsFound = argsFound + len (event.args) - if argsFound > 0: - stream.write ("FieldTable arg;"); - def genConfigCount (self, stream): stream.write ("%d" % len (self.configElements)) @@ -683,7 +702,8 @@ class SchemaClass: number = number + 1 def genMethodSchema (self, stream): - pass ########################################################################### + for method in self.methods: + method.genSchema (stream) def genNameCap (self, stream): stream.write (self.name.capitalize ()) diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp index 70077d495c..d3b95fd674 100644 --- a/cpp/managementgen/templates/Class.cpp +++ b/cpp/managementgen/templates/Class.cpp @@ -53,12 +53,15 @@ namespace { const string MAX("max"); const string MAXLEN("maxlen"); const string DESC("desc"); + const string ARGCOUNT("argCount"); + const string ARGS("args"); + const string DIR("dir"); + const string DEFAULT("default"); } void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) { FieldTable ft; - /*MGEN:Class.ArgDeclaration*/ schemaNeeded = false; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 9ac73c0219..6c8b21bc59 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -130,6 +130,7 @@ Broker::Broker(const Broker::Options& conf) : sessionManager(conf.ack) { if(conf.enableMgmt){ + ManagementAgent::enableManagement (); managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); @@ -154,7 +155,8 @@ Broker::Broker(const Broker::Options& conf) : Vhost* vhost = new Vhost (this); vhostObject = Vhost::shared_ptr (vhost); - queues.setParent (vhost); + queues.setParent (vhost); + exchanges.setParent (vhost); } exchanges.declare(empty, DirectExchange::typeName); // Default exchange. @@ -284,7 +286,6 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, case management::Broker::METHOD_JOINCLUSTER : case management::Broker::METHOD_LEAVECLUSTER : - case management::Broker::METHOD_CRASH : status = Manageable::STATUS_NOT_IMPLEMENTED; break; } diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h index bdea550159..e46d2024bf 100644 --- a/cpp/src/qpid/broker/Deliverable.h +++ b/cpp/src/qpid/broker/Deliverable.h @@ -30,6 +30,7 @@ namespace qpid { bool delivered; Deliverable() : delivered(false) {} virtual void deliverTo(Queue::shared_ptr& queue) = 0; + virtual uint64_t contentSize() { return 0; } virtual ~Deliverable(){} }; } diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp index e3fc39ce14..e79a3aa773 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.cpp +++ b/cpp/src/qpid/broker/DeliverableMessage.cpp @@ -37,3 +37,7 @@ Message& DeliverableMessage::getMessage() return *msg; } +uint64_t DeliverableMessage::contentSize () +{ + return msg->contentSize (); +} diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h index 39e5c04110..440d1184eb 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.h +++ b/cpp/src/qpid/broker/DeliverableMessage.h @@ -33,6 +33,7 @@ namespace qpid { DeliverableMessage(intrusive_ptr<Message>& msg); virtual void deliverTo(Queue::shared_ptr& queue); Message& getMessage(); + uint64_t contentSize(); virtual ~DeliverableMessage(){} }; } diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 87f363ff3d..43b707a5c8 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -25,16 +25,37 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; +using qpid::management::Manageable; -DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {} -DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} +DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} + +DirectExchange::DirectExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){ RWlock::ScopedWlock l(lock); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); + std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); + std::vector<Binding::shared_ptr>::iterator i; + + for (i = queues.begin(); i != queues.end(); i++) + if ((*i)->queue == queue) + break; + if (i == queues.end()) { - bindings[routingKey].push_back(queue); + Binding::shared_ptr binding (new Binding (routingKey, queue, this)); + bindings[routingKey].push_back(binding); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } return true; } else{ return false; @@ -43,14 +64,21 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); + std::vector<Binding::shared_ptr>::iterator i; + + for (i = queues.begin(); i != queues.end(); i++) + if ((*i)->queue == queue) + break; - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); if (i < queues.end()) { queues.erase(i); - if(queues.empty()){ + if (queues.empty()) { bindings.erase(routingKey); } + if (mgmtExchange.get() != 0) { + mgmtExchange->dec_bindings (); + } return true; } else { return false; @@ -59,38 +87,65 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedRlock l(lock); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); + std::vector<Binding::shared_ptr>::iterator i; int count(0); - for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ - msg.deliverTo(*i); - } + + for(i = queues.begin(); i != queues.end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding.get() != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + if(!count){ QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + } + else { + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } + + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); } } bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { + std::vector<Binding::shared_ptr>::iterator j; + if (routingKey) { Bindings::iterator i = bindings.find(*routingKey); - return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end()); + + if (i == bindings.end()) + return false; + if (!queue) + return true; + for (j = i->second.begin(); j != i->second.end(); j++) + if ((*j)->queue == queue) + return true; } else if (!queue) { //if no queue or routing key is specified, just report whether any bindings exist return bindings.size() > 0; } else { - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { - if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) { - return true; - } - } + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) + for (j = i->second.begin(); j != i->second.end(); j++) + if ((*j)->queue == queue) + return true; return false; } -} - -DirectExchange::~DirectExchange(){ + return false; } +DirectExchange::~DirectExchange() {} const std::string DirectExchange::typeName("direct"); diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 243f51d6a8..118f2ed4d3 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -31,17 +31,17 @@ namespace qpid { namespace broker { class DirectExchange : public virtual Exchange{ - typedef std::vector<Queue::shared_ptr> Queues; - typedef std::map<string, Queues > Bindings; + typedef std::vector<Binding::shared_ptr> Queues; + typedef std::map<string, Queues> Bindings; Bindings bindings; qpid::sys::RWlock lock; public: static const std::string typeName; - DirectExchange(const std::string& name); + DirectExchange(const std::string& name, management::Manageable* parent = 0); DirectExchange(const string& _name, bool _durable, - const qpid::framing::FieldTable& _args); + const qpid::framing::FieldTable& _args, management::Manageable* parent = 0); virtual std::string getType() const { return typeName; } diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 2d5cb09d7c..83466085bc 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -21,10 +21,52 @@ #include "Exchange.h" #include "ExchangeRegistry.h" +#include "qpid/management/ManagementAgent.h" using namespace qpid::broker; using qpid::framing::Buffer; using qpid::framing::FieldTable; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; + +Exchange::Exchange (const string& _name, Manageable* parent) : + name(_name), durable(false), persistenceId(0) +{ + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + if (agent.get () != 0) + { + mgmtExchange = management::Exchange::shared_ptr + (new management::Exchange (this, parent, _name)); + agent->addObject (mgmtExchange); + } + } +} + +Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, + Manageable* parent) + : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0) +{ + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + if (agent.get () != 0) + { + mgmtExchange = management::Exchange::shared_ptr + (new management::Exchange (this, parent, _name)); + agent->addObject (mgmtExchange); + } + } +} + +Exchange::~Exchange () +{ + if (mgmtExchange.get () != 0) + mgmtExchange->resourceDestroy (); +} Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer) { @@ -56,5 +98,43 @@ uint32_t Exchange::encodedSize() const + args.size(); } +ManagementObject::shared_ptr Exchange::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtExchange); +} +Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent) + : queue(_queue), key(_key) +{ + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + if (agent.get() != 0) + { + ManagementObject::shared_ptr mo = queue->GetManagementObject(); + if (mo.get() != 0) + { + uint64_t queueId = mo->getObjectId(); + mgmtBinding = management::Binding::shared_ptr + (new management::Binding (this, (Manageable*) parent, queueId, key)); + agent->addObject (mgmtBinding); + } + } + } +} + +Exchange::Binding::~Binding () +{ + if (mgmtBinding.get () != 0) + mgmtBinding->resourceDestroy (); +} +ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtBinding); +} + +Manageable::status_t Exchange::Binding::ManagementMethod (uint32_t, Args&) +{ + return Manageable::STATUS_UNKNOWN_METHOD; +} diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 5febca0ae9..e9f5b3965a 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -28,13 +28,16 @@ #include "MessageStore.h" #include "PersistableExchange.h" #include "qpid/framing/FieldTable.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Exchange.h" +#include "qpid/management/Binding.h" namespace qpid { namespace broker { using std::string; class ExchangeRegistry; - class Exchange : public PersistableExchange{ + class Exchange : public PersistableExchange, public management::Manageable { private: const string name; const bool durable; @@ -43,13 +46,31 @@ namespace qpid { uint32_t alternateUsers; mutable uint64_t persistenceId; + protected: + struct Binding : public management::Manageable { + typedef boost::shared_ptr<Binding> shared_ptr; + typedef std::vector<Binding::shared_ptr> vector; + + Queue::shared_ptr queue; + const std::string key; + const qpid::framing::FieldTable args; + management::Binding::shared_ptr mgmtBinding; + + Binding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent = 0); + ~Binding (); + management::ManagementObject::shared_ptr GetManagementObject () const; + management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); + }; + + management::Exchange::shared_ptr mgmtExchange; + public: typedef boost::shared_ptr<Exchange> shared_ptr; - explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){} - Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args) - : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){} - virtual ~Exchange(){} + explicit Exchange(const string& name, management::Manageable* parent = 0); + Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, + management::Manageable* parent = 0); + virtual ~Exchange(); const string& getName() const { return name; } bool isDurable() { return durable; } @@ -75,6 +96,10 @@ namespace qpid { static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t, management::Args&) { return management::Manageable::STATUS_UNKNOWN_METHOD; } }; } } diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 2f58e23c23..58d9d5efb8 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -46,15 +46,15 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c Exchange::shared_ptr exchange; if(type == TopicExchange::typeName){ - exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args)); + exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent)); }else if(type == DirectExchange::typeName){ - exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args)); + exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent)); }else if(type == FanOutExchange::typeName){ - exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args)); + exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent)); }else if (type == HeadersExchange::typeName) { - exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args)); + exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent)); }else if (type == ManagementExchange::typeName) { - exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args)); + exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent)); }else{ throw UnknownExchangeTypeException(); } diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index dd75940c4c..f39bd661fa 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -27,6 +27,7 @@ #include "MessageStore.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" +#include "qpid/management/Manageable.h" namespace qpid { namespace broker { @@ -36,7 +37,9 @@ namespace broker { typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap; ExchangeMap exchanges; qpid::sys::RWlock lock; + management::Manageable* parent; public: + ExchangeRegistry () : parent(0) {} std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException); std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type, @@ -45,6 +48,11 @@ namespace broker { void destroy(const std::string& name); Exchange::shared_ptr get(const std::string& name); Exchange::shared_ptr getDefault(); + + /** + * Register the manageable parent for declared queues + */ + void setParent (management::Manageable* _parent) { parent = _parent; } }; } } diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index ea2327c788..714d4ea444 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -25,15 +25,36 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} -FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} +FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) : + Exchange(_name, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} + +FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ RWlock::ScopedWlock locker(lock); + std::vector<Binding::shared_ptr>::iterator i; + // Add if not already present. - Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); + for (i = bindings.begin (); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + if (i == bindings.end()) { - bindings.push_back(queue); + Binding::shared_ptr binding (new Binding ("", queue, this)); + bindings.push_back(binding); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } return true; } else { return false; @@ -42,9 +63,17 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ RWlock::ScopedWlock locker(lock); - Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); + std::vector<Binding::shared_ptr>::iterator i; + + for (i = bindings.begin (); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + if (i != bindings.end()) { bindings.erase(i); + if (mgmtExchange.get() != 0) { + mgmtExchange->dec_bindings (); + } return true; } else { return false; @@ -53,14 +82,40 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ RWlock::ScopedRlock locker(lock); - for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ - msg.deliverTo(*i); + uint32_t count(0); + + for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){ + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding.get() != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + + if (mgmtExchange.get() != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } } } bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) { - return std::find(bindings.begin(), bindings.end(), queue) != bindings.end(); + std::vector<Binding::shared_ptr>::iterator i; + + for (i = bindings.begin (); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + + return i != bindings.end(); } diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 625afc8cce..4bc92f6b28 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -32,15 +32,16 @@ namespace qpid { namespace broker { class FanOutExchange : public virtual Exchange { - std::vector<Queue::shared_ptr> bindings; + std::vector<Binding::shared_ptr> bindings; qpid::sys::RWlock lock; public: static const std::string typeName; - FanOutExchange(const std::string& name); + FanOutExchange(const std::string& name, management::Manageable* parent = 0); FanOutExchange(const string& _name, bool _durable, - const qpid::framing::FieldTable& _args); + const qpid::framing::FieldTable& _args, + management::Manageable* parent = 0); virtual std::string getType() const { return typeName; } diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index dd688cdfcf..c0f6cf19d2 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -40,19 +40,40 @@ namespace { const std::string x_match("x-match"); } -HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } -HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} +HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) : + Exchange(_name, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} + +HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ RWlock::ScopedWlock locker(lock); FieldTable::ValuePtr what = args->get(x_match); if (!what || (*what != all && *what != any)) throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange.")); - Binding binding(*args, queue); - Bindings::iterator i = - std::find(bindings.begin(),bindings.end(), binding); + Bindings::iterator i; + + for (i = bindings.begin(); i != bindings.end(); i++) + if (i->first == *args && i->second->queue == queue) + break; + if (i == bindings.end()) { - bindings.push_back(binding); + Binding::shared_ptr binding (new Binding ("", queue, this)); + HeaderMap headerMap(*args, binding); + + bindings.push_back(headerMap); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } return true; } else { return false; @@ -61,10 +82,16 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ RWlock::ScopedWlock locker(lock); - Bindings::iterator i = - std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); + Bindings::iterator i; + for (i = bindings.begin(); i != bindings.end(); i++) + if (i->first == *args && i->second->queue == queue) + break; + if (i != bindings.end()) { bindings.erase(i); + if (mgmtExchange.get() != 0) { + mgmtExchange->dec_bindings (); + } return true; } else { return false; @@ -73,9 +100,29 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ - RWlock::ScopedRlock locker(lock);; - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, *args)) msg.deliverTo(i->second); + RWlock::ScopedRlock locker(lock); + uint32_t count(0); + + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) { + if (match(i->first, *args)) msg.deliverTo(i->second->queue); + if (i->second->mgmtBinding.get() != 0) + i->second->mgmtBinding->inc_msgMatched (); + } + + if (mgmtExchange.get() != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } } } @@ -83,7 +130,7 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args) { for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if ( (!args || equal(i->first, *args)) && (!queue || i->second == queue)) { + if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) { return true; } } diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index f7abf3514b..4f654179c5 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -32,8 +32,8 @@ namespace broker { class HeadersExchange : public virtual Exchange { - typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding; - typedef std::vector<Binding> Bindings; + typedef std::pair<qpid::framing::FieldTable, Binding::shared_ptr> HeaderMap; + typedef std::vector<HeaderMap> Bindings; Bindings bindings; qpid::sys::RWlock lock; @@ -41,9 +41,10 @@ class HeadersExchange : public virtual Exchange { public: static const std::string typeName; - HeadersExchange(const string& name); + HeadersExchange(const string& name, management::Manageable* parent = 0); HeadersExchange(const string& _name, bool _durable, - const qpid::framing::FieldTable& _args); + const qpid::framing::FieldTable& _args, + management::Manageable* parent = 0); virtual std::string getType() const { return typeName; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 4dba60cd0d..e2fd998cc0 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -59,11 +59,14 @@ Queue::Queue(const string& _name, bool _autodelete, { if (parent != 0) { - mgmtObject = management::Queue::shared_ptr - (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0)); - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - agent->addObject (mgmtObject); + + if (agent.get () != 0) + { + mgmtObject = management::Queue::shared_ptr + (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0)); + agent->addObject (mgmtObject); + } } } @@ -93,14 +96,14 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ if (!enqueue(0, msg)){ push(msg); msg->enqueueComplete(); - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); mgmtObject->inc_byteDepth (msg->contentSize ()); } }else { - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -118,7 +121,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ void Queue::recover(intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); @@ -136,7 +139,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){ void Queue::process(intrusive_ptr<Message>& msg){ push(msg); - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); @@ -319,7 +322,7 @@ void Queue::consume(Consumer&, bool requestExclusive){ } consumerCount++; - if (mgmtObject != 0){ + if (mgmtObject.get() != 0){ mgmtObject->inc_consumers (); } } @@ -329,7 +332,7 @@ void Queue::cancel(Consumer& c){ Mutex::ScopedLock locker(consumerLock); consumerCount--; if(exclusive) exclusive = false; - if (mgmtObject != 0){ + if (mgmtObject.get() != 0){ mgmtObject->dec_consumers (); } } @@ -341,16 +344,6 @@ QueuedMessage Queue::dequeue(){ if(!messages.empty()){ msg = messages.front(); pop(); - if (mgmtObject != 0){ - mgmtObject->inc_msgTotalDequeues (); - //mgmtObject->inc_byteTotalDequeues (msg->contentSize ()); - mgmtObject->dec_msgDepth (); - //mgmtObject->dec_byteDepth (msg->contentSize ()); - if (0){//msg->isPersistent ()) { - mgmtObject->inc_msgPersistDequeues (); - //mgmtObject->inc_bytePersistDequeues (msg->contentSize ()); - } - } } return msg; } @@ -366,7 +359,19 @@ uint32_t Queue::purge(){ * Assumes messageLock is held */ void Queue::pop(){ - if (policy.get()) policy->dequeued(messages.front().payload->contentSize()); + QueuedMessage& msg = messages.front(); + + if (policy.get()) policy->dequeued(msg.payload->contentSize()); + if (mgmtObject.get() != 0){ + mgmtObject->inc_msgTotalDequeues (); + mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); + mgmtObject->dec_msgDepth (); + mgmtObject->dec_byteDepth (msg.payload->contentSize()); + if (msg.payload->isPersistent ()){ + mgmtObject->inc_msgPersistDequeues (); + mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); + } + } messages.pop_front(); } @@ -473,7 +478,8 @@ void Queue::destroy() } } -void Queue::bound(const string& exchange, const string& key, const FieldTable& args) +void Queue::bound(const string& exchange, const string& key, + const FieldTable& args) { bindings.add(exchange, key, args); } @@ -584,8 +590,24 @@ ManagementObject::shared_ptr Queue::GetManagementObject (void) const return dynamic_pointer_cast<ManagementObject> (mgmtObject); } -Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/, +Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& /*args*/) { - return Manageable::STATUS_OK; + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case management::Queue::METHOD_PURGE : + purge (); + status = Manageable::STATUS_OK; + break; + + case management::Queue::METHOD_INCREASEJOURNALSIZE : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; } diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index bc761cf34d..5330ee4fd0 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -115,9 +115,19 @@ bool TopicPattern::match(const Tokens& target) const return do_match(begin(), end(), target.begin(), target.end()); } -TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } -TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} +TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} +TopicExchange::TopicExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); @@ -125,7 +135,11 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons if (isBound(queue, routingPattern)) { return false; } else { - bindings[routingPattern].push_back(queue); + Binding::shared_ptr binding (new Binding (routingKey, queue, this)); + bindings[routingPattern].push_back(binding); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } return true; } } @@ -133,12 +147,19 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); - Queue::vector& qv(bi->second); + Binding::vector& qv(bi->second); if (bi == bindings.end()) return false; - Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); + + Binding::vector::iterator q; + for (q = qv.begin(); q != qv.end(); q++) + if ((*q)->queue == queue) + break; if(q == qv.end()) return false; qv.erase(q); if(qv.empty()) bindings.erase(bi); + if (mgmtExchange.get() != 0) { + mgmtExchange->dec_bindings (); + } return true; } @@ -146,21 +167,45 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) { BindingMap::iterator bi = bindings.find(pattern); if (bi == bindings.end()) return false; - Queue::vector& qv(bi->second); - return find(qv.begin(), qv.end(), queue) != qv.end(); + Binding::vector& qv(bi->second); + Binding::vector::iterator q; + for (q = qv.begin(); q != qv.end(); q++) + if ((*q)->queue == queue) + break; + return q != qv.end(); } void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedRlock l(lock); + uint32_t count(0); Tokens tokens(routingKey); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(tokens)) { - Queue::vector& qv(i->second); - for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ - msg.deliverTo(*j); + Binding::vector& qv(i->second); + for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ + msg.deliverTo((*j)->queue); + if ((*j)->mgmtBinding.get() != 0) + (*j)->mgmtBinding->inc_msgMatched (); } } } + + if (mgmtExchange.get() != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } } bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) @@ -176,16 +221,16 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing return true; } } - return false; } else { for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - Queue::vector& qv(i->second); - if (find(qv.begin(), qv.end(), queue) != qv.end()) { - return true; - } + Binding::vector& qv(i->second); + Binding::vector::iterator q; + for (q = qv.begin(); q != qv.end(); q++) + if ((*q)->queue == queue) + return true; } - return false; } + return false; } TopicExchange::~TopicExchange() {} diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index e2cc1a3535..2e107142b7 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -71,7 +71,7 @@ class TopicPattern : public Tokens }; class TopicExchange : public virtual Exchange{ - typedef std::map<TopicPattern, Queue::vector> BindingMap; + typedef std::map<TopicPattern, Binding::vector> BindingMap; BindingMap bindings; qpid::sys::RWlock lock; @@ -79,9 +79,9 @@ class TopicExchange : public virtual Exchange{ public: static const std::string typeName; - TopicExchange(const string& name); + TopicExchange(const string& name, management::Manageable* parent = 0); TopicExchange(const string& _name, bool _durable, - const qpid::framing::FieldTable& _args); + const qpid::framing::FieldTable& _args, management::Manageable* parent = 0); virtual std::string getType() const { return typeName; } diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index 07e72c49f4..0ad2eac080 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -67,3 +67,7 @@ void TxPublish::Commit::operator()(Queue::shared_ptr& queue){ queue->process(msg); } +uint64_t TxPublish::contentSize () +{ + return msg->contentSize (); +} diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index b4323864bc..085dd28316 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -66,10 +66,12 @@ namespace qpid { virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); - + virtual void deliverTo(Queue::shared_ptr& queue); virtual ~TxPublish(){} + + uint64_t contentSize(); }; } } diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp index 635f345a86..537d2abf0e 100644 --- a/cpp/src/qpid/broker/Vhost.cpp +++ b/cpp/src/qpid/broker/Vhost.cpp @@ -27,11 +27,14 @@ Vhost::Vhost (management::Manageable* parentBroker) { if (parentBroker != 0) { - mgmtObject = management::Vhost::shared_ptr - (new management::Vhost (this, parentBroker, "/")); - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - agent->addObject (mgmtObject); + + if (agent.get () != 0) + { + mgmtObject = management::Vhost::shared_ptr + (new management::Vhost (this, parentBroker, "/")); + agent->addObject (mgmtObject); + } } } diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 85c2acce1d..90da74404b 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -33,6 +33,7 @@ using namespace qpid::broker; using namespace qpid::sys; ManagementAgent::shared_ptr ManagementAgent::agent; +bool ManagementAgent::enabled = 0; ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) { @@ -40,16 +41,21 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } +void ManagementAgent::enableManagement (void) +{ + enabled = 1; +} + ManagementAgent::shared_ptr ManagementAgent::getAgent (void) { - if (agent.get () == 0) + if (enabled && agent.get () == 0) agent = shared_ptr (new ManagementAgent (10)); return agent; } -void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange, - Exchange::shared_ptr _dexchange) +void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange, + broker::Exchange::shared_ptr _dexchange) { mExchange = _mexchange; dExchange = _dexchange; @@ -57,6 +63,7 @@ void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange, void ManagementAgent::addObject (ManagementObject::shared_ptr object) { + RWlock::ScopedWlock writeLock (userLock); uint64_t objectId = nextObjectId++; object->setObjectId (objectId); @@ -74,6 +81,8 @@ void ManagementAgent::Periodic::fire () void ManagementAgent::clientAdded (void) { + RWlock::ScopedRlock readLock (userLock); + for (ManagementObjectMap::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) @@ -94,7 +103,7 @@ void ManagementAgent::EncodeHeader (Buffer& buf) void ManagementAgent::SendBuffer (Buffer& buf, uint32_t length, - Exchange::shared_ptr exchange, + broker::Exchange::shared_ptr exchange, string routingKey) { intrusive_ptr<Message> msg (new Message ()); @@ -129,9 +138,10 @@ void ManagementAgent::PeriodicProcessing (void) { #define BUFSIZE 65536 #define THRESHOLD 16384 - char msgChars[BUFSIZE]; - uint32_t contentSize; - string routingKey; + RWlock::ScopedWlock writeLock (userLock); + char msgChars[BUFSIZE]; + uint32_t contentSize; + string routingKey; std::list<uint64_t> deleteList; if (managementObjects.empty ()) @@ -157,7 +167,7 @@ void ManagementAgent::PeriodicProcessing (void) SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } - if (object->getConfigChanged ()) + if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); EncodeHeader (msgBuffer); diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index c33a59adff..a4f10632da 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -25,6 +25,7 @@ #include "qpid/Options.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Timer.h" +#include "qpid/sys/Mutex.h" #include "ManagementObject.h" #include <boost/shared_ptr.hpp> @@ -41,11 +42,12 @@ class ManagementAgent typedef boost::shared_ptr<ManagementAgent> shared_ptr; + static void enableManagement (void); static shared_ptr getAgent (void); void setInterval (uint16_t _interval) { interval = _interval; } - void setExchange (broker::Exchange::shared_ptr mgmtExchange, - broker::Exchange::shared_ptr directExchange); + void setExchange (broker::Exchange::shared_ptr mgmtExchange, + broker::Exchange::shared_ptr directExchange); void addObject (ManagementObject::shared_ptr object); void clientAdded (void); void dispatchCommand (broker::Deliverable& msg, @@ -64,6 +66,9 @@ class ManagementAgent }; static shared_ptr agent; + static bool enabled; + + qpid::sys::RWlock userLock; ManagementObjectMap managementObjects; broker::Timer timer; broker::Exchange::shared_ptr mExchange; diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp index f18b6fc402..ee18f026e7 100644 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -27,13 +27,14 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -ManagementExchange::ManagementExchange (const string& _name) : - Exchange (_name), TopicExchange(_name) {} +ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent) : + Exchange (_name, _parent), TopicExchange(_name, _parent) {} ManagementExchange::ManagementExchange (const std::string& _name, bool _durable, - const FieldTable& _args) : - Exchange (_name, _durable, _args), - TopicExchange(_name, _durable, _args) {} + const FieldTable& _args, + Manageable* _parent) : + Exchange (_name, _durable, _args, _parent), + TopicExchange(_name, _durable, _args, _parent) {} bool ManagementExchange::bind (Queue::shared_ptr queue, diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h index 6ccdf47182..0fcd65b092 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -35,9 +35,10 @@ class ManagementExchange : public virtual TopicExchange public: static const std::string typeName; - ManagementExchange (const string& name); + ManagementExchange (const string& name, Manageable* _parent = 0); ManagementExchange (const string& _name, bool _durable, - const qpid::framing::FieldTable& _args); + const qpid::framing::FieldTable& _args, + Manageable* _parent = 0); virtual std::string getType() const { return typeName; } |