diff options
Diffstat (limited to 'cpp/src/qpid/broker/HeadersExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 107 |
1 files changed, 48 insertions, 59 deletions
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 54519a7bf6..38cc0e4050 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -18,7 +18,7 @@ * under the License. * */ -#include "HeadersExchange.h" +#include "qpid/broker/HeadersExchange.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" @@ -28,6 +28,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; +namespace _qmf = qmf::org::apache::qpid::broker; // TODO aconway 2006-09-20: More efficient matching algorithm. // The current search algorithm really sucks. @@ -42,16 +43,16 @@ namespace { const std::string empty; } -HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) : - Exchange(_name, _parent) +HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broker* b) : + Exchange(_name, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) + const FieldTable& _args, Manageable* _parent, Broker* b) : + Exchange(_name, _durable, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); @@ -73,50 +74,26 @@ std::string HeadersExchange::getMatch(const FieldTable* args) } bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){ - RWlock::ScopedWlock locker(lock); std::string what = getMatch(args); if (what != all && what != any) throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange.")); - Bindings::iterator i; - - for (i = bindings.begin(); i != bindings.end(); i++) - if (i->first == *args && i->second->queue == queue) - break; - - if (i == bindings.end()) { - Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args)); - HeaderMap headerMap(*args, binding); - - bindings.push_back(headerMap); + Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args)); + if (bindings.add_unless(binding, MatchArgs(queue, args))) { if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); - ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); } + routeIVE(); return true; } else { return false; } } -bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){ - RWlock::ScopedWlock locker(lock); - Bindings::iterator i; - for (i = bindings.begin(); i != bindings.end(); i++) { - if (bindingKey.empty() && args) { - if (i->first == *args && i->second->queue == queue) - break; - } else { - if (i->second->key == bindingKey && i->second->queue == queue) - break; - } - } - - if (i != bindings.end()) { - bindings.erase(i); +bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){ + if (bindings.remove_if(MatchKey(queue, bindingKey))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); - ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); } return true; } else { @@ -125,41 +102,43 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, } -void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ - if (!args) return;//can't match if there were no headers passed in - - RWlock::ScopedRlock locker(lock); - uint32_t count(0); - - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) { - if (match(i->first, *args)) msg.deliverTo(i->second->queue); - if (i->second->mgmtBinding != 0) - i->second->mgmtBinding->inc_msgMatched (); +void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args) +{ + if (!args) { + //can't match if there were no headers passed in + if (mgmtExchange != 0) { + mgmtExchange->inc_msgReceives(); + mgmtExchange->inc_byteReceives(msg.contentSize()); + mgmtExchange->inc_msgDrops(); + mgmtExchange->inc_byteDrops(msg.contentSize()); + } + return; } - if (mgmtExchange != 0) + PreRoute pr(msg, this); + + ConstBindingList p = bindings.snapshot(); + BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); + if (p.get()) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { + if (match((*i)->args, *args)) { + b->push_back(*i); + } } } + doRoute(msg, b); } bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args) { - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) { - return true; + Bindings::ConstPtr p = bindings.snapshot(); + if (p.get()){ + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { + if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) { + return true; + } } } return false; @@ -227,5 +206,15 @@ bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) { return true; } +HeadersExchange::MatchArgs::MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a) : queue(q), args(a) {} +bool HeadersExchange::MatchArgs::operator()(Exchange::Binding::shared_ptr b) +{ + return b->queue == queue && b->args == *args; +} +HeadersExchange::MatchKey::MatchKey(Queue::shared_ptr q, const std::string& k) : queue(q), key(k) {} +bool HeadersExchange::MatchKey::operator()(Exchange::Binding::shared_ptr b) +{ + return b->queue == queue && b->key == key; +} |