diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-07-17 20:46:45 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-07-17 20:46:45 +0000 |
commit | 1d1f5c021b620f01c6bcb5fbded0eb1b50e445ab (patch) | |
tree | 1c3f2c94806c73b1c6a93c4350ebf2f23b26c981 /cpp | |
parent | 55b232f3d2448dab91c926ba171cc5fd9b5a04c5 (diff) | |
download | qpid-python-1d1f5c021b620f01c6bcb5fbded0eb1b50e445ab.tar.gz |
Updated queue class, can run dispatch on seperate thread or on
thread servicing the request. current set to use a worker - better
test results.
controlled by setting serilizable true - no worker, false, use a worker
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557052 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 72 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.h | 7 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 6 |
3 files changed, 45 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 0e5e0f2bb1..b26d1d3ed7 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -29,8 +29,10 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include <iostream> +#include <boost/bind.hpp> #include "QueueRegistry.h" + using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; @@ -44,11 +46,10 @@ Queue::Queue(const string& _name, bool _autodelete, autodelete(_autodelete), store(_store), owner(_owner), - queueing(false), - dispatching(false), next(0), exclusive(0), - persistenceId(0) + persistenceId(0), + serializer(false) { } @@ -69,21 +70,28 @@ void Queue::recover(Message::shared_ptr& msg){ } void Queue::process(Message::shared_ptr& msg){ - RWlock::ScopedWlock locker(messageLock); - if(queueing || !dispatch(msg)){ - push(msg); - } + + push(msg); + serializer.execute(boost::bind(&Queue::dispatch, this)); + } void Queue::requeue(Message::shared_ptr& msg){ - RWlock::ScopedWlock locker(messageLock); - if(queueing || !dispatch(msg)){ - queueing = true; - messages.push_front(msg); + + { + Mutex::ScopedLock locker(messageLock); + messages.push_front(msg); } + serializer.execute(boost::bind(&Queue::dispatch, this)); + } + bool Queue::dispatch(Message::shared_ptr& msg){ + + + RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide.... + if(consumers.empty()){ return false; }else if(exclusive){ @@ -96,7 +104,6 @@ bool Queue::dispatch(Message::shared_ptr& msg){ while(c){ next++; if(c->deliver(msg)) return true; - next = next % consumers.size(); c = next == start ? 0 : consumers[next]; } @@ -104,28 +111,22 @@ bool Queue::dispatch(Message::shared_ptr& msg){ } } -bool Queue::startDispatching(){ - RWlock::ScopedRlock locker(messageLock); - if(queueing && !dispatching){ - dispatching = true; - return true; - }else{ - return false; - } -} void Queue::dispatch(){ - bool proceed = startDispatching(); - while(proceed){ - RWlock::ScopedWlock locker(messageLock); - if(!messages.empty() && dispatch(messages.front())){ + + Message::shared_ptr msg; + while(true){ + { + Mutex::ScopedLock locker(messageLock); + if (messages.empty()) break; + msg = messages.front(); + } + if( dispatch(msg) ){ pop(); - }else{ - dispatching = false; - proceed = false; - queueing = !messages.empty(); - } + }else break; + } + } void Queue::consume(Consumer* c, bool requestExclusive){ @@ -153,7 +154,7 @@ void Queue::cancel(Consumer* c){ } Message::shared_ptr Queue::dequeue(){ - RWlock::ScopedWlock locker(messageLock); + Mutex::ScopedLock locker(messageLock); Message::shared_ptr msg; if(!messages.empty()){ msg = messages.front(); @@ -163,19 +164,20 @@ Message::shared_ptr Queue::dequeue(){ } uint32_t Queue::purge(){ - RWlock::ScopedWlock locker(messageLock); + Mutex::ScopedLock locker(messageLock); int count = messages.size(); while(!messages.empty()) pop(); return count; } void Queue::pop(){ + Mutex::ScopedLock locker(messageLock); if (policy.get()) policy->dequeued(messages.front()->contentSize()); messages.pop_front(); } void Queue::push(Message::shared_ptr& msg){ - queueing = true; + Mutex::ScopedLock locker(messageLock); messages.push_back(msg); if (policy.get()) { policy->enqueued(msg->contentSize()); @@ -186,7 +188,7 @@ void Queue::push(Message::shared_ptr& msg){ } uint32_t Queue::getMessageCount() const{ - RWlock::ScopedRlock locker(messageLock); + Mutex::ScopedLock locker(messageLock); return messages.size(); } @@ -241,7 +243,7 @@ void Queue::configure(const FieldTable& _settings) void Queue::destroy() { if (alternateExchange.get()) { - RWlock::ScopedWlock locker(messageLock); + Mutex::ScopedLock locker(messageLock); while(!messages.empty()){ DeliverableMessage msg(messages.front()); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 667604eea9..415e22f04c 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -30,6 +30,7 @@ #include "Consumer.h" #include "BrokerMessage.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/Serializer.h" #include "qpid/sys/Monitor.h" #include "PersistableQueue.h" #include "QueuePolicy.h" @@ -65,21 +66,19 @@ namespace qpid { const ConnectionToken* const owner; Consumers consumers; Messages messages; - bool queueing; - bool dispatching; int next; mutable qpid::sys::RWlock consumerLock; - mutable qpid::sys::RWlock messageLock; + mutable qpid::sys::Mutex messageLock; Consumer* exclusive; mutable uint64_t persistenceId; framing::FieldTable settings; std::auto_ptr<QueuePolicy> policy; QueueBindings bindings; boost::shared_ptr<Exchange> alternateExchange; + qpid::sys::Serializer serializer; void pop(); void push(Message::shared_ptr& msg); - bool startDispatching(); bool dispatch(Message::shared_ptr& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 7d6f5f4672..7648011b2a 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -84,12 +84,16 @@ class QueueTest : public CppUnit::TestCase Message::shared_ptr msg3 = message("e", "C"); queue->deliver(msg1); - CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); + /** if dispatched on diff thread, force dispatch so don't have to wait for thread. Only do in text */ + queue->dispatch(); + CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); queue->deliver(msg2); + queue->dispatch(); CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); queue->deliver(msg3); + queue->dispatch(); CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); //Test cancellation: |