diff options
author | Alan Conway <aconway@apache.org> | 2010-05-14 13:55:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-05-14 13:55:18 +0000 |
commit | 854062d73c485027f2f40abac159cb19e85c0dca (patch) | |
tree | 018655bb2fe2f0320bf39ea4fb6e9b25e833d778 /cpp/src/tests | |
parent | 18a06573ea14a98029764d1badf42695b3a31643 (diff) | |
download | qpid-python-854062d73c485027f2f40abac159cb19e85c0dca.tar.gz |
Initial multi-thread unit test for messaging API.
- added Receiver::isClosed() to test for local close.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@944261 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/tests/MessagingFixture.h | 6 | ||||
-rw-r--r-- | cpp/src/tests/MessagingThreadTests.cpp | 99 |
3 files changed, 104 insertions, 3 deletions
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 02b006665e..96810a7750 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -68,6 +68,8 @@ unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \ unit_test_SOURCES= unit_test.cpp unit_test.h \ MessagingSessionTests.cpp \ + MessagingThreadTests.cpp \ + MessagingFixture.h \ ClientSessionTest.cpp \ BrokerFixture.h SocketProxy.h \ exception_test.cpp \ diff --git a/cpp/src/tests/MessagingFixture.h b/cpp/src/tests/MessagingFixture.h index 5fb3fc56b2..5546b4ef49 100644 --- a/cpp/src/tests/MessagingFixture.h +++ b/cpp/src/tests/MessagingFixture.h @@ -182,7 +182,7 @@ struct MultiQueueFixture : MessagingFixture }; -std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5) +inline std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5) { std::vector<std::string> data; messaging::Message message; @@ -193,7 +193,7 @@ std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messagi } -void send(messaging::Sender& sender, uint count = 1, uint start = 1, +inline void send(messaging::Sender& sender, uint count = 1, uint start = 1, const std::string& base = "Message") { for (uint i = start; i < start + count; ++i) { @@ -201,7 +201,7 @@ void send(messaging::Sender& sender, uint count = 1, uint start = 1, } } -void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1, +inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1, const std::string& base = "Message", messaging::Duration timeout=messaging::Duration::SECOND*5) { diff --git a/cpp/src/tests/MessagingThreadTests.cpp b/cpp/src/tests/MessagingThreadTests.cpp new file mode 100644 index 0000000000..9cf139ddf5 --- /dev/null +++ b/cpp/src/tests/MessagingThreadTests.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MessagingFixture.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace tests { +QPID_AUTO_TEST_SUITE(MessagingThreadTests) + +using namespace messaging; +using namespace boost::assign; +using namespace std; + +struct ReceiveThread : public sys::Runnable { + Receiver receiver; + vector<string> received; + string error; + + ReceiveThread(Receiver s) : receiver(s) {} + void run() { + try { + while(true) { + Message m = receiver.fetch(Duration::SECOND*5); + if (m.getContent() == "END") break; + received.push_back(m.getContent()); + } + } catch (const NoMessageAvailable& e) { + // Indicates that fetch timed out OR receiver was closed by other thread. + if (!receiver.isClosed()) // timeout + error = e.what(); + } catch (const std::exception& e) { + error = e.what(); + } + } +}; + +QPID_AUTO_TEST_CASE(testConcurrentSendReceive) { + QueueFixture fix; + Sender s = fix.session.createSender(fix.queue); + Receiver r = fix.session.createReceiver(fix.queue+";{link:{reliability:unreliable}}"); + ReceiveThread rt(r); + sys::Thread thread(rt); + const size_t COUNT=1000; + for (size_t i = 0; i < COUNT; ++i) { + s.send(Message()); + } + s.send(Message("END")); + thread.join(); + BOOST_CHECK_EQUAL(rt.error, string()); + BOOST_CHECK_EQUAL(COUNT, rt.received.size()); +} + +QPID_AUTO_TEST_CASE(testCloseBusyReceiver) { + QueueFixture fix; + Receiver r = fix.session.createReceiver(fix.queue); + ReceiveThread rt(r); + sys::Thread thread(rt); + r.close(); + thread.join(); + BOOST_CHECK_EQUAL(rt.error, string()); + + // Check that using a closed receiver gives the right result. + Message m; + BOOST_CHECK(!r.fetch(m, Duration(0))); + BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable); +} + +QPID_AUTO_TEST_CASE(testCloseSessionBusyReceiver) { + QueueFixture fix; + Receiver r = fix.session.createReceiver(fix.queue); + ReceiveThread rt(r); + sys::Thread thread(rt); + fix.session.close(); + thread.join(); + BOOST_CHECK_EQUAL(rt.error, string()); +} + +QPID_AUTO_TEST_SUITE_END() +}} // namespace qpid::tests |