summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-07-17 20:46:45 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-07-17 20:46:45 +0000
commit1d1f5c021b620f01c6bcb5fbded0eb1b50e445ab (patch)
tree1c3f2c94806c73b1c6a93c4350ebf2f23b26c981 /cpp/src
parent55b232f3d2448dab91c926ba171cc5fd9b5a04c5 (diff)
downloadqpid-python-1d1f5c021b620f01c6bcb5fbded0eb1b50e445ab.tar.gz
Updated queue class, can run dispatch on seperate thread or on
thread servicing the request. current set to use a worker - better test results. controlled by setting serilizable true - no worker, false, use a worker git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557052 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp72
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h7
-rw-r--r--cpp/src/tests/QueueTest.cpp6
3 files changed, 45 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index 0e5e0f2bb1..b26d1d3ed7 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -29,8 +29,10 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include <iostream>
+#include <boost/bind.hpp>
#include "QueueRegistry.h"
+
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -44,11 +46,10 @@ Queue::Queue(const string& _name, bool _autodelete,
autodelete(_autodelete),
store(_store),
owner(_owner),
- queueing(false),
- dispatching(false),
next(0),
exclusive(0),
- persistenceId(0)
+ persistenceId(0),
+ serializer(false)
{
}
@@ -69,21 +70,28 @@ void Queue::recover(Message::shared_ptr& msg){
}
void Queue::process(Message::shared_ptr& msg){
- RWlock::ScopedWlock locker(messageLock);
- if(queueing || !dispatch(msg)){
- push(msg);
- }
+
+ push(msg);
+ serializer.execute(boost::bind(&Queue::dispatch, this));
+
}
void Queue::requeue(Message::shared_ptr& msg){
- RWlock::ScopedWlock locker(messageLock);
- if(queueing || !dispatch(msg)){
- queueing = true;
- messages.push_front(msg);
+
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages.push_front(msg);
}
+ serializer.execute(boost::bind(&Queue::dispatch, this));
+
}
+
bool Queue::dispatch(Message::shared_ptr& msg){
+
+
+ RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
+
if(consumers.empty()){
return false;
}else if(exclusive){
@@ -96,7 +104,6 @@ bool Queue::dispatch(Message::shared_ptr& msg){
while(c){
next++;
if(c->deliver(msg)) return true;
-
next = next % consumers.size();
c = next == start ? 0 : consumers[next];
}
@@ -104,28 +111,22 @@ bool Queue::dispatch(Message::shared_ptr& msg){
}
}
-bool Queue::startDispatching(){
- RWlock::ScopedRlock locker(messageLock);
- if(queueing && !dispatching){
- dispatching = true;
- return true;
- }else{
- return false;
- }
-}
void Queue::dispatch(){
- bool proceed = startDispatching();
- while(proceed){
- RWlock::ScopedWlock locker(messageLock);
- if(!messages.empty() && dispatch(messages.front())){
+
+ Message::shared_ptr msg;
+ while(true){
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.empty()) break;
+ msg = messages.front();
+ }
+ if( dispatch(msg) ){
pop();
- }else{
- dispatching = false;
- proceed = false;
- queueing = !messages.empty();
- }
+ }else break;
+
}
+
}
void Queue::consume(Consumer* c, bool requestExclusive){
@@ -153,7 +154,7 @@ void Queue::cancel(Consumer* c){
}
Message::shared_ptr Queue::dequeue(){
- RWlock::ScopedWlock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
Message::shared_ptr msg;
if(!messages.empty()){
msg = messages.front();
@@ -163,19 +164,20 @@ Message::shared_ptr Queue::dequeue(){
}
uint32_t Queue::purge(){
- RWlock::ScopedWlock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
int count = messages.size();
while(!messages.empty()) pop();
return count;
}
void Queue::pop(){
+ Mutex::ScopedLock locker(messageLock);
if (policy.get()) policy->dequeued(messages.front()->contentSize());
messages.pop_front();
}
void Queue::push(Message::shared_ptr& msg){
- queueing = true;
+ Mutex::ScopedLock locker(messageLock);
messages.push_back(msg);
if (policy.get()) {
policy->enqueued(msg->contentSize());
@@ -186,7 +188,7 @@ void Queue::push(Message::shared_ptr& msg){
}
uint32_t Queue::getMessageCount() const{
- RWlock::ScopedRlock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
return messages.size();
}
@@ -241,7 +243,7 @@ void Queue::configure(const FieldTable& _settings)
void Queue::destroy()
{
if (alternateExchange.get()) {
- RWlock::ScopedWlock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
while(!messages.empty()){
DeliverableMessage msg(messages.front());
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index 667604eea9..415e22f04c 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -30,6 +30,7 @@
#include "Consumer.h"
#include "BrokerMessage.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Serializer.h"
#include "qpid/sys/Monitor.h"
#include "PersistableQueue.h"
#include "QueuePolicy.h"
@@ -65,21 +66,19 @@ namespace qpid {
const ConnectionToken* const owner;
Consumers consumers;
Messages messages;
- bool queueing;
- bool dispatching;
int next;
mutable qpid::sys::RWlock consumerLock;
- mutable qpid::sys::RWlock messageLock;
+ mutable qpid::sys::Mutex messageLock;
Consumer* exclusive;
mutable uint64_t persistenceId;
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
QueueBindings bindings;
boost::shared_ptr<Exchange> alternateExchange;
+ qpid::sys::Serializer serializer;
void pop();
void push(Message::shared_ptr& msg);
- bool startDispatching();
bool dispatch(Message::shared_ptr& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 7d6f5f4672..7648011b2a 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -84,12 +84,16 @@ class QueueTest : public CppUnit::TestCase
Message::shared_ptr msg3 = message("e", "C");
queue->deliver(msg1);
- CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+ /** if dispatched on diff thread, force dispatch so don't have to wait for thread. Only do in text */
+ queue->dispatch();
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
queue->deliver(msg2);
+ queue->dispatch();
CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
queue->deliver(msg3);
+ queue->dispatch();
CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());
//Test cancellation: