summaryrefslogtreecommitdiff
path: root/cpp/src/tests/MessagingThreadTests.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-05-14 13:55:18 +0000
committerAlan Conway <aconway@apache.org>2010-05-14 13:55:18 +0000
commit854062d73c485027f2f40abac159cb19e85c0dca (patch)
tree018655bb2fe2f0320bf39ea4fb6e9b25e833d778 /cpp/src/tests/MessagingThreadTests.cpp
parent18a06573ea14a98029764d1badf42695b3a31643 (diff)
downloadqpid-python-854062d73c485027f2f40abac159cb19e85c0dca.tar.gz
Initial multi-thread unit test for messaging API.
- added Receiver::isClosed() to test for local close. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@944261 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/MessagingThreadTests.cpp')
-rw-r--r--cpp/src/tests/MessagingThreadTests.cpp99
1 files changed, 99 insertions, 0 deletions
diff --git a/cpp/src/tests/MessagingThreadTests.cpp b/cpp/src/tests/MessagingThreadTests.cpp
new file mode 100644
index 0000000000..9cf139ddf5
--- /dev/null
+++ b/cpp/src/tests/MessagingThreadTests.cpp
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessagingFixture.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace tests {
+QPID_AUTO_TEST_SUITE(MessagingThreadTests)
+
+using namespace messaging;
+using namespace boost::assign;
+using namespace std;
+
+struct ReceiveThread : public sys::Runnable {
+ Receiver receiver;
+ vector<string> received;
+ string error;
+
+ ReceiveThread(Receiver s) : receiver(s) {}
+ void run() {
+ try {
+ while(true) {
+ Message m = receiver.fetch(Duration::SECOND*5);
+ if (m.getContent() == "END") break;
+ received.push_back(m.getContent());
+ }
+ } catch (const NoMessageAvailable& e) {
+ // Indicates that fetch timed out OR receiver was closed by other thread.
+ if (!receiver.isClosed()) // timeout
+ error = e.what();
+ } catch (const std::exception& e) {
+ error = e.what();
+ }
+ }
+};
+
+QPID_AUTO_TEST_CASE(testConcurrentSendReceive) {
+ QueueFixture fix;
+ Sender s = fix.session.createSender(fix.queue);
+ Receiver r = fix.session.createReceiver(fix.queue+";{link:{reliability:unreliable}}");
+ ReceiveThread rt(r);
+ sys::Thread thread(rt);
+ const size_t COUNT=1000;
+ for (size_t i = 0; i < COUNT; ++i) {
+ s.send(Message());
+ }
+ s.send(Message("END"));
+ thread.join();
+ BOOST_CHECK_EQUAL(rt.error, string());
+ BOOST_CHECK_EQUAL(COUNT, rt.received.size());
+}
+
+QPID_AUTO_TEST_CASE(testCloseBusyReceiver) {
+ QueueFixture fix;
+ Receiver r = fix.session.createReceiver(fix.queue);
+ ReceiveThread rt(r);
+ sys::Thread thread(rt);
+ r.close();
+ thread.join();
+ BOOST_CHECK_EQUAL(rt.error, string());
+
+ // Check that using a closed receiver gives the right result.
+ Message m;
+ BOOST_CHECK(!r.fetch(m, Duration(0)));
+ BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable);
+}
+
+QPID_AUTO_TEST_CASE(testCloseSessionBusyReceiver) {
+ QueueFixture fix;
+ Receiver r = fix.session.createReceiver(fix.queue);
+ ReceiveThread rt(r);
+ sys::Thread thread(rt);
+ fix.session.close();
+ thread.join();
+ BOOST_CHECK_EQUAL(rt.error, string());
+}
+
+QPID_AUTO_TEST_SUITE_END()
+}} // namespace qpid::tests