diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-07-18 18:03:54 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-07-18 18:03:54 +0000 |
commit | a51da643a383c5c55fcd33861e379607037983f2 (patch) | |
tree | a25e13e341ab4bbef2e5212919f5b6c6c60f9594 /cpp | |
parent | 51a364d23a5982a4a17417206de50e16f679d1ca (diff) | |
download | qpid-python-a51da643a383c5c55fcd33861e379607037983f2.tar.gz |
Fixed MT safety issues pointed out by Gordon.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557343 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 3 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 31 |
7 files changed, 39 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index f0dc159752..2a0aa9ffee 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -311,7 +311,7 @@ void BrokerAdapter::BasicHandlerImpl::consume( if(!nowait) client.consumeOk(newTag, context.getRequestId()); //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); + queue->requestDispatch(); } void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 523a834715..9b6bdf5a2b 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -253,7 +253,7 @@ void Channel::ConsumerImpl::cancel(){ void Channel::ConsumerImpl::requestDispatch(){ if(blocked) - queue->dispatch(); + queue->requestDispatch(); } void Channel::handleInlineTransfer(Message::shared_ptr msg){ diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index b26d1d3ed7..cf6beff375 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -87,6 +87,11 @@ void Queue::requeue(Message::shared_ptr& msg){ } +void Queue::requestDispatch(){ + serializer.execute(boost::bind(&Queue::dispatch, this)); +} + + bool Queue::dispatch(Message::shared_ptr& msg){ diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 415e22f04c..0ed368e404 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -81,7 +81,11 @@ namespace qpid { void push(Message::shared_ptr& msg); bool dispatch(Message::shared_ptr& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); - + /** + * only called by serilizer + */ + void dispatch(); + public: typedef boost::shared_ptr<Queue> shared_ptr; @@ -120,12 +124,12 @@ namespace qpid { */ void recover(Message::shared_ptr& msg); /** - * Dispatch any queued messages providing there are + * Request dispatch any queued messages providing there are * consumers for them. Only one thread can be dispatching - * at any time, but this method (rather than the caller) - * is responsible for ensuring that. + * at any time, so this call schedules the despatch based on + * the serilizer policy. */ - void dispatch(); + void requestDispatch(); void consume(Consumer* c, bool exclusive = false); void cancel(Consumer* c); uint32_t purge(); diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index f586ea92fc..252b465cc5 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -134,7 +134,7 @@ MessageHandlerImpl::consume(const MethodContext& context, noLocal ? &connection : 0, &filter); client.ok(context.getRequestId()); // Dispatch messages as there is now a consumer. - queue->dispatch(); + queue->requestDispatch(); } void diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 251ac624ab..acbe867cfa 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -212,6 +212,7 @@ class BrokerChannelTest : public CppUnit::TestCase string tag("test"); channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); queue->deliver(msg); + sleep(2); CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); @@ -293,6 +294,7 @@ class BrokerChannelTest : public CppUnit::TestCase queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); + sleep(2); Message::shared_ptr next = queue->dequeue(); CPPUNIT_ASSERT_EQUAL(msg1, next); @@ -336,6 +338,7 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size()); channel.flow(true); + sleep(2); //ensure no messages have been delivered CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 7648011b2a..ddf2314f8d 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -37,8 +37,14 @@ using namespace qpid::sys; class TestConsumer : public virtual Consumer{ public: Message::shared_ptr last; + bool received; + TestConsumer(): received(false) {}; - virtual bool deliver(Message::shared_ptr& msg); + virtual bool deliver(Message::shared_ptr& msg){ + last = msg; + received = true; + return true; + }; }; class FailOnDeliver : public Deliverable @@ -84,16 +90,19 @@ class QueueTest : public CppUnit::TestCase Message::shared_ptr msg3 = message("e", "C"); queue->deliver(msg1); - /** if dispatched on diff thread, force dispatch so don't have to wait for thread. Only do in text */ - queue->dispatch(); - CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); + if (!c1.received) + sleep(2); + CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); queue->deliver(msg2); - queue->dispatch(); + if (!c2.received) + sleep(2); CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); + c1.received = false; queue->deliver(msg3); - queue->dispatch(); + if (!c1.received) + sleep(2); CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); //Test cancellation: @@ -146,7 +155,10 @@ class QueueTest : public CppUnit::TestCase TestConsumer consumer; queue->consume(&consumer); - queue->dispatch(); + queue->requestDispatch(); + if (!consumer.received) + sleep(2); + CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount()); @@ -195,9 +207,4 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_PLUGIN_IMPLEMENT(); CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest); -//TestConsumer -bool TestConsumer::deliver(Message::shared_ptr& msg){ - last = msg; - return true; -} |