/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "MessagingFixture.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include namespace qpid { namespace tests { QPID_AUTO_TEST_SUITE(MessagingThreadTests) using namespace messaging; using namespace boost::assign; using namespace std; struct ReceiveThread : public sys::Runnable { Receiver receiver; vector received; string error; ReceiveThread(Receiver s) : receiver(s) {} void run() { try { while(true) { Message m = receiver.fetch(Duration::SECOND*5); if (m.getContent() == "END") break; received.push_back(m.getContent()); } } catch (const NoMessageAvailable& e) { // Indicates that fetch timed out OR receiver was closed by other thread. if (!receiver.isClosed()) // timeout error = e.what(); } catch (const std::exception& e) { error = e.what(); } } }; struct NextReceiverThread : public sys::Runnable { Session session; vector received; string error; NextReceiverThread(Session s) : session(s) {} void run() { try { while(true) { Message m = session.nextReceiver(Duration::SECOND*5).fetch(); if (m.getContent() == "END") break; received.push_back(m.getContent()); } } catch (const std::exception& e) { error = e.what(); } } }; QPID_AUTO_TEST_CASE(testConcurrentSendReceive) { MessagingFixture fix; Sender s = fix.session.createSender("concurrent;{create:always}"); Receiver r = fix.session.createReceiver("concurrent;{create:always,link:{reliability:unreliable}}"); ReceiveThread rt(r); sys::Thread thread(rt); const size_t COUNT=100; for (size_t i = 0; i < COUNT; ++i) { s.send(Message()); } s.send(Message("END")); thread.join(); BOOST_CHECK_EQUAL(rt.error, string()); BOOST_CHECK_EQUAL(COUNT, rt.received.size()); } QPID_AUTO_TEST_CASE(testCloseBusyReceiver) { MessagingFixture fix; Receiver r = fix.session.createReceiver("closeReceiver;{create:always}"); ReceiveThread rt(r); sys::Thread thread(rt); sys::usleep(1000); // Give the receive thread time to block. r.close(); thread.join(); BOOST_CHECK_EQUAL(rt.error, string()); // Fetching on closed receiver should fail. Message m; BOOST_CHECK(!r.fetch(m, Duration(0))); BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable); } QPID_AUTO_TEST_CASE(testCloseSessionBusyReceiver) { MessagingFixture fix; Receiver r = fix.session.createReceiver("closeSession;{create:always}"); ReceiveThread rt(r); sys::Thread thread(rt); sys::usleep(1000); // Give the receive thread time to block. fix.session.close(); thread.join(); BOOST_CHECK_EQUAL(rt.error, string()); // Fetching on closed receiver should fail. Message m; BOOST_CHECK(!r.fetch(m, Duration(0))); BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable); } QPID_AUTO_TEST_CASE(testConcurrentSendNextReceiver) { MessagingFixture fix; Receiver r = fix.session.createReceiver("concurrent;{create:always,link:{reliability:unreliable}}"); const size_t COUNT=100; r.setCapacity(COUNT); NextReceiverThread rt(fix.session); sys::Thread thread(rt); sys::usleep(1000); // Give the receive thread time to block. Sender s = fix.session.createSender("concurrent;{create:always}"); for (size_t i = 0; i < COUNT; ++i) { s.send(Message()); } s.send(Message("END")); thread.join(); BOOST_CHECK_EQUAL(rt.error, string()); BOOST_CHECK_EQUAL(COUNT, rt.received.size()); } QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests