diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/qpid/messaging/Receiver.h | 24 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Receiver.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/ReceiverImpl.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/tests/MessagingFixture.h | 6 | ||||
-rw-r--r-- | cpp/src/tests/MessagingThreadTests.cpp | 99 |
8 files changed, 132 insertions, 10 deletions
diff --git a/cpp/include/qpid/messaging/Receiver.h b/cpp/include/qpid/messaging/Receiver.h index a368b113c1..2cd024f26f 100644 --- a/cpp/include/qpid/messaging/Receiver.h +++ b/cpp/include/qpid/messaging/Receiver.h @@ -49,8 +49,7 @@ class Receiver : public qpid::messaging::Handle<ReceiverImpl> /** * Retrieves a message from this receivers local queue, or waits * for upto the specified timeout for a message to become - * available. Returns false if there is no message to give after - * waiting for the specified timeout. + * available. */ QPID_CLIENT_EXTERN bool get(Message& message, Duration timeout=Duration::FOREVER); /** @@ -59,7 +58,8 @@ class Receiver : public qpid::messaging::Handle<ReceiverImpl> * available. * * @exception NoMessageAvailable if there is no message to give - * after waiting for the specified timeout. + * after waiting for the specified timeout, or if the Receiver is + * closed, in which case isClose() will be true. */ QPID_CLIENT_EXTERN Message get(Duration timeout=Duration::FOREVER); /** @@ -68,6 +68,10 @@ class Receiver : public qpid::messaging::Handle<ReceiverImpl> * available. Unlike get() this method will check with the server * that there is no message for the subscription this receiver is * serving before returning false. + * + * @return false if there is no message to give after + * waiting for the specified timeout, or if the Receiver is + * closed, in which case isClose() will be true. */ QPID_CLIENT_EXTERN bool fetch(Message& message, Duration timeout=Duration::FOREVER); /** @@ -78,7 +82,8 @@ class Receiver : public qpid::messaging::Handle<ReceiverImpl> * serving before throwing an exception. * * @exception NoMessageAvailable if there is no message to give - * after waiting for the specified timeout. + * after waiting for the specified timeout, or if the Receiver is + * closed, in which case isClose() will be true. */ QPID_CLIENT_EXTERN Message fetch(Duration timeout=Duration::FOREVER); /** @@ -88,19 +93,19 @@ class Receiver : public qpid::messaging::Handle<ReceiverImpl> */ QPID_CLIENT_EXTERN void setCapacity(uint32_t); /** - * Returns the capacity of the receiver. The capacity determines + * @return the capacity of the receiver. The capacity determines * how many incoming messages can be held in the receiver before * being requested by a client via fetch() (or pushed to a * listener). */ QPID_CLIENT_EXTERN uint32_t getCapacity(); /** - * Returns the number of messages received and waiting to be + * @return the number of messages received and waiting to be * fetched. */ QPID_CLIENT_EXTERN uint32_t getAvailable(); /** - * Returns a count of the number of messages received on this + * @return a count of the number of messages received on this * receiver that have been acknowledged, but for which that * acknowledgement has not yet been confirmed as processed by the * server. @@ -113,6 +118,11 @@ class Receiver : public qpid::messaging::Handle<ReceiverImpl> QPID_CLIENT_EXTERN void close(); /** + * Return true if the receiver was closed by a call to close() + */ + QPID_CLIENT_EXTERN bool isClosed() const; + + /** * Returns the name of this receiver. */ QPID_CLIENT_EXTERN const std::string& getName() const; diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 49cfec7497..9b706ab3de 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -193,6 +193,13 @@ void ReceiverImpl::closeImpl() } } +bool ReceiverImpl::isClosed() const { + sys::Mutex::ScopedLock l(lock); + return state == CANCELLED; +} + + + void ReceiverImpl::setCapacityImpl(uint32_t c) { sys::Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index c7e24b774a..5693b7b71f 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -65,6 +65,8 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl uint32_t getUnsettled(); void received(qpid::messaging::Message& message); qpid::messaging::Session getSession() const; + bool isClosed() const; + private: mutable sys::Mutex lock; boost::intrusive_ptr<SessionImpl> parent; diff --git a/cpp/src/qpid/messaging/Receiver.cpp b/cpp/src/qpid/messaging/Receiver.cpp index 552c1db16c..78e0c5daa3 100644 --- a/cpp/src/qpid/messaging/Receiver.cpp +++ b/cpp/src/qpid/messaging/Receiver.cpp @@ -44,4 +44,5 @@ uint32_t Receiver::getUnsettled() { return impl->getUnsettled(); } void Receiver::close() { impl->close(); } const std::string& Receiver::getName() const { return impl->getName(); } Session Receiver::getSession() const { return impl->getSession(); } +bool Receiver::isClosed() const { return impl->isClosed(); } }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/ReceiverImpl.h b/cpp/src/qpid/messaging/ReceiverImpl.h index 07da0636f7..57059bfd28 100644 --- a/cpp/src/qpid/messaging/ReceiverImpl.h +++ b/cpp/src/qpid/messaging/ReceiverImpl.h @@ -45,6 +45,7 @@ class ReceiverImpl : public virtual qpid::RefCounted virtual void close() = 0; virtual const std::string& getName() const = 0; virtual Session getSession() const = 0; + virtual bool isClosed() const = 0; }; }} // namespace qpid::messaging diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 02b006665e..96810a7750 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -68,6 +68,8 @@ unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \ unit_test_SOURCES= unit_test.cpp unit_test.h \ MessagingSessionTests.cpp \ + MessagingThreadTests.cpp \ + MessagingFixture.h \ ClientSessionTest.cpp \ BrokerFixture.h SocketProxy.h \ exception_test.cpp \ diff --git a/cpp/src/tests/MessagingFixture.h b/cpp/src/tests/MessagingFixture.h index 5fb3fc56b2..5546b4ef49 100644 --- a/cpp/src/tests/MessagingFixture.h +++ b/cpp/src/tests/MessagingFixture.h @@ -182,7 +182,7 @@ struct MultiQueueFixture : MessagingFixture }; -std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5) +inline std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5) { std::vector<std::string> data; messaging::Message message; @@ -193,7 +193,7 @@ std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messagi } -void send(messaging::Sender& sender, uint count = 1, uint start = 1, +inline void send(messaging::Sender& sender, uint count = 1, uint start = 1, const std::string& base = "Message") { for (uint i = start; i < start + count; ++i) { @@ -201,7 +201,7 @@ void send(messaging::Sender& sender, uint count = 1, uint start = 1, } } -void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1, +inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1, const std::string& base = "Message", messaging::Duration timeout=messaging::Duration::SECOND*5) { diff --git a/cpp/src/tests/MessagingThreadTests.cpp b/cpp/src/tests/MessagingThreadTests.cpp new file mode 100644 index 0000000000..9cf139ddf5 --- /dev/null +++ b/cpp/src/tests/MessagingThreadTests.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MessagingFixture.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace tests { +QPID_AUTO_TEST_SUITE(MessagingThreadTests) + +using namespace messaging; +using namespace boost::assign; +using namespace std; + +struct ReceiveThread : public sys::Runnable { + Receiver receiver; + vector<string> received; + string error; + + ReceiveThread(Receiver s) : receiver(s) {} + void run() { + try { + while(true) { + Message m = receiver.fetch(Duration::SECOND*5); + if (m.getContent() == "END") break; + received.push_back(m.getContent()); + } + } catch (const NoMessageAvailable& e) { + // Indicates that fetch timed out OR receiver was closed by other thread. + if (!receiver.isClosed()) // timeout + error = e.what(); + } catch (const std::exception& e) { + error = e.what(); + } + } +}; + +QPID_AUTO_TEST_CASE(testConcurrentSendReceive) { + QueueFixture fix; + Sender s = fix.session.createSender(fix.queue); + Receiver r = fix.session.createReceiver(fix.queue+";{link:{reliability:unreliable}}"); + ReceiveThread rt(r); + sys::Thread thread(rt); + const size_t COUNT=1000; + for (size_t i = 0; i < COUNT; ++i) { + s.send(Message()); + } + s.send(Message("END")); + thread.join(); + BOOST_CHECK_EQUAL(rt.error, string()); + BOOST_CHECK_EQUAL(COUNT, rt.received.size()); +} + +QPID_AUTO_TEST_CASE(testCloseBusyReceiver) { + QueueFixture fix; + Receiver r = fix.session.createReceiver(fix.queue); + ReceiveThread rt(r); + sys::Thread thread(rt); + r.close(); + thread.join(); + BOOST_CHECK_EQUAL(rt.error, string()); + + // Check that using a closed receiver gives the right result. + Message m; + BOOST_CHECK(!r.fetch(m, Duration(0))); + BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable); +} + +QPID_AUTO_TEST_CASE(testCloseSessionBusyReceiver) { + QueueFixture fix; + Receiver r = fix.session.createReceiver(fix.queue); + ReceiveThread rt(r); + sys::Thread thread(rt); + fix.session.close(); + thread.join(); + BOOST_CHECK_EQUAL(rt.error, string()); +} + +QPID_AUTO_TEST_SUITE_END() +}} // namespace qpid::tests |