summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/DirectExchange.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-12-06 18:37:18 +0000
committerAlan Conway <aconway@apache.org>2007-12-06 18:37:18 +0000
commitf03223296e70664d3ecd57df357e71202c79eff7 (patch)
tree7d0b5b8bcb6be11ef3294d04c198b4e5e7981fce /cpp/src/qpid/broker/DirectExchange.cpp
parent430e644a4a647c256ae51682d7230c35d954073e (diff)
downloadqpid-python-f03223296e70664d3ecd57df357e71202c79eff7.tar.gz
From Ted Ross <tross@redhat.com>
Queue statistics fixed. Additional objects added (exchange, binding). Changes: M cpp/src/qpid/broker/ExchangeRegistry.h M cpp/src/qpid/broker/ExchangeRegistry.cpp ExchangeRegistry was modified to pass a parent pointer to created exchanges. This parent reference is not stored but is used to link management objects in a hierarchy of ownership. M cpp/src/qpid/broker/Exchange.h M cpp/src/qpid/broker/Exchange.cpp Exchange now inherits Manageable to make it visible via the management interface. The Exchange parent class handles most of the management boilerplate. A Binding struct was introduced to track bindings for management. This is separate from QueueBindings which track bindings for queues. M cpp/src/qpid/broker/HeadersExchange.h M cpp/src/qpid/broker/FanOutExchange.h M cpp/src/qpid/broker/DirectExchange.h M cpp/src/qpid/broker/TopicExchange.h M cpp/src/qpid/broker/HeadersExchange.cpp M cpp/src/qpid/broker/FanOutExchange.cpp M cpp/src/qpid/broker/DirectExchange.cpp M cpp/src/qpid/broker/TopicExchange.cpp M cpp/src/qpid/management/ManagementExchange.cpp M cpp/src/qpid/management/ManagementExchange.h Each exchange type handles management stats in its own specific way. Additionally, the constructors pass the management parent pointer to the constructor or Exchange. An extra layer was added to contain bindings. Instead of directly storing bound queues, the exchanges store "bindings" which are managable constructs. M cpp/src/qpid/broker/Broker.cpp Broker now explicitly enables the management agent. Also sets the management parent (vhost) in the exchange registry. M cpp/src/qpid/broker/Vhost.cpp Updated constructor to be more defensive in case the management agent has not been enabled. M cpp/src/qpid/broker/Queue.cpp Same constructor update as vhost. Moved accounting of dequeues into "pop". Implemented management method handler (purge). M cpp/src/qpid/broker/Deliverable.h A new method was added to extract the content size of the deliverable content (if appropriate). The method is not pure virtual and returns zero if not overridden. M cpp/src/qpid/broker/DeliverableMessage.h M cpp/src/qpid/broker/TxPublish.cpp M cpp/src/qpid/broker/DeliverableMessage.cpp M cpp/src/qpid/broker/TxPublish.h These derivatives of Deliverable were updated with overrides for contenSize. M cpp/src/qpid/management/ManagementAgent.h M cpp/src/qpid/management/ManagementAgent.cpp An "enable" method was added to prevent inadvertent creation of a management agent when not desired. Adding and deleting management objects is now protected by a mutex. Make sure that deleted objects get reported even if neither their configuration nor instrumentation is changed. M specs/management-schema.xml Minor cosmetic updates. Additional parent linkage. M cpp/managementgen/schema.py M cpp/managementgen/templates/Class.cpp Added generated code to publish schema details for methods. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@601807 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/DirectExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp97
1 files changed, 76 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 87f363ff3d..43b707a5c8 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -25,16 +25,37 @@
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
+using qpid::management::Manageable;
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {}
-DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+DirectExchange::DirectExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
RWlock::ScopedWlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i == queues.end()) {
- bindings[routingKey].push_back(queue);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingKey].push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else{
return false;
@@ -43,14 +64,21 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
if (i < queues.end()) {
queues.erase(i);
- if(queues.empty()){
+ if (queues.empty()) {
bindings.erase(routingKey);
}
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -59,38 +87,65 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
int count(0);
- for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
- msg.deliverTo(*i);
- }
+
+ for(i = queues.begin(); i != queues.end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
if(!count){
QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ }
+ else {
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
}
}
bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
+ std::vector<Binding::shared_ptr>::iterator j;
+
if (routingKey) {
Bindings::iterator i = bindings.find(*routingKey);
- return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end());
+
+ if (i == bindings.end())
+ return false;
+ if (!queue)
+ return true;
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
- if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) {
- return true;
- }
- }
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
return false;
}
-}
-
-DirectExchange::~DirectExchange(){
+ return false;
}
+DirectExchange::~DirectExchange() {}
const std::string DirectExchange::typeName("direct");