summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/HeadersExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/HeadersExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp107
1 files changed, 48 insertions, 59 deletions
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 54519a7bf6..38cc0e4050 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -18,7 +18,7 @@
* under the License.
*
*/
-#include "HeadersExchange.h"
+#include "qpid/broker/HeadersExchange.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
@@ -28,6 +28,7 @@
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
+namespace _qmf = qmf::org::apache::qpid::broker;
// TODO aconway 2006-09-20: More efficient matching algorithm.
// The current search algorithm really sucks.
@@ -42,16 +43,16 @@ namespace {
const std::string empty;
}
-HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) :
- Exchange(_name, _parent)
+HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broker* b) :
+ Exchange(_name, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
}
HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
- const FieldTable& _args, Manageable* _parent) :
- Exchange(_name, _durable, _args, _parent)
+ const FieldTable& _args, Manageable* _parent, Broker* b) :
+ Exchange(_name, _durable, _args, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
@@ -73,50 +74,26 @@ std::string HeadersExchange::getMatch(const FieldTable* args)
}
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){
- RWlock::ScopedWlock locker(lock);
std::string what = getMatch(args);
if (what != all && what != any)
throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
- Bindings::iterator i;
-
- for (i = bindings.begin(); i != bindings.end(); i++)
- if (i->first == *args && i->second->queue == queue)
- break;
-
- if (i == bindings.end()) {
- Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
- HeaderMap headerMap(*args, binding);
-
- bindings.push_back(headerMap);
+ Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
+ if (bindings.add_unless(binding, MatchArgs(queue, args))) {
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
- ((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
+ routeIVE();
return true;
} else {
return false;
}
}
-bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){
- RWlock::ScopedWlock locker(lock);
- Bindings::iterator i;
- for (i = bindings.begin(); i != bindings.end(); i++) {
- if (bindingKey.empty() && args) {
- if (i->first == *args && i->second->queue == queue)
- break;
- } else {
- if (i->second->key == bindingKey && i->second->queue == queue)
- break;
- }
- }
-
- if (i != bindings.end()) {
- bindings.erase(i);
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){
+ if (bindings.remove_if(MatchKey(queue, bindingKey))) {
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
- ((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
}
return true;
} else {
@@ -125,41 +102,43 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
}
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
- if (!args) return;//can't match if there were no headers passed in
-
- RWlock::ScopedRlock locker(lock);
- uint32_t count(0);
-
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) {
- if (match(i->first, *args)) msg.deliverTo(i->second->queue);
- if (i->second->mgmtBinding != 0)
- i->second->mgmtBinding->inc_msgMatched ();
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args)
+{
+ if (!args) {
+ //can't match if there were no headers passed in
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgReceives();
+ mgmtExchange->inc_byteReceives(msg.contentSize());
+ mgmtExchange->inc_msgDrops();
+ mgmtExchange->inc_byteDrops(msg.contentSize());
+ }
+ return;
}
- if (mgmtExchange != 0)
+ PreRoute pr(msg, this);
+
+ ConstBindingList p = bindings.snapshot();
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ if (p.get())
{
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
+ if (match((*i)->args, *args)) {
+ b->push_back(*i);
+ }
}
}
+ doRoute(msg, b);
}
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
{
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) {
- return true;
+ Bindings::ConstPtr p = bindings.snapshot();
+ if (p.get()){
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
+ if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) {
+ return true;
+ }
}
}
return false;
@@ -227,5 +206,15 @@ bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) {
return true;
}
+HeadersExchange::MatchArgs::MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a) : queue(q), args(a) {}
+bool HeadersExchange::MatchArgs::operator()(Exchange::Binding::shared_ptr b)
+{
+ return b->queue == queue && b->args == *args;
+}
+HeadersExchange::MatchKey::MatchKey(Queue::shared_ptr q, const std::string& k) : queue(q), key(k) {}
+bool HeadersExchange::MatchKey::operator()(Exchange::Binding::shared_ptr b)
+{
+ return b->queue == queue && b->key == key;
+}