summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/DirectExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/DirectExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp97
1 files changed, 76 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 87f363ff3d..43b707a5c8 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -25,16 +25,37 @@
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
+using qpid::management::Manageable;
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {}
-DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+DirectExchange::DirectExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
RWlock::ScopedWlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i == queues.end()) {
- bindings[routingKey].push_back(queue);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingKey].push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else{
return false;
@@ -43,14 +64,21 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
if (i < queues.end()) {
queues.erase(i);
- if(queues.empty()){
+ if (queues.empty()) {
bindings.erase(routingKey);
}
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -59,38 +87,65 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
int count(0);
- for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
- msg.deliverTo(*i);
- }
+
+ for(i = queues.begin(); i != queues.end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
if(!count){
QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ }
+ else {
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
}
}
bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
+ std::vector<Binding::shared_ptr>::iterator j;
+
if (routingKey) {
Bindings::iterator i = bindings.find(*routingKey);
- return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end());
+
+ if (i == bindings.end())
+ return false;
+ if (!queue)
+ return true;
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
- if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) {
- return true;
- }
- }
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
return false;
}
-}
-
-DirectExchange::~DirectExchange(){
+ return false;
}
+DirectExchange::~DirectExchange() {}
const std::string DirectExchange::typeName("direct");