diff options
-rw-r--r-- | cpp/src/qpid/broker/Timer.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Timer.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 6 | ||||
-rw-r--r-- | cpp/src/tests/BrokerFixture.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 17 |
7 files changed, 39 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp index 28b1aa56d7..173f350cde 100644 --- a/cpp/src/qpid/broker/Timer.cpp +++ b/cpp/src/qpid/broker/Timer.cpp @@ -85,17 +85,13 @@ void Timer::start() void Timer::stop() { - signalStop(); - runner.join(); -} - -void Timer::signalStop() -{ - Monitor::ScopedLock l(monitor); - if (active) { + { + Monitor::ScopedLock l(monitor); + if (!active) return; active = false; monitor.notifyAll(); } + runner.join(); } bool Later::operator()(const intrusive_ptr<TimerTask>& a, diff --git a/cpp/src/qpid/broker/Timer.h b/cpp/src/qpid/broker/Timer.h index d1f606f326..dcb02a5e0a 100644 --- a/cpp/src/qpid/broker/Timer.h +++ b/cpp/src/qpid/broker/Timer.h @@ -59,7 +59,6 @@ class Timer : private qpid::sys::Runnable { bool active; virtual void run(); - void signalStop(); public: Timer(); diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp index f44a04837b..951996f005 100644 --- a/cpp/src/qpid/client/LocalQueue.cpp +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -47,11 +47,18 @@ Message LocalQueue::pop() { void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; } -bool LocalQueue::empty() +bool LocalQueue::empty() const { if (!queue) throw ClosedException(); - return queue->isEmpty(); + return queue->empty(); +} + +size_t LocalQueue::size() const +{ + if (!queue) + throw ClosedException(); + return queue->size(); } }} // namespace qpid::client diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index d7e7e9dbd8..eba28f6599 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -44,8 +44,8 @@ class LocalQueue *@exception ClosedException if subscription has been closed. */ Message pop(); - bool empty(); - + bool empty() const; + size_t size() const; void setAckPolicy(AckPolicy); private: diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index 56d41574df..dd709c6bff 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -103,10 +103,14 @@ public: return closed; } - bool isEmpty() const { + bool empty() const { Waitable::ScopedLock l(lock); return queue.empty(); } + size_t size() const { + Waitable::ScopedLock l(lock); + return queue.size(); + } private: diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index 76cf2e8761..1a397c76c8 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -43,6 +43,8 @@ struct BrokerFixture { BrokerFixture() { Broker::Options opts; opts.port=0; + // Management doesn't play well with multiple in-process brokers. + opts.enableMgmt=false; opts.workerThreads=1; broker = Broker::create(opts); // TODO aconway 2007-12-05: At one point BrokerFixture diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index f9de3b7619..60cfe04510 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -167,5 +167,22 @@ BOOST_FIXTURE_TEST_CASE(testSuspendResume, ClientSessionFixture) BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); } +BOOST_FIXTURE_TEST_CASE(testSendToSelf, SessionFixture) { + // https://bugzilla.redhat.com/show_bug.cgi?id=410551 + // Deadlock if SubscriptionManager run() concurrent with session ack. + LocalQueue myq; + session.queueDeclare(queue="myq", exclusive=true, autoDelete=true); + subs.subscribe(myq, "myq"); + string data("msg"); + Message msg(data, "myq"); + const int count=100; // Verified with count=100000 in a loop. + for (int i = 0; i < count; ++i) + session.messageTransfer(content=msg); + for (int j = 0; j < count; ++j) { + Message m=myq.pop(); + BOOST_CHECK_EQUAL(m.getData(), data); + } +} + QPID_AUTO_TEST_SUITE_END() |