summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-12-06 18:37:18 +0000
committerAlan Conway <aconway@apache.org>2007-12-06 18:37:18 +0000
commitf03223296e70664d3ecd57df357e71202c79eff7 (patch)
tree7d0b5b8bcb6be11ef3294d04c198b4e5e7981fce
parent430e644a4a647c256ae51682d7230c35d954073e (diff)
downloadqpid-python-f03223296e70664d3ecd57df357e71202c79eff7.tar.gz
From Ted Ross <tross@redhat.com>
Queue statistics fixed. Additional objects added (exchange, binding). Changes: M cpp/src/qpid/broker/ExchangeRegistry.h M cpp/src/qpid/broker/ExchangeRegistry.cpp ExchangeRegistry was modified to pass a parent pointer to created exchanges. This parent reference is not stored but is used to link management objects in a hierarchy of ownership. M cpp/src/qpid/broker/Exchange.h M cpp/src/qpid/broker/Exchange.cpp Exchange now inherits Manageable to make it visible via the management interface. The Exchange parent class handles most of the management boilerplate. A Binding struct was introduced to track bindings for management. This is separate from QueueBindings which track bindings for queues. M cpp/src/qpid/broker/HeadersExchange.h M cpp/src/qpid/broker/FanOutExchange.h M cpp/src/qpid/broker/DirectExchange.h M cpp/src/qpid/broker/TopicExchange.h M cpp/src/qpid/broker/HeadersExchange.cpp M cpp/src/qpid/broker/FanOutExchange.cpp M cpp/src/qpid/broker/DirectExchange.cpp M cpp/src/qpid/broker/TopicExchange.cpp M cpp/src/qpid/management/ManagementExchange.cpp M cpp/src/qpid/management/ManagementExchange.h Each exchange type handles management stats in its own specific way. Additionally, the constructors pass the management parent pointer to the constructor or Exchange. An extra layer was added to contain bindings. Instead of directly storing bound queues, the exchanges store "bindings" which are managable constructs. M cpp/src/qpid/broker/Broker.cpp Broker now explicitly enables the management agent. Also sets the management parent (vhost) in the exchange registry. M cpp/src/qpid/broker/Vhost.cpp Updated constructor to be more defensive in case the management agent has not been enabled. M cpp/src/qpid/broker/Queue.cpp Same constructor update as vhost. Moved accounting of dequeues into "pop". Implemented management method handler (purge). M cpp/src/qpid/broker/Deliverable.h A new method was added to extract the content size of the deliverable content (if appropriate). The method is not pure virtual and returns zero if not overridden. M cpp/src/qpid/broker/DeliverableMessage.h M cpp/src/qpid/broker/TxPublish.cpp M cpp/src/qpid/broker/DeliverableMessage.cpp M cpp/src/qpid/broker/TxPublish.h These derivatives of Deliverable were updated with overrides for contenSize. M cpp/src/qpid/management/ManagementAgent.h M cpp/src/qpid/management/ManagementAgent.cpp An "enable" method was added to prevent inadvertent creation of a management agent when not desired. Adding and deleting management objects is now protected by a mutex. Make sure that deleted objects get reported even if neither their configuration nor instrumentation is changed. M specs/management-schema.xml Minor cosmetic updates. Additional parent linkage. M cpp/managementgen/schema.py M cpp/managementgen/templates/Class.cpp Added generated code to publish schema details for methods. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@601807 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xcpp/managementgen/schema.py40
-rw-r--r--cpp/managementgen/templates/Class.cpp5
-rw-r--r--cpp/src/qpid/broker/Broker.cpp5
-rw-r--r--cpp/src/qpid/broker/Deliverable.h1
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.cpp4
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.h1
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp97
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h8
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp80
-rw-r--r--cpp/src/qpid/broker/Exchange.h35
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp10
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h8
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp71
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h7
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp71
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h9
-rw-r--r--cpp/src/qpid/broker/Queue.cpp70
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp77
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h6
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp4
-rw-r--r--cpp/src/qpid/broker/TxPublish.h4
-rw-r--r--cpp/src/qpid/broker/Vhost.cpp11
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp26
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h9
-rw-r--r--cpp/src/qpid/management/ManagementExchange.cpp11
-rw-r--r--cpp/src/qpid/management/ManagementExchange.h5
26 files changed, 535 insertions, 140 deletions
diff --git a/cpp/managementgen/schema.py b/cpp/managementgen/schema.py
index c3db4eaf53..34121a2544 100755
--- a/cpp/managementgen/schema.py
+++ b/cpp/managementgen/schema.py
@@ -397,6 +397,24 @@ class SchemaArg:
def getDir (self):
return self.dir
+ def genSchema (self, stream):
+ stream.write (" ft = FieldTable ();\n")
+ stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
+ stream.write (" ft.setInt (TYPE, TYPE_" + self.type.type.base +");\n")
+ stream.write (" ft.setString (DIR, \"" + self.dir + "\");\n")
+ if self.unit != None:
+ stream.write (" ft.setString (UNIT, \"" + self.unit + "\");\n")
+ if self.min != None:
+ stream.write (" ft.setInt (MIN, " + self.min + ");\n")
+ if self.max != None:
+ stream.write (" ft.setInt (MAX, " + self.max + ");\n")
+ if self.maxLen != None:
+ stream.write (" ft.setInt (MAXLEN, " + self.maxLen + ");\n")
+ if self.desc != None:
+ stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
+ if self.default != None:
+ stream.write (" ft.setString (DEFAULT, \"" + self.default + "\");\n")
+ stream.write (" buf.put (ft);\n\n")
#=====================================================================================
#
@@ -455,6 +473,16 @@ class SchemaMethod:
dirTag = arg.dir.lower() + "_"
stream.write (" " + ctype + " " + dirTag + arg.getName () + ";\n")
+ def genSchema (self, stream):
+ stream.write (" ft = FieldTable ();\n")
+ stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
+ stream.write (" ft.setInt (ARGCOUNT, " + str (len (self.args)) + ");\n")
+ if self.desc != None:
+ stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
+ stream.write (" buf.put (ft);\n\n")
+ for arg in self.args:
+ arg.genSchema (stream)
+
#=====================================================================================
#
#=====================================================================================
@@ -550,15 +578,6 @@ class SchemaClass:
for inst in self.instElements:
inst.genAccessor (stream)
- def genArgDeclaration (self, stream):
- argsFound = 0
- for method in self.methods:
- argsFound = argsFound + len (method.args)
- for event in self.events:
- argsFound = argsFound + len (event.args)
- if argsFound > 0:
- stream.write ("FieldTable arg;");
-
def genConfigCount (self, stream):
stream.write ("%d" % len (self.configElements))
@@ -683,7 +702,8 @@ class SchemaClass:
number = number + 1
def genMethodSchema (self, stream):
- pass ###########################################################################
+ for method in self.methods:
+ method.genSchema (stream)
def genNameCap (self, stream):
stream.write (self.name.capitalize ())
diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp
index 70077d495c..d3b95fd674 100644
--- a/cpp/managementgen/templates/Class.cpp
+++ b/cpp/managementgen/templates/Class.cpp
@@ -53,12 +53,15 @@ namespace {
const string MAX("max");
const string MAXLEN("maxlen");
const string DESC("desc");
+ const string ARGCOUNT("argCount");
+ const string ARGS("args");
+ const string DIR("dir");
+ const string DEFAULT("default");
}
void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
{
FieldTable ft;
- /*MGEN:Class.ArgDeclaration*/
schemaNeeded = false;
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 9ac73c0219..6c8b21bc59 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -130,6 +130,7 @@ Broker::Broker(const Broker::Options& conf) :
sessionManager(conf.ack)
{
if(conf.enableMgmt){
+ ManagementAgent::enableManagement ();
managementAgent = ManagementAgent::getAgent ();
managementAgent->setInterval (conf.mgmtPubInterval);
@@ -154,7 +155,8 @@ Broker::Broker(const Broker::Options& conf) :
Vhost* vhost = new Vhost (this);
vhostObject = Vhost::shared_ptr (vhost);
- queues.setParent (vhost);
+ queues.setParent (vhost);
+ exchanges.setParent (vhost);
}
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -284,7 +286,6 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
case management::Broker::METHOD_JOINCLUSTER :
case management::Broker::METHOD_LEAVECLUSTER :
- case management::Broker::METHOD_CRASH :
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
}
diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h
index bdea550159..e46d2024bf 100644
--- a/cpp/src/qpid/broker/Deliverable.h
+++ b/cpp/src/qpid/broker/Deliverable.h
@@ -30,6 +30,7 @@ namespace qpid {
bool delivered;
Deliverable() : delivered(false) {}
virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+ virtual uint64_t contentSize() { return 0; }
virtual ~Deliverable(){}
};
}
diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp
index e3fc39ce14..e79a3aa773 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.cpp
+++ b/cpp/src/qpid/broker/DeliverableMessage.cpp
@@ -37,3 +37,7 @@ Message& DeliverableMessage::getMessage()
return *msg;
}
+uint64_t DeliverableMessage::contentSize ()
+{
+ return msg->contentSize ();
+}
diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h
index 39e5c04110..440d1184eb 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.h
+++ b/cpp/src/qpid/broker/DeliverableMessage.h
@@ -33,6 +33,7 @@ namespace qpid {
DeliverableMessage(intrusive_ptr<Message>& msg);
virtual void deliverTo(Queue::shared_ptr& queue);
Message& getMessage();
+ uint64_t contentSize();
virtual ~DeliverableMessage(){}
};
}
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 87f363ff3d..43b707a5c8 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -25,16 +25,37 @@
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
+using qpid::management::Manageable;
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {}
-DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+DirectExchange::DirectExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
RWlock::ScopedWlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i == queues.end()) {
- bindings[routingKey].push_back(queue);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingKey].push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else{
return false;
@@ -43,14 +64,21 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
if (i < queues.end()) {
queues.erase(i);
- if(queues.empty()){
+ if (queues.empty()) {
bindings.erase(routingKey);
}
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -59,38 +87,65 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
int count(0);
- for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
- msg.deliverTo(*i);
- }
+
+ for(i = queues.begin(); i != queues.end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
if(!count){
QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ }
+ else {
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
}
}
bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
+ std::vector<Binding::shared_ptr>::iterator j;
+
if (routingKey) {
Bindings::iterator i = bindings.find(*routingKey);
- return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end());
+
+ if (i == bindings.end())
+ return false;
+ if (!queue)
+ return true;
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
- if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) {
- return true;
- }
- }
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
return false;
}
-}
-
-DirectExchange::~DirectExchange(){
+ return false;
}
+DirectExchange::~DirectExchange() {}
const std::string DirectExchange::typeName("direct");
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 243f51d6a8..118f2ed4d3 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -31,17 +31,17 @@
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
- typedef std::vector<Queue::shared_ptr> Queues;
- typedef std::map<string, Queues > Bindings;
+ typedef std::vector<Binding::shared_ptr> Queues;
+ typedef std::map<string, Queues> Bindings;
Bindings bindings;
qpid::sys::RWlock lock;
public:
static const std::string typeName;
- DirectExchange(const std::string& name);
+ DirectExchange(const std::string& name, management::Manageable* parent = 0);
DirectExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 2d5cb09d7c..83466085bc 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -21,10 +21,52 @@
#include "Exchange.h"
#include "ExchangeRegistry.h"
+#include "qpid/management/ManagementAgent.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+Exchange::Exchange (const string& _name, Manageable* parent) :
+ name(_name), durable(false), persistenceId(0)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ if (agent.get () != 0)
+ {
+ mgmtExchange = management::Exchange::shared_ptr
+ (new management::Exchange (this, parent, _name));
+ agent->addObject (mgmtExchange);
+ }
+ }
+}
+
+Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+ Manageable* parent)
+ : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ if (agent.get () != 0)
+ {
+ mgmtExchange = management::Exchange::shared_ptr
+ (new management::Exchange (this, parent, _name));
+ agent->addObject (mgmtExchange);
+ }
+ }
+}
+
+Exchange::~Exchange ()
+{
+ if (mgmtExchange.get () != 0)
+ mgmtExchange->resourceDestroy ();
+}
Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer)
{
@@ -56,5 +98,43 @@ uint32_t Exchange::encodedSize() const
+ args.size();
}
+ManagementObject::shared_ptr Exchange::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtExchange);
+}
+Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent)
+ : queue(_queue), key(_key)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ if (agent.get() != 0)
+ {
+ ManagementObject::shared_ptr mo = queue->GetManagementObject();
+ if (mo.get() != 0)
+ {
+ uint64_t queueId = mo->getObjectId();
+ mgmtBinding = management::Binding::shared_ptr
+ (new management::Binding (this, (Manageable*) parent, queueId, key));
+ agent->addObject (mgmtBinding);
+ }
+ }
+ }
+}
+
+Exchange::Binding::~Binding ()
+{
+ if (mgmtBinding.get () != 0)
+ mgmtBinding->resourceDestroy ();
+}
+ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtBinding);
+}
+
+Manageable::status_t Exchange::Binding::ManagementMethod (uint32_t, Args&)
+{
+ return Manageable::STATUS_UNKNOWN_METHOD;
+}
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 5febca0ae9..e9f5b3965a 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -28,13 +28,16 @@
#include "MessageStore.h"
#include "PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Exchange.h"
+#include "qpid/management/Binding.h"
namespace qpid {
namespace broker {
using std::string;
class ExchangeRegistry;
- class Exchange : public PersistableExchange{
+ class Exchange : public PersistableExchange, public management::Manageable {
private:
const string name;
const bool durable;
@@ -43,13 +46,31 @@ namespace qpid {
uint32_t alternateUsers;
mutable uint64_t persistenceId;
+ protected:
+ struct Binding : public management::Manageable {
+ typedef boost::shared_ptr<Binding> shared_ptr;
+ typedef std::vector<Binding::shared_ptr> vector;
+
+ Queue::shared_ptr queue;
+ const std::string key;
+ const qpid::framing::FieldTable args;
+ management::Binding::shared_ptr mgmtBinding;
+
+ Binding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent = 0);
+ ~Binding ();
+ management::ManagementObject::shared_ptr GetManagementObject () const;
+ management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args);
+ };
+
+ management::Exchange::shared_ptr mgmtExchange;
+
public:
typedef boost::shared_ptr<Exchange> shared_ptr;
- explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){}
- Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args)
- : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){}
- virtual ~Exchange(){}
+ explicit Exchange(const string& name, management::Manageable* parent = 0);
+ Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+ management::Manageable* parent = 0);
+ virtual ~Exchange();
const string& getName() const { return name; }
bool isDurable() { return durable; }
@@ -75,6 +96,10 @@ namespace qpid {
static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t, management::Args&) { return management::Manageable::STATUS_UNKNOWN_METHOD; }
};
}
}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 2f58e23c23..58d9d5efb8 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -46,15 +46,15 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
Exchange::shared_ptr exchange;
if(type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent));
}else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent));
}else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent));
}else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent));
}else if (type == ManagementExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent));
}else{
throw UnknownExchangeTypeException();
}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index dd75940c4c..f39bd661fa 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -27,6 +27,7 @@
#include "MessageStore.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/management/Manageable.h"
namespace qpid {
namespace broker {
@@ -36,7 +37,9 @@ namespace broker {
typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
qpid::sys::RWlock lock;
+ management::Manageable* parent;
public:
+ ExchangeRegistry () : parent(0) {}
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
throw(UnknownExchangeTypeException);
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type,
@@ -45,6 +48,11 @@ namespace broker {
void destroy(const std::string& name);
Exchange::shared_ptr get(const std::string& name);
Exchange::shared_ptr getDefault();
+
+ /**
+ * Register the manageable parent for declared queues
+ */
+ void setParent (management::Manageable* _parent) { parent = _parent; }
};
}
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index ea2327c788..714d4ea444 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -25,15 +25,36 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
-FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) :
+ Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedWlock locker(lock);
+ std::vector<Binding::shared_ptr>::iterator i;
+
// Add if not already present.
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i == bindings.end()) {
- bindings.push_back(queue);
+ Binding::shared_ptr binding (new Binding ("", queue, this));
+ bindings.push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else {
return false;
@@ -42,9 +63,17 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/,
bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedWlock locker(lock);
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i != bindings.end()) {
bindings.erase(i);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -53,14 +82,40 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedRlock locker(lock);
- for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
- msg.deliverTo(*i);
+ uint32_t count(0);
+
+ for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
}
}
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
{
- return std::find(bindings.begin(), bindings.end(), queue) != bindings.end();
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
+ return i != bindings.end();
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 625afc8cce..4bc92f6b28 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -32,15 +32,16 @@ namespace qpid {
namespace broker {
class FanOutExchange : public virtual Exchange {
- std::vector<Queue::shared_ptr> bindings;
+ std::vector<Binding::shared_ptr> bindings;
qpid::sys::RWlock lock;
public:
static const std::string typeName;
- FanOutExchange(const std::string& name);
+ FanOutExchange(const std::string& name, management::Manageable* parent = 0);
FanOutExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args,
+ management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index dd688cdfcf..c0f6cf19d2 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -40,19 +40,40 @@ namespace {
const std::string x_match("x-match");
}
-HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
-HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) :
+ Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
FieldTable::ValuePtr what = args->get(x_match);
if (!what || (*what != all && *what != any))
throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
- Binding binding(*args, queue);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), binding);
+ Bindings::iterator i;
+
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if (i->first == *args && i->second->queue == queue)
+ break;
+
if (i == bindings.end()) {
- bindings.push_back(binding);
+ Binding::shared_ptr binding (new Binding ("", queue, this));
+ HeaderMap headerMap(*args, binding);
+
+ bindings.push_back(headerMap);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else {
return false;
@@ -61,10 +82,16 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/
bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
+ Bindings::iterator i;
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if (i->first == *args && i->second->queue == queue)
+ break;
+
if (i != bindings.end()) {
bindings.erase(i);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -73,9 +100,29 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
- RWlock::ScopedRlock locker(lock);;
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *args)) msg.deliverTo(i->second);
+ RWlock::ScopedRlock locker(lock);
+ uint32_t count(0);
+
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) {
+ if (match(i->first, *args)) msg.deliverTo(i->second->queue);
+ if (i->second->mgmtBinding.get() != 0)
+ i->second->mgmtBinding->inc_msgMatched ();
+ }
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
}
}
@@ -83,7 +130,7 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
{
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if ( (!args || equal(i->first, *args)) && (!queue || i->second == queue)) {
+ if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) {
return true;
}
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index f7abf3514b..4f654179c5 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -32,8 +32,8 @@ namespace broker {
class HeadersExchange : public virtual Exchange {
- typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding;
- typedef std::vector<Binding> Bindings;
+ typedef std::pair<qpid::framing::FieldTable, Binding::shared_ptr> HeaderMap;
+ typedef std::vector<HeaderMap> Bindings;
Bindings bindings;
qpid::sys::RWlock lock;
@@ -41,9 +41,10 @@ class HeadersExchange : public virtual Exchange {
public:
static const std::string typeName;
- HeadersExchange(const string& name);
+ HeadersExchange(const string& name, management::Manageable* parent = 0);
HeadersExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args,
+ management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 4dba60cd0d..e2fd998cc0 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -59,11 +59,14 @@ Queue::Queue(const string& _name, bool _autodelete,
{
if (parent != 0)
{
- mgmtObject = management::Queue::shared_ptr
- (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
-
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
- agent->addObject (mgmtObject);
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Queue::shared_ptr
+ (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
+ agent->addObject (mgmtObject);
+ }
}
}
@@ -93,14 +96,14 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
if (!enqueue(0, msg)){
push(msg);
msg->enqueueComplete();
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
mgmtObject->inc_byteDepth (msg->contentSize ());
}
}else {
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -118,7 +121,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
void Queue::recover(intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
@@ -136,7 +139,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){
void Queue::process(intrusive_ptr<Message>& msg){
push(msg);
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
@@ -319,7 +322,7 @@ void Queue::consume(Consumer&, bool requestExclusive){
}
consumerCount++;
- if (mgmtObject != 0){
+ if (mgmtObject.get() != 0){
mgmtObject->inc_consumers ();
}
}
@@ -329,7 +332,7 @@ void Queue::cancel(Consumer& c){
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
if(exclusive) exclusive = false;
- if (mgmtObject != 0){
+ if (mgmtObject.get() != 0){
mgmtObject->dec_consumers ();
}
}
@@ -341,16 +344,6 @@ QueuedMessage Queue::dequeue(){
if(!messages.empty()){
msg = messages.front();
pop();
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- //mgmtObject->inc_byteTotalDequeues (msg->contentSize ());
- mgmtObject->dec_msgDepth ();
- //mgmtObject->dec_byteDepth (msg->contentSize ());
- if (0){//msg->isPersistent ()) {
- mgmtObject->inc_msgPersistDequeues ();
- //mgmtObject->inc_bytePersistDequeues (msg->contentSize ());
- }
- }
}
return msg;
}
@@ -366,7 +359,19 @@ uint32_t Queue::purge(){
* Assumes messageLock is held
*/
void Queue::pop(){
- if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
+ QueuedMessage& msg = messages.front();
+
+ if (policy.get()) policy->dequeued(msg.payload->contentSize());
+ if (mgmtObject.get() != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
+ mgmtObject->dec_msgDepth ();
+ mgmtObject->dec_byteDepth (msg.payload->contentSize());
+ if (msg.payload->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
+ }
+ }
messages.pop_front();
}
@@ -473,7 +478,8 @@ void Queue::destroy()
}
}
-void Queue::bound(const string& exchange, const string& key, const FieldTable& args)
+void Queue::bound(const string& exchange, const string& key,
+ const FieldTable& args)
{
bindings.add(exchange, key, args);
}
@@ -584,8 +590,24 @@ ManagementObject::shared_ptr Queue::GetManagementObject (void) const
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
}
-Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/,
+Manageable::status_t Queue::ManagementMethod (uint32_t methodId,
Args& /*args*/)
{
- return Manageable::STATUS_OK;
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case management::Queue::METHOD_PURGE :
+ purge ();
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Queue::METHOD_INCREASEJOURNALSIZE :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
}
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index bc761cf34d..5330ee4fd0 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -115,9 +115,19 @@ bool TopicPattern::match(const Tokens& target) const
return do_match(begin(), end(), target.begin(), target.end());
}
-TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
-TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+TopicExchange::TopicExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
@@ -125,7 +135,11 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
if (isBound(queue, routingPattern)) {
return false;
} else {
- bindings[routingPattern].push_back(queue);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingPattern].push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
}
}
@@ -133,12 +147,19 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Queue::vector& qv(bi->second);
+ Binding::vector& qv(bi->second);
if (bi == bindings.end()) return false;
- Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
+
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ break;
if(q == qv.end()) return false;
qv.erase(q);
if(qv.empty()) bindings.erase(bi);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
}
@@ -146,21 +167,45 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
{
BindingMap::iterator bi = bindings.find(pattern);
if (bi == bindings.end()) return false;
- Queue::vector& qv(bi->second);
- return find(qv.begin(), qv.end(), queue) != qv.end();
+ Binding::vector& qv(bi->second);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ break;
+ return q != qv.end();
}
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
+ uint32_t count(0);
Tokens tokens(routingKey);
+
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(tokens)) {
- Queue::vector& qv(i->second);
- for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
- msg.deliverTo(*j);
+ Binding::vector& qv(i->second);
+ for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
+ msg.deliverTo((*j)->queue);
+ if ((*j)->mgmtBinding.get() != 0)
+ (*j)->mgmtBinding->inc_msgMatched ();
}
}
}
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
}
bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
@@ -176,16 +221,16 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing
return true;
}
}
- return false;
} else {
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- Queue::vector& qv(i->second);
- if (find(qv.begin(), qv.end(), queue) != qv.end()) {
- return true;
- }
+ Binding::vector& qv(i->second);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ return true;
}
- return false;
}
+ return false;
}
TopicExchange::~TopicExchange() {}
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index e2cc1a3535..2e107142b7 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -71,7 +71,7 @@ class TopicPattern : public Tokens
};
class TopicExchange : public virtual Exchange{
- typedef std::map<TopicPattern, Queue::vector> BindingMap;
+ typedef std::map<TopicPattern, Binding::vector> BindingMap;
BindingMap bindings;
qpid::sys::RWlock lock;
@@ -79,9 +79,9 @@ class TopicExchange : public virtual Exchange{
public:
static const std::string typeName;
- TopicExchange(const string& name);
+ TopicExchange(const string& name, management::Manageable* parent = 0);
TopicExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index 07e72c49f4..0ad2eac080 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -67,3 +67,7 @@ void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
queue->process(msg);
}
+uint64_t TxPublish::contentSize ()
+{
+ return msg->contentSize ();
+}
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index b4323864bc..085dd28316 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -66,10 +66,12 @@ namespace qpid {
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
-
+
virtual void deliverTo(Queue::shared_ptr& queue);
virtual ~TxPublish(){}
+
+ uint64_t contentSize();
};
}
}
diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp
index 635f345a86..537d2abf0e 100644
--- a/cpp/src/qpid/broker/Vhost.cpp
+++ b/cpp/src/qpid/broker/Vhost.cpp
@@ -27,11 +27,14 @@ Vhost::Vhost (management::Manageable* parentBroker)
{
if (parentBroker != 0)
{
- mgmtObject = management::Vhost::shared_ptr
- (new management::Vhost (this, parentBroker, "/"));
-
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
- agent->addObject (mgmtObject);
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Vhost::shared_ptr
+ (new management::Vhost (this, parentBroker, "/"));
+ agent->addObject (mgmtObject);
+ }
}
}
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 85c2acce1d..90da74404b 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -33,6 +33,7 @@ using namespace qpid::broker;
using namespace qpid::sys;
ManagementAgent::shared_ptr ManagementAgent::agent;
+bool ManagementAgent::enabled = 0;
ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
{
@@ -40,16 +41,21 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
}
+void ManagementAgent::enableManagement (void)
+{
+ enabled = 1;
+}
+
ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
{
- if (agent.get () == 0)
+ if (enabled && agent.get () == 0)
agent = shared_ptr (new ManagementAgent (10));
return agent;
}
-void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange,
- Exchange::shared_ptr _dexchange)
+void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
+ broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
@@ -57,6 +63,7 @@ void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange,
void ManagementAgent::addObject (ManagementObject::shared_ptr object)
{
+ RWlock::ScopedWlock writeLock (userLock);
uint64_t objectId = nextObjectId++;
object->setObjectId (objectId);
@@ -74,6 +81,8 @@ void ManagementAgent::Periodic::fire ()
void ManagementAgent::clientAdded (void)
{
+ RWlock::ScopedRlock readLock (userLock);
+
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
iter != managementObjects.end ();
iter++)
@@ -94,7 +103,7 @@ void ManagementAgent::EncodeHeader (Buffer& buf)
void ManagementAgent::SendBuffer (Buffer& buf,
uint32_t length,
- Exchange::shared_ptr exchange,
+ broker::Exchange::shared_ptr exchange,
string routingKey)
{
intrusive_ptr<Message> msg (new Message ());
@@ -129,9 +138,10 @@ void ManagementAgent::PeriodicProcessing (void)
{
#define BUFSIZE 65536
#define THRESHOLD 16384
- char msgChars[BUFSIZE];
- uint32_t contentSize;
- string routingKey;
+ RWlock::ScopedWlock writeLock (userLock);
+ char msgChars[BUFSIZE];
+ uint32_t contentSize;
+ string routingKey;
std::list<uint64_t> deleteList;
if (managementObjects.empty ())
@@ -157,7 +167,7 @@ void ManagementAgent::PeriodicProcessing (void)
SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
- if (object->getConfigChanged ())
+ if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
EncodeHeader (msgBuffer);
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index c33a59adff..a4f10632da 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -25,6 +25,7 @@
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Timer.h"
+#include "qpid/sys/Mutex.h"
#include "ManagementObject.h"
#include <boost/shared_ptr.hpp>
@@ -41,11 +42,12 @@ class ManagementAgent
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
+ static void enableManagement (void);
static shared_ptr getAgent (void);
void setInterval (uint16_t _interval) { interval = _interval; }
- void setExchange (broker::Exchange::shared_ptr mgmtExchange,
- broker::Exchange::shared_ptr directExchange);
+ void setExchange (broker::Exchange::shared_ptr mgmtExchange,
+ broker::Exchange::shared_ptr directExchange);
void addObject (ManagementObject::shared_ptr object);
void clientAdded (void);
void dispatchCommand (broker::Deliverable& msg,
@@ -64,6 +66,9 @@ class ManagementAgent
};
static shared_ptr agent;
+ static bool enabled;
+
+ qpid::sys::RWlock userLock;
ManagementObjectMap managementObjects;
broker::Timer timer;
broker::Exchange::shared_ptr mExchange;
diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp
index f18b6fc402..ee18f026e7 100644
--- a/cpp/src/qpid/management/ManagementExchange.cpp
+++ b/cpp/src/qpid/management/ManagementExchange.cpp
@@ -27,13 +27,14 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-ManagementExchange::ManagementExchange (const string& _name) :
- Exchange (_name), TopicExchange(_name) {}
+ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent) :
+ Exchange (_name, _parent), TopicExchange(_name, _parent) {}
ManagementExchange::ManagementExchange (const std::string& _name,
bool _durable,
- const FieldTable& _args) :
- Exchange (_name, _durable, _args),
- TopicExchange(_name, _durable, _args) {}
+ const FieldTable& _args,
+ Manageable* _parent) :
+ Exchange (_name, _durable, _args, _parent),
+ TopicExchange(_name, _durable, _args, _parent) {}
bool ManagementExchange::bind (Queue::shared_ptr queue,
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h
index 6ccdf47182..0fcd65b092 100644
--- a/cpp/src/qpid/management/ManagementExchange.h
+++ b/cpp/src/qpid/management/ManagementExchange.h
@@ -35,9 +35,10 @@ class ManagementExchange : public virtual TopicExchange
public:
static const std::string typeName;
- ManagementExchange (const string& name);
+ ManagementExchange (const string& name, Manageable* _parent = 0);
ManagementExchange (const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args,
+ Manageable* _parent = 0);
virtual std::string getType() const { return typeName; }