summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcpp/managementgen/schema.py40
-rw-r--r--cpp/managementgen/templates/Class.cpp5
-rw-r--r--cpp/src/qpid/broker/Broker.cpp5
-rw-r--r--cpp/src/qpid/broker/Deliverable.h1
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.cpp4
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.h1
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp97
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h8
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp80
-rw-r--r--cpp/src/qpid/broker/Exchange.h35
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp10
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h8
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp71
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h7
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp71
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h9
-rw-r--r--cpp/src/qpid/broker/Queue.cpp70
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp77
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h6
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp4
-rw-r--r--cpp/src/qpid/broker/TxPublish.h4
-rw-r--r--cpp/src/qpid/broker/Vhost.cpp11
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp26
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h9
-rw-r--r--cpp/src/qpid/management/ManagementExchange.cpp11
-rw-r--r--cpp/src/qpid/management/ManagementExchange.h5
26 files changed, 535 insertions, 140 deletions
diff --git a/cpp/managementgen/schema.py b/cpp/managementgen/schema.py
index c3db4eaf53..34121a2544 100755
--- a/cpp/managementgen/schema.py
+++ b/cpp/managementgen/schema.py
@@ -397,6 +397,24 @@ class SchemaArg:
def getDir (self):
return self.dir
+ def genSchema (self, stream):
+ stream.write (" ft = FieldTable ();\n")
+ stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
+ stream.write (" ft.setInt (TYPE, TYPE_" + self.type.type.base +");\n")
+ stream.write (" ft.setString (DIR, \"" + self.dir + "\");\n")
+ if self.unit != None:
+ stream.write (" ft.setString (UNIT, \"" + self.unit + "\");\n")
+ if self.min != None:
+ stream.write (" ft.setInt (MIN, " + self.min + ");\n")
+ if self.max != None:
+ stream.write (" ft.setInt (MAX, " + self.max + ");\n")
+ if self.maxLen != None:
+ stream.write (" ft.setInt (MAXLEN, " + self.maxLen + ");\n")
+ if self.desc != None:
+ stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
+ if self.default != None:
+ stream.write (" ft.setString (DEFAULT, \"" + self.default + "\");\n")
+ stream.write (" buf.put (ft);\n\n")
#=====================================================================================
#
@@ -455,6 +473,16 @@ class SchemaMethod:
dirTag = arg.dir.lower() + "_"
stream.write (" " + ctype + " " + dirTag + arg.getName () + ";\n")
+ def genSchema (self, stream):
+ stream.write (" ft = FieldTable ();\n")
+ stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
+ stream.write (" ft.setInt (ARGCOUNT, " + str (len (self.args)) + ");\n")
+ if self.desc != None:
+ stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
+ stream.write (" buf.put (ft);\n\n")
+ for arg in self.args:
+ arg.genSchema (stream)
+
#=====================================================================================
#
#=====================================================================================
@@ -550,15 +578,6 @@ class SchemaClass:
for inst in self.instElements:
inst.genAccessor (stream)
- def genArgDeclaration (self, stream):
- argsFound = 0
- for method in self.methods:
- argsFound = argsFound + len (method.args)
- for event in self.events:
- argsFound = argsFound + len (event.args)
- if argsFound > 0:
- stream.write ("FieldTable arg;");
-
def genConfigCount (self, stream):
stream.write ("%d" % len (self.configElements))
@@ -683,7 +702,8 @@ class SchemaClass:
number = number + 1
def genMethodSchema (self, stream):
- pass ###########################################################################
+ for method in self.methods:
+ method.genSchema (stream)
def genNameCap (self, stream):
stream.write (self.name.capitalize ())
diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp
index 70077d495c..d3b95fd674 100644
--- a/cpp/managementgen/templates/Class.cpp
+++ b/cpp/managementgen/templates/Class.cpp
@@ -53,12 +53,15 @@ namespace {
const string MAX("max");
const string MAXLEN("maxlen");
const string DESC("desc");
+ const string ARGCOUNT("argCount");
+ const string ARGS("args");
+ const string DIR("dir");
+ const string DEFAULT("default");
}
void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
{
FieldTable ft;
- /*MGEN:Class.ArgDeclaration*/
schemaNeeded = false;
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 9ac73c0219..6c8b21bc59 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -130,6 +130,7 @@ Broker::Broker(const Broker::Options& conf) :
sessionManager(conf.ack)
{
if(conf.enableMgmt){
+ ManagementAgent::enableManagement ();
managementAgent = ManagementAgent::getAgent ();
managementAgent->setInterval (conf.mgmtPubInterval);
@@ -154,7 +155,8 @@ Broker::Broker(const Broker::Options& conf) :
Vhost* vhost = new Vhost (this);
vhostObject = Vhost::shared_ptr (vhost);
- queues.setParent (vhost);
+ queues.setParent (vhost);
+ exchanges.setParent (vhost);
}
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -284,7 +286,6 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
case management::Broker::METHOD_JOINCLUSTER :
case management::Broker::METHOD_LEAVECLUSTER :
- case management::Broker::METHOD_CRASH :
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
}
diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h
index bdea550159..e46d2024bf 100644
--- a/cpp/src/qpid/broker/Deliverable.h
+++ b/cpp/src/qpid/broker/Deliverable.h
@@ -30,6 +30,7 @@ namespace qpid {
bool delivered;
Deliverable() : delivered(false) {}
virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+ virtual uint64_t contentSize() { return 0; }
virtual ~Deliverable(){}
};
}
diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp
index e3fc39ce14..e79a3aa773 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.cpp
+++ b/cpp/src/qpid/broker/DeliverableMessage.cpp
@@ -37,3 +37,7 @@ Message& DeliverableMessage::getMessage()
return *msg;
}
+uint64_t DeliverableMessage::contentSize ()
+{
+ return msg->contentSize ();
+}
diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h
index 39e5c04110..440d1184eb 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.h
+++ b/cpp/src/qpid/broker/DeliverableMessage.h
@@ -33,6 +33,7 @@ namespace qpid {
DeliverableMessage(intrusive_ptr<Message>& msg);
virtual void deliverTo(Queue::shared_ptr& queue);
Message& getMessage();
+ uint64_t contentSize();
virtual ~DeliverableMessage(){}
};
}
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 87f363ff3d..43b707a5c8 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -25,16 +25,37 @@
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
+using qpid::management::Manageable;
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {}
-DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+DirectExchange::DirectExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
RWlock::ScopedWlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i == queues.end()) {
- bindings[routingKey].push_back(queue);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingKey].push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else{
return false;
@@ -43,14 +64,21 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
if (i < queues.end()) {
queues.erase(i);
- if(queues.empty()){
+ if (queues.empty()) {
bindings.erase(routingKey);
}
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -59,38 +87,65 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
int count(0);
- for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
- msg.deliverTo(*i);
- }
+
+ for(i = queues.begin(); i != queues.end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
if(!count){
QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ }
+ else {
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
}
}
bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
+ std::vector<Binding::shared_ptr>::iterator j;
+
if (routingKey) {
Bindings::iterator i = bindings.find(*routingKey);
- return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end());
+
+ if (i == bindings.end())
+ return false;
+ if (!queue)
+ return true;
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
- if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) {
- return true;
- }
- }
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
return false;
}
-}
-
-DirectExchange::~DirectExchange(){
+ return false;
}
+DirectExchange::~DirectExchange() {}
const std::string DirectExchange::typeName("direct");
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 243f51d6a8..118f2ed4d3 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -31,17 +31,17 @@
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
- typedef std::vector<Queue::shared_ptr> Queues;
- typedef std::map<string, Queues > Bindings;
+ typedef std::vector<Binding::shared_ptr> Queues;
+ typedef std::map<string, Queues> Bindings;
Bindings bindings;
qpid::sys::RWlock lock;
public:
static const std::string typeName;
- DirectExchange(const std::string& name);
+ DirectExchange(const std::string& name, management::Manageable* parent = 0);
DirectExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 2d5cb09d7c..83466085bc 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -21,10 +21,52 @@
#include "Exchange.h"
#include "ExchangeRegistry.h"
+#include "qpid/management/ManagementAgent.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+Exchange::Exchange (const string& _name, Manageable* parent) :
+ name(_name), durable(false), persistenceId(0)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ if (agent.get () != 0)
+ {
+ mgmtExchange = management::Exchange::shared_ptr
+ (new management::Exchange (this, parent, _name));
+ agent->addObject (mgmtExchange);
+ }
+ }
+}
+
+Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+ Manageable* parent)
+ : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ if (agent.get () != 0)
+ {
+ mgmtExchange = management::Exchange::shared_ptr
+ (new management::Exchange (this, parent, _name));
+ agent->addObject (mgmtExchange);
+ }
+ }
+}
+
+Exchange::~Exchange ()
+{
+ if (mgmtExchange.get () != 0)
+ mgmtExchange->resourceDestroy ();
+}
Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer)
{
@@ -56,5 +98,43 @@ uint32_t Exchange::encodedSize() const
+ args.size();
}
+ManagementObject::shared_ptr Exchange::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtExchange);
+}
+Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent)
+ : queue(_queue), key(_key)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ if (agent.get() != 0)
+ {
+ ManagementObject::shared_ptr mo = queue->GetManagementObject();
+ if (mo.get() != 0)
+ {
+ uint64_t queueId = mo->getObjectId();
+ mgmtBinding = management::Binding::shared_ptr
+ (new management::Binding (this, (Manageable*) parent, queueId, key));
+ agent->addObject (mgmtBinding);
+ }
+ }
+ }
+}
+
+Exchange::Binding::~Binding ()
+{
+ if (mgmtBinding.get () != 0)
+ mgmtBinding->resourceDestroy ();
+}
+ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtBinding);
+}
+
+Manageable::status_t Exchange::Binding::ManagementMethod (uint32_t, Args&)
+{
+ return Manageable::STATUS_UNKNOWN_METHOD;
+}
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 5febca0ae9..e9f5b3965a 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -28,13 +28,16 @@
#include "MessageStore.h"
#include "PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Exchange.h"
+#include "qpid/management/Binding.h"
namespace qpid {
namespace broker {
using std::string;
class ExchangeRegistry;
- class Exchange : public PersistableExchange{
+ class Exchange : public PersistableExchange, public management::Manageable {
private:
const string name;
const bool durable;
@@ -43,13 +46,31 @@ namespace qpid {
uint32_t alternateUsers;
mutable uint64_t persistenceId;
+ protected:
+ struct Binding : public management::Manageable {
+ typedef boost::shared_ptr<Binding> shared_ptr;
+ typedef std::vector<Binding::shared_ptr> vector;
+
+ Queue::shared_ptr queue;
+ const std::string key;
+ const qpid::framing::FieldTable args;
+ management::Binding::shared_ptr mgmtBinding;
+
+ Binding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent = 0);
+ ~Binding ();
+ management::ManagementObject::shared_ptr GetManagementObject () const;
+ management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args);
+ };
+
+ management::Exchange::shared_ptr mgmtExchange;
+
public:
typedef boost::shared_ptr<Exchange> shared_ptr;
- explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){}
- Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args)
- : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){}
- virtual ~Exchange(){}
+ explicit Exchange(const string& name, management::Manageable* parent = 0);
+ Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+ management::Manageable* parent = 0);
+ virtual ~Exchange();
const string& getName() const { return name; }
bool isDurable() { return durable; }
@@ -75,6 +96,10 @@ namespace qpid {
static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t, management::Args&) { return management::Manageable::STATUS_UNKNOWN_METHOD; }
};
}
}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 2f58e23c23..58d9d5efb8 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -46,15 +46,15 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
Exchange::shared_ptr exchange;
if(type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent));
}else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent));
}else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent));
}else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent));
}else if (type == ManagementExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent));
}else{
throw UnknownExchangeTypeException();
}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index dd75940c4c..f39bd661fa 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -27,6 +27,7 @@
#include "MessageStore.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/management/Manageable.h"
namespace qpid {
namespace broker {
@@ -36,7 +37,9 @@ namespace broker {
typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
qpid::sys::RWlock lock;
+ management::Manageable* parent;
public:
+ ExchangeRegistry () : parent(0) {}
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
throw(UnknownExchangeTypeException);
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type,
@@ -45,6 +48,11 @@ namespace broker {
void destroy(const std::string& name);
Exchange::shared_ptr get(const std::string& name);
Exchange::shared_ptr getDefault();
+
+ /**
+ * Register the manageable parent for declared queues
+ */
+ void setParent (management::Manageable* _parent) { parent = _parent; }
};
}
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index ea2327c788..714d4ea444 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -25,15 +25,36 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
-FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) :
+ Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedWlock locker(lock);
+ std::vector<Binding::shared_ptr>::iterator i;
+
// Add if not already present.
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i == bindings.end()) {
- bindings.push_back(queue);
+ Binding::shared_ptr binding (new Binding ("", queue, this));
+ bindings.push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else {
return false;
@@ -42,9 +63,17 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/,
bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedWlock locker(lock);
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i != bindings.end()) {
bindings.erase(i);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -53,14 +82,40 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedRlock locker(lock);
- for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
- msg.deliverTo(*i);
+ uint32_t count(0);
+
+ for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
}
}
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
{
- return std::find(bindings.begin(), bindings.end(), queue) != bindings.end();
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
+ return i != bindings.end();
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 625afc8cce..4bc92f6b28 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -32,15 +32,16 @@ namespace qpid {
namespace broker {
class FanOutExchange : public virtual Exchange {
- std::vector<Queue::shared_ptr> bindings;
+ std::vector<Binding::shared_ptr> bindings;
qpid::sys::RWlock lock;
public:
static const std::string typeName;
- FanOutExchange(const std::string& name);
+ FanOutExchange(const std::string& name, management::Manageable* parent = 0);
FanOutExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args,
+ management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index dd688cdfcf..c0f6cf19d2 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -40,19 +40,40 @@ namespace {
const std::string x_match("x-match");
}
-HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
-HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) :
+ Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
FieldTable::ValuePtr what = args->get(x_match);
if (!what || (*what != all && *what != any))
throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
- Binding binding(*args, queue);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), binding);
+ Bindings::iterator i;
+
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if (i->first == *args && i->second->queue == queue)
+ break;
+
if (i == bindings.end()) {
- bindings.push_back(binding);
+ Binding::shared_ptr binding (new Binding ("", queue, this));
+ HeaderMap headerMap(*args, binding);
+
+ bindings.push_back(headerMap);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else {
return false;
@@ -61,10 +82,16 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/
bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
+ Bindings::iterator i;
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if (i->first == *args && i->second->queue == queue)
+ break;
+
if (i != bindings.end()) {
bindings.erase(i);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -73,9 +100,29 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
- RWlock::ScopedRlock locker(lock);;
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *args)) msg.deliverTo(i->second);
+ RWlock::ScopedRlock locker(lock);
+ uint32_t count(0);
+
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) {
+ if (match(i->first, *args)) msg.deliverTo(i->second->queue);
+ if (i->second->mgmtBinding.get() != 0)
+ i->second->mgmtBinding->inc_msgMatched ();
+ }
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
}
}
@@ -83,7 +130,7 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
{
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if ( (!args || equal(i->first, *args)) && (!queue || i->second == queue)) {
+ if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) {
return true;
}
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index f7abf3514b..4f654179c5 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -32,8 +32,8 @@ namespace broker {
class HeadersExchange : public virtual Exchange {
- typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding;
- typedef std::vector<Binding> Bindings;
+ typedef std::pair<qpid::framing::FieldTable, Binding::shared_ptr> HeaderMap;
+ typedef std::vector<HeaderMap> Bindings;
Bindings bindings;
qpid::sys::RWlock lock;
@@ -41,9 +41,10 @@ class HeadersExchange : public virtual Exchange {
public:
static const std::string typeName;
- HeadersExchange(const string& name);
+ HeadersExchange(const string& name, management::Manageable* parent = 0);
HeadersExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args,
+ management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 4dba60cd0d..e2fd998cc0 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -59,11 +59,14 @@ Queue::Queue(const string& _name, bool _autodelete,
{
if (parent != 0)
{
- mgmtObject = management::Queue::shared_ptr
- (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
-
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
- agent->addObject (mgmtObject);
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Queue::shared_ptr
+ (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
+ agent->addObject (mgmtObject);
+ }
}
}
@@ -93,14 +96,14 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
if (!enqueue(0, msg)){
push(msg);
msg->enqueueComplete();
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
mgmtObject->inc_byteDepth (msg->contentSize ());
}
}else {
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -118,7 +121,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
void Queue::recover(intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
@@ -136,7 +139,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){
void Queue::process(intrusive_ptr<Message>& msg){
push(msg);
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
@@ -319,7 +322,7 @@ void Queue::consume(Consumer&, bool requestExclusive){
}
consumerCount++;
- if (mgmtObject != 0){
+ if (mgmtObject.get() != 0){
mgmtObject->inc_consumers ();
}
}
@@ -329,7 +332,7 @@ void Queue::cancel(Consumer& c){
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
if(exclusive) exclusive = false;
- if (mgmtObject != 0){
+ if (mgmtObject.get() != 0){
mgmtObject->dec_consumers ();
}
}
@@ -341,16 +344,6 @@ QueuedMessage Queue::dequeue(){
if(!messages.empty()){
msg = messages.front();
pop();
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- //mgmtObject->inc_byteTotalDequeues (msg->contentSize ());
- mgmtObject->dec_msgDepth ();
- //mgmtObject->dec_byteDepth (msg->contentSize ());
- if (0){//msg->isPersistent ()) {
- mgmtObject->inc_msgPersistDequeues ();
- //mgmtObject->inc_bytePersistDequeues (msg->contentSize ());
- }
- }
}
return msg;
}
@@ -366,7 +359,19 @@ uint32_t Queue::purge(){
* Assumes messageLock is held
*/
void Queue::pop(){
- if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
+ QueuedMessage& msg = messages.front();
+
+ if (policy.get()) policy->dequeued(msg.payload->contentSize());
+ if (mgmtObject.get() != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
+ mgmtObject->dec_msgDepth ();
+ mgmtObject->dec_byteDepth (msg.payload->contentSize());
+ if (msg.payload->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
+ }
+ }
messages.pop_front();
}
@@ -473,7 +478,8 @@ void Queue::destroy()
}
}
-void Queue::bound(const string& exchange, const string& key, const FieldTable& args)
+void Queue::bound(const string& exchange, const string& key,
+ const FieldTable& args)
{
bindings.add(exchange, key, args);
}
@@ -584,8 +590,24 @@ ManagementObject::shared_ptr Queue::GetManagementObject (void) const
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
}
-Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/,
+Manageable::status_t Queue::ManagementMethod (uint32_t methodId,
Args& /*args*/)
{
- return Manageable::STATUS_OK;
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case management::Queue::METHOD_PURGE :
+ purge ();
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Queue::METHOD_INCREASEJOURNALSIZE :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
}
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index bc761cf34d..5330ee4fd0 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -115,9 +115,19 @@ bool TopicPattern::match(const Tokens& target) const
return do_match(begin(), end(), target.begin(), target.end());
}
-TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
-TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+TopicExchange::TopicExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
@@ -125,7 +135,11 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
if (isBound(queue, routingPattern)) {
return false;
} else {
- bindings[routingPattern].push_back(queue);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingPattern].push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
}
}
@@ -133,12 +147,19 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Queue::vector& qv(bi->second);
+ Binding::vector& qv(bi->second);
if (bi == bindings.end()) return false;
- Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
+
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ break;
if(q == qv.end()) return false;
qv.erase(q);
if(qv.empty()) bindings.erase(bi);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
}
@@ -146,21 +167,45 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
{
BindingMap::iterator bi = bindings.find(pattern);
if (bi == bindings.end()) return false;
- Queue::vector& qv(bi->second);
- return find(qv.begin(), qv.end(), queue) != qv.end();
+ Binding::vector& qv(bi->second);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ break;
+ return q != qv.end();
}
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
+ uint32_t count(0);
Tokens tokens(routingKey);
+
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(tokens)) {
- Queue::vector& qv(i->second);
- for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
- msg.deliverTo(*j);
+ Binding::vector& qv(i->second);
+ for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
+ msg.deliverTo((*j)->queue);
+ if ((*j)->mgmtBinding.get() != 0)
+ (*j)->mgmtBinding->inc_msgMatched ();
}
}
}
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
}
bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
@@ -176,16 +221,16 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing
return true;
}
}
- return false;
} else {
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- Queue::vector& qv(i->second);
- if (find(qv.begin(), qv.end(), queue) != qv.end()) {
- return true;
- }
+ Binding::vector& qv(i->second);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ return true;
}
- return false;
}
+ return false;
}
TopicExchange::~TopicExchange() {}
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index e2cc1a3535..2e107142b7 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -71,7 +71,7 @@ class TopicPattern : public Tokens
};
class TopicExchange : public virtual Exchange{
- typedef std::map<TopicPattern, Queue::vector> BindingMap;
+ typedef std::map<TopicPattern, Binding::vector> BindingMap;
BindingMap bindings;
qpid::sys::RWlock lock;
@@ -79,9 +79,9 @@ class TopicExchange : public virtual Exchange{
public:
static const std::string typeName;
- TopicExchange(const string& name);
+ TopicExchange(const string& name, management::Manageable* parent = 0);
TopicExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index 07e72c49f4..0ad2eac080 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -67,3 +67,7 @@ void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
queue->process(msg);
}
+uint64_t TxPublish::contentSize ()
+{
+ return msg->contentSize ();
+}
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index b4323864bc..085dd28316 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -66,10 +66,12 @@ namespace qpid {
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
-
+
virtual void deliverTo(Queue::shared_ptr& queue);
virtual ~TxPublish(){}
+
+ uint64_t contentSize();
};
}
}
diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp
index 635f345a86..537d2abf0e 100644
--- a/cpp/src/qpid/broker/Vhost.cpp
+++ b/cpp/src/qpid/broker/Vhost.cpp
@@ -27,11 +27,14 @@ Vhost::Vhost (management::Manageable* parentBroker)
{
if (parentBroker != 0)
{
- mgmtObject = management::Vhost::shared_ptr
- (new management::Vhost (this, parentBroker, "/"));
-
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
- agent->addObject (mgmtObject);
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Vhost::shared_ptr
+ (new management::Vhost (this, parentBroker, "/"));
+ agent->addObject (mgmtObject);
+ }
}
}
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 85c2acce1d..90da74404b 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -33,6 +33,7 @@ using namespace qpid::broker;
using namespace qpid::sys;
ManagementAgent::shared_ptr ManagementAgent::agent;
+bool ManagementAgent::enabled = 0;
ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
{
@@ -40,16 +41,21 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
}
+void ManagementAgent::enableManagement (void)
+{
+ enabled = 1;
+}
+
ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
{
- if (agent.get () == 0)
+ if (enabled && agent.get () == 0)
agent = shared_ptr (new ManagementAgent (10));
return agent;
}
-void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange,
- Exchange::shared_ptr _dexchange)
+void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
+ broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
@@ -57,6 +63,7 @@ void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange,
void ManagementAgent::addObject (ManagementObject::shared_ptr object)
{
+ RWlock::ScopedWlock writeLock (userLock);
uint64_t objectId = nextObjectId++;
object->setObjectId (objectId);
@@ -74,6 +81,8 @@ void ManagementAgent::Periodic::fire ()
void ManagementAgent::clientAdded (void)
{
+ RWlock::ScopedRlock readLock (userLock);
+
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
iter != managementObjects.end ();
iter++)
@@ -94,7 +103,7 @@ void ManagementAgent::EncodeHeader (Buffer& buf)
void ManagementAgent::SendBuffer (Buffer& buf,
uint32_t length,
- Exchange::shared_ptr exchange,
+ broker::Exchange::shared_ptr exchange,
string routingKey)
{
intrusive_ptr<Message> msg (new Message ());
@@ -129,9 +138,10 @@ void ManagementAgent::PeriodicProcessing (void)
{
#define BUFSIZE 65536
#define THRESHOLD 16384
- char msgChars[BUFSIZE];
- uint32_t contentSize;
- string routingKey;
+ RWlock::ScopedWlock writeLock (userLock);
+ char msgChars[BUFSIZE];
+ uint32_t contentSize;
+ string routingKey;
std::list<uint64_t> deleteList;
if (managementObjects.empty ())
@@ -157,7 +167,7 @@ void ManagementAgent::PeriodicProcessing (void)
SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
- if (object->getConfigChanged ())
+ if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
EncodeHeader (msgBuffer);
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index c33a59adff..a4f10632da 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -25,6 +25,7 @@
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Timer.h"
+#include "qpid/sys/Mutex.h"
#include "ManagementObject.h"
#include <boost/shared_ptr.hpp>
@@ -41,11 +42,12 @@ class ManagementAgent
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
+ static void enableManagement (void);
static shared_ptr getAgent (void);
void setInterval (uint16_t _interval) { interval = _interval; }
- void setExchange (broker::Exchange::shared_ptr mgmtExchange,
- broker::Exchange::shared_ptr directExchange);
+ void setExchange (broker::Exchange::shared_ptr mgmtExchange,
+ broker::Exchange::shared_ptr directExchange);
void addObject (ManagementObject::shared_ptr object);
void clientAdded (void);
void dispatchCommand (broker::Deliverable& msg,
@@ -64,6 +66,9 @@ class ManagementAgent
};
static shared_ptr agent;
+ static bool enabled;
+
+ qpid::sys::RWlock userLock;
ManagementObjectMap managementObjects;
broker::Timer timer;
broker::Exchange::shared_ptr mExchange;
diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp
index f18b6fc402..ee18f026e7 100644
--- a/cpp/src/qpid/management/ManagementExchange.cpp
+++ b/cpp/src/qpid/management/ManagementExchange.cpp
@@ -27,13 +27,14 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-ManagementExchange::ManagementExchange (const string& _name) :
- Exchange (_name), TopicExchange(_name) {}
+ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent) :
+ Exchange (_name, _parent), TopicExchange(_name, _parent) {}
ManagementExchange::ManagementExchange (const std::string& _name,
bool _durable,
- const FieldTable& _args) :
- Exchange (_name, _durable, _args),
- TopicExchange(_name, _durable, _args) {}
+ const FieldTable& _args,
+ Manageable* _parent) :
+ Exchange (_name, _durable, _args, _parent),
+ TopicExchange(_name, _durable, _args, _parent) {}
bool ManagementExchange::bind (Queue::shared_ptr queue,
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h
index 6ccdf47182..0fcd65b092 100644
--- a/cpp/src/qpid/management/ManagementExchange.h
+++ b/cpp/src/qpid/management/ManagementExchange.h
@@ -35,9 +35,10 @@ class ManagementExchange : public virtual TopicExchange
public:
static const std::string typeName;
- ManagementExchange (const string& name);
+ ManagementExchange (const string& name, Manageable* _parent = 0);
ManagementExchange (const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args,
+ Manageable* _parent = 0);
virtual std::string getType() const { return typeName; }