diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 71 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 56 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/sys/CopyOnWriteArray.h | 126 | ||||
-rw-r--r-- | cpp/src/tests/ExchangeTest.cpp | 22 |
11 files changed, 238 insertions, 130 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index d52994a6e4..d4214ad052 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -542,6 +542,7 @@ nobase_include_HEADERS = \ qpid/sys/AtomicValue_gcc.h \ qpid/sys/AtomicValue_mutex.h \ qpid/sys/BlockingQueue.h \ + qpid/sys/CopyOnWriteArray.h \ qpid/sys/Condition.h \ qpid/sys/ConnectionCodec.h \ qpid/sys/ConnectionInputHandler.h \ diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 4aa68bee9c..bc6d7fe495 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -42,41 +42,22 @@ DirectExchange::DirectExchange(const std::string& _name, bool _durable, } bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){ - RWlock::ScopedWlock l(lock); - std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); - std::vector<Binding::shared_ptr>::iterator i; - - for (i = queues.begin(); i != queues.end(); i++) - if ((*i)->queue == queue) - break; - - if (i == queues.end()) { - Binding::shared_ptr binding (new Binding (routingKey, queue, this)); - bindings[routingKey].push_back(binding); + Mutex::ScopedLock l(lock); + Binding::shared_ptr b(new Binding (routingKey, queue, this)); + if (bindings[routingKey].add_unless(b, MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); } return true; - } else{ + } else { return false; } } bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - RWlock::ScopedWlock l(lock); - std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); - std::vector<Binding::shared_ptr>::iterator i; - - for (i = queues.begin(); i != queues.end(); i++) - if ((*i)->queue == queue) - break; - - if (i < queues.end()) { - queues.erase(i); - if (queues.empty()) { - bindings.erase(routingKey); - } + Mutex::ScopedLock l(lock); + if (bindings[routingKey].remove_if(MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); @@ -88,16 +69,20 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ - RWlock::ScopedRlock l(lock); - std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); - std::vector<Binding::shared_ptr>::iterator i; + Queues::ConstPtr p; + { + Mutex::ScopedLock l(lock); + p = bindings[routingKey].snapshot(); + } int count(0); - for(i = queues.begin(); i != queues.end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } + if (p) { + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + } if(!count){ QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey); @@ -105,8 +90,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie mgmtExchange->inc_msgDrops (); mgmtExchange->inc_byteDrops (msg.contentSize ()); } - } - else { + } else { if (mgmtExchange != 0) { mgmtExchange->inc_msgRoutes (count); mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); @@ -122,8 +106,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { - std::vector<Binding::shared_ptr>::iterator j; - + Mutex::ScopedLock l(lock); if (routingKey) { Bindings::iterator i = bindings.find(*routingKey); @@ -131,17 +114,17 @@ bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routin return false; if (!queue) return true; - for (j = i->second.begin(); j != i->second.end(); j++) - if ((*j)->queue == queue) - return true; + + Queues::ConstPtr p = i->second.snapshot(); + return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end(); } else if (!queue) { //if no queue or routing key is specified, just report whether any bindings exist return bindings.size() > 0; } else { - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) - for (j = i->second.begin(); j != i->second.end(); j++) - if ((*j)->queue == queue) - return true; + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { + Queues::ConstPtr p = i->second.snapshot(); + if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true; + } return false; } diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 118f2ed4d3..2516ce4a13 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -25,16 +25,17 @@ #include <vector> #include "Exchange.h" #include "qpid/framing/FieldTable.h" -#include "qpid/sys/Monitor.h" +#include "qpid/sys/CopyOnWriteArray.h" +#include "qpid/sys/Mutex.h" #include "Queue.h" namespace qpid { namespace broker { class DirectExchange : public virtual Exchange{ - typedef std::vector<Binding::shared_ptr> Queues; + typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues; typedef std::map<string, Queues> Bindings; Bindings bindings; - qpid::sys::RWlock lock; + qpid::sys::Mutex lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 6416e2fc73..b40d6f9ed9 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -153,3 +153,11 @@ Manageable::status_t Exchange::Binding::ManagementMethod (uint32_t, Args&) { return Manageable::STATUS_UNKNOWN_METHOD; } + + +Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {} + +bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) +{ + return b->queue == queue; +} diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index f4ac4373e4..2e3b5ba13a 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -62,6 +62,12 @@ namespace qpid { management::ManagementObject* GetManagementObject () const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); }; + struct MatchQueue + { + const Queue::shared_ptr queue; + MatchQueue(Queue::shared_ptr q); + bool operator()(Exchange::Binding::shared_ptr b); + }; management::Exchange* mgmtExchange; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 373e9ab1cc..019c943ca1 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -40,18 +40,10 @@ FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ - RWlock::ScopedWlock locker(lock); - std::vector<Binding::shared_ptr>::iterator i; - - // Add if not already present. - for (i = bindings.begin (); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - if (i == bindings.end()) { - Binding::shared_ptr binding (new Binding ("", queue, this)); - bindings.push_back(binding); +bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/) +{ + Binding::shared_ptr binding (new Binding ("", queue, this)); + if (bindings.add_unless(binding, MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); @@ -62,16 +54,9 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, } } -bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ - RWlock::ScopedWlock locker(lock); - std::vector<Binding::shared_ptr>::iterator i; - - for (i = bindings.begin (); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - if (i != bindings.end()) { - bindings.erase(i); +bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/) +{ + if (bindings.remove_if(MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); @@ -83,10 +68,10 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* } void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ - RWlock::ScopedRlock locker(lock); uint32_t count(0); - for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){ + BindingsArray::ConstPtr p = bindings.snapshot(); + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ msg.deliverTo((*i)->queue); if ((*i)->mgmtBinding != 0) (*i)->mgmtBinding->inc_msgMatched (); @@ -111,13 +96,8 @@ void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) { - std::vector<Binding::shared_ptr>::iterator i; - - for (i = bindings.begin (); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - return i != bindings.end(); + BindingsArray::ConstPtr ptr = bindings.snapshot(); + return ptr && std::find_if(ptr->begin(), ptr->end(), MatchQueue(queue)) != ptr->end(); } diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 4bc92f6b28..cfe9875024 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -25,16 +25,15 @@ #include <vector> #include "Exchange.h" #include "qpid/framing/FieldTable.h" -#include "qpid/sys/Monitor.h" +#include "qpid/sys/CopyOnWriteArray.h" #include "Queue.h" namespace qpid { namespace broker { class FanOutExchange : public virtual Exchange { - std::vector<Binding::shared_ptr> bindings; - qpid::sys::RWlock lock; - + typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> BindingsArray; + BindingsArray bindings; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 54519a7bf6..f7842239da 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -73,22 +73,12 @@ std::string HeadersExchange::getMatch(const FieldTable* args) } bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){ - RWlock::ScopedWlock locker(lock); std::string what = getMatch(args); if (what != all && what != any) throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange.")); - Bindings::iterator i; - - for (i = bindings.begin(); i != bindings.end(); i++) - if (i->first == *args && i->second->queue == queue) - break; - - if (i == bindings.end()) { - Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args)); - HeaderMap headerMap(*args, binding); - - bindings.push_back(headerMap); + Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args)); + if (bindings.add_unless(binding, MatchArgs(queue, args))) { if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); @@ -99,21 +89,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co } } -bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){ - RWlock::ScopedWlock locker(lock); - Bindings::iterator i; - for (i = bindings.begin(); i != bindings.end(); i++) { - if (bindingKey.empty() && args) { - if (i->first == *args && i->second->queue == queue) - break; - } else { - if (i->second->key == bindingKey && i->second->queue == queue) - break; - } - } - - if (i != bindings.end()) { - bindings.erase(i); +bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){ + if (bindings.remove_if(MatchKey(queue, bindingKey))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); @@ -128,13 +105,13 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ if (!args) return;//can't match if there were no headers passed in - RWlock::ScopedRlock locker(lock); uint32_t count(0); - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) { - if (match(i->first, *args)) msg.deliverTo(i->second->queue); - if (i->second->mgmtBinding != 0) - i->second->mgmtBinding->inc_msgMatched (); + Bindings::ConstPtr p = bindings.snapshot(); + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) { + if (match((*i)->args, *args)) msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); } if (mgmtExchange != 0) @@ -157,8 +134,9 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args) { - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) { + Bindings::ConstPtr p = bindings.snapshot(); + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { + if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) { return true; } } @@ -227,5 +205,15 @@ bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) { return true; } +HeadersExchange::MatchArgs::MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a) : queue(q), args(a) {} +bool HeadersExchange::MatchArgs::operator()(Exchange::Binding::shared_ptr b) +{ + return b->queue == queue && b->args == *args; +} +HeadersExchange::MatchKey::MatchKey(Queue::shared_ptr q, const std::string& k) : queue(q), key(k) {} +bool HeadersExchange::MatchKey::operator()(Exchange::Binding::shared_ptr b) +{ + return b->queue == queue && b->key == key; +} diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 6e101e193a..e10fab2250 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -24,7 +24,8 @@ #include <vector> #include "Exchange.h" #include "qpid/framing/FieldTable.h" -#include "qpid/sys/Monitor.h" +#include "qpid/sys/CopyOnWriteArray.h" +#include "qpid/sys/Mutex.h" #include "Queue.h" namespace qpid { @@ -33,10 +34,25 @@ namespace broker { class HeadersExchange : public virtual Exchange { typedef std::pair<qpid::framing::FieldTable, Binding::shared_ptr> HeaderMap; - typedef std::vector<HeaderMap> Bindings; + typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Bindings; + + struct MatchArgs + { + const Queue::shared_ptr queue; + const qpid::framing::FieldTable* args; + MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a); + bool operator()(Exchange::Binding::shared_ptr b); + }; + struct MatchKey + { + const Queue::shared_ptr queue; + const std::string& key; + MatchKey(Queue::shared_ptr q, const std::string& k); + bool operator()(Exchange::Binding::shared_ptr b); + }; Bindings bindings; - qpid::sys::RWlock lock; + qpid::sys::Mutex lock; static std::string getMatch(const framing::FieldTable* args); diff --git a/cpp/src/qpid/sys/CopyOnWriteArray.h b/cpp/src/qpid/sys/CopyOnWriteArray.h new file mode 100644 index 0000000000..c5bdcc0942 --- /dev/null +++ b/cpp/src/qpid/sys/CopyOnWriteArray.h @@ -0,0 +1,126 @@ +#ifndef QPID_SYS_COPYONWRITEARRAY_H +#define QPID_SYS_COPYONWRITEARRAY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Mutex.h" +#include <algorithm> +#include <vector> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace sys { + +/** + * An array that copies on adding/removing element allowing lock-free + * iteration. + */ +template <class T> +class CopyOnWriteArray +{ +public: + typedef boost::shared_ptr<const std::vector<T> > ConstPtr; + + CopyOnWriteArray() {} + CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {} + + void add(T& t) + { + Mutex::ScopedLock l(lock); + ArrayPtr copy(array ? new std::vector<T>(*array) : new std::vector<T>()); + copy->push_back(t); + array = copy; + } + + bool remove(T& t) + { + Mutex::ScopedLock l(lock); + if (array && std::find(array->begin(), array->end(), t) != array->end()) { + ArrayPtr copy(new std::vector<T>(*array)); + copy->erase(std::find(copy->begin(), copy->end(), t)); + array = copy; + return true; + } else { + return false; + } + } + + template <class F> + bool add_unless(T& t, F f) + { + Mutex::ScopedLock l(lock); + if (array && find_if(array->begin(), array->end(), f) != array->end()) { + return false; + } else { + ArrayPtr copy(array ? new std::vector<T>(*array) : new std::vector<T>()); + copy->push_back(t); + array = copy; + return true; + } + } + + template <class F> + bool remove_if(F f) + { + Mutex::ScopedLock l(lock); + if (array && std::find_if(array->begin(), array->end(), f) != array->end()) { + ArrayPtr copy(new std::vector<T>(*array)); + copy->erase(std::remove_if(copy->begin(), copy->end(), f), copy->end()); + array = copy; + return true; + } + return false; + } + + template <class F> + F for_each(F f) + { + ArrayPtr a; + { + Mutex::ScopedLock l(lock); + a = array; + } + if (!a) return f; + return std::for_each(a->begin(), a->end(), f); + } + + ConstPtr snapshot() + { + ConstPtr a; + { + Mutex::ScopedLock l(lock); + a = array; + } + return a; + } + +private: + typedef boost::shared_ptr< std::vector<T> > ArrayPtr; + Mutex lock; + ArrayPtr array; +}; + +}} + + + +#endif /*!QPID_SYS_COPYONWRITEARRAY_H*/ diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index e496f0c478..94ee36065d 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -76,9 +76,9 @@ QPID_AUTO_TEST_CASE(testIsBound) string k3("xyz"); FanOutExchange fanout("fanout"); - fanout.bind(a, "", 0); - fanout.bind(b, "", 0); - fanout.bind(c, "", 0); + BOOST_CHECK(fanout.bind(a, "", 0)); + BOOST_CHECK(fanout.bind(b, "", 0)); + BOOST_CHECK(fanout.bind(c, "", 0)); BOOST_CHECK(fanout.isBound(a, 0, 0)); BOOST_CHECK(fanout.isBound(b, 0, 0)); @@ -86,10 +86,10 @@ QPID_AUTO_TEST_CASE(testIsBound) BOOST_CHECK(!fanout.isBound(d, 0, 0)); DirectExchange direct("direct"); - direct.bind(a, k1, 0); - direct.bind(a, k3, 0); - direct.bind(b, k2, 0); - direct.bind(c, k1, 0); + BOOST_CHECK(direct.bind(a, k1, 0)); + BOOST_CHECK(direct.bind(a, k3, 0)); + BOOST_CHECK(direct.bind(b, k2, 0)); + BOOST_CHECK(direct.bind(c, k1, 0)); BOOST_CHECK(direct.isBound(a, 0, 0)); BOOST_CHECK(direct.isBound(a, &k1, 0)); @@ -104,10 +104,10 @@ QPID_AUTO_TEST_CASE(testIsBound) BOOST_CHECK(!direct.isBound(d, &k3, 0)); TopicExchange topic("topic"); - topic.bind(a, k1, 0); - topic.bind(a, k3, 0); - topic.bind(b, k2, 0); - topic.bind(c, k1, 0); + BOOST_CHECK(topic.bind(a, k1, 0)); + BOOST_CHECK(topic.bind(a, k3, 0)); + BOOST_CHECK(topic.bind(b, k2, 0)); + BOOST_CHECK(topic.bind(c, k1, 0)); BOOST_CHECK(topic.isBound(a, 0, 0)); BOOST_CHECK(topic.isBound(a, &k1, 0)); |