diff options
-rw-r--r-- | qpid/cpp/src/qpid/client/Demux.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Demux.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionCore.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/tests/InProcessBroker.h | 38 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 7 | ||||
-rw-r--r-- | qpid/cpp/src/tests/exception_test.cpp | 116 |
6 files changed, 115 insertions, 65 deletions
diff --git a/qpid/cpp/src/qpid/client/Demux.cpp b/qpid/cpp/src/qpid/client/Demux.cpp index bd1dda0ae9..e61103981b 100644 --- a/qpid/cpp/src/qpid/client/Demux.cpp +++ b/qpid/cpp/src/qpid/client/Demux.cpp @@ -74,6 +74,15 @@ void Demux::close() defaultQueue->close(); } +void Demux::open() +{ + sys::Mutex::ScopedLock l(lock); + for (iterator i = records.begin(); i != records.end(); i++) { + i->queue->open(); + } + defaultQueue->open(); +} + Demux::QueuePtr Demux::add(const std::string& name, Condition condition) { sys::Mutex::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/client/Demux.h b/qpid/cpp/src/qpid/client/Demux.h index 5aaf75db44..234282a8d2 100644 --- a/qpid/cpp/src/qpid/client/Demux.h +++ b/qpid/cpp/src/qpid/client/Demux.h @@ -51,6 +51,7 @@ public: void handle(framing::FrameSet::shared_ptr); void close(); + void open(); QueuePtr add(const std::string& name, Condition); void remove(const std::string& name); diff --git a/qpid/cpp/src/qpid/client/SessionCore.cpp b/qpid/cpp/src/qpid/client/SessionCore.cpp index 3f042bc13a..ee9e9570ed 100644 --- a/qpid/cpp/src/qpid/client/SessionCore.cpp +++ b/qpid/cpp/src/qpid/client/SessionCore.cpp @@ -125,26 +125,26 @@ void SessionCore::detach(int c, const std::string& t) { channel.next = 0; code=c; text=t; + l3.getDemux().close(); } void SessionCore::doClose(int code, const std::string& text) { if (state != CLOSED) { session.reset(); - l3.getDemux().close(); - l3.getCompletionTracker().close(); detach(code, text); setState(CLOSED); + l3.getCompletionTracker().close(); } invariant(); } void SessionCore::doSuspend(int code, const std::string& text) { - if (state != CLOSED) { - invariant(); + if (state != CLOSED && state != SUSPENDED) { detach(code, text); session->suspend(); setState(SUSPENDED); } + invariant(); } ExecutionHandler& SessionCore::getExecution() { // user thread @@ -221,6 +221,7 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) { channel.handle(*i); // Direct to channel. check(); } + l3.getDemux().open(); } } diff --git a/qpid/cpp/src/tests/InProcessBroker.h b/qpid/cpp/src/tests/InProcessBroker.h index 9fa0135502..f014941743 100644 --- a/qpid/cpp/src/tests/InProcessBroker.h +++ b/qpid/cpp/src/tests/InProcessBroker.h @@ -99,7 +99,8 @@ class InProcessConnector : } } } - catch (const ClosedException&) { + catch (const std::exception& e) { + QPID_LOG(debug, QPID_MSG(receiver << " Terminated: " << e.what())); return; } } @@ -155,7 +156,8 @@ class InProcessConnector : } }; - InProcessConnector(shared_ptr<broker::Broker> b, + + InProcessConnector(shared_ptr<broker::Broker> b=broker::Broker::create(), framing::ProtocolVersion v=framing::ProtocolVersion()) : Connector(v), protocolInit(v), @@ -204,6 +206,8 @@ class InProcessConnector : clientOut.queue.setConnectionInputHandler(0); } + shared_ptr<broker::Broker> getBroker() { return broker; } + private: sys::Mutex lock; framing::ProtocolInitiation protocolInit; @@ -215,29 +219,25 @@ class InProcessConnector : }; struct InProcessConnection : public client::Connection { - InProcessConnection(shared_ptr<broker::Broker> b) + /** Connect to an existing broker */ + InProcessConnection(shared_ptr<broker::Broker> b=broker::Broker::create()) : client::Connection( - shared_ptr<client::Connector>( - new InProcessConnector(b))) - { - open(""); - } + shared_ptr<client::Connector>(new InProcessConnector(b))) + { open(""); } - ~InProcessConnection() { } + InProcessConnector& getConnector() { + return static_cast<InProcessConnector&>(*impl->getConnector()); + } /** Simulate disconnected network connection. */ - void disconnect() { impl->getConnector()->close(); } + void disconnect() { getConnector().close(); } - /** Sliently discard frames sent by either party, lost network traffic. */ - void discard() { - dynamic_pointer_cast<InProcessConnector>( - impl->getConnector())->discard(); - } -}; + /** Discard frames, simulates lost network traffic. */ + void discard() { getConnector().discard(); } -/** A connector with its own broker */ -struct InProcessBroker : public InProcessConnector { - InProcessBroker() : InProcessConnector(broker::Broker::create()) {} + shared_ptr<broker::Broker> getBroker() { + return getConnector().getBroker(); + } }; } // namespace qpid diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index b99819f431..aa72a15c3f 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -26,8 +26,10 @@ CLEANFILES= TESTS+=unit_test check_PROGRAMS+=unit_test -unit_test_LDADD=-lboost_unit_test_framework -lboost_regex $(lib_common) +unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \ + $(lib_client) $(lib_broker) unit_test_SOURCES= unit_test.cpp unit_test.h \ + exception_test.cpp \ RefCounted.cpp RefCountedMap.cpp \ SessionState.cpp Blob.cpp logging.cpp \ Url.cpp Uuid.cpp \ @@ -91,7 +93,6 @@ unit_tests = \ testprogs= \ client_test \ - exception_test \ topic_listener \ topic_publisher # echo_service @@ -100,7 +101,7 @@ check_PROGRAMS += $(testprogs) interop_runner TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test -system_tests = client_test exception_test quick_perftest quick_topictest +system_tests = client_test quick_perftest quick_topictest TESTS += run-unit-tests start_broker $(system_tests) python_tests stop_broker EXTRA_DIST += \ diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp index 3783ae6901..d7094c9bfd 100644 --- a/qpid/cpp/src/tests/exception_test.cpp +++ b/qpid/cpp/src/tests/exception_test.cpp @@ -19,48 +19,86 @@ * */ -#include <iostream> +#include "unit_test.h" +#include "InProcessBroker.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "qpid/framing/reply_exceptions.h" -#include "TestOptions.h" -#include "qpid/client/Channel.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" +QPID_AUTO_TEST_SUITE(exception_test) -using namespace qpid::client; -using namespace qpid::sys; -using std::string; -int main(int argc, char** argv) -{ - qpid::TestOptions opts; - opts.parse(argc, argv); +using namespace std; +using namespace qpid; +using namespace client; +using namespace framing; - try { - Connection con(opts.trace); - con.open(opts.host, opts.port, opts.username, opts.password, opts.virtualhost); +struct Fixture { + InProcessConnection connection; + InProcessConnection connection2; + Session_0_10 session; + SubscriptionManager sub; + LocalQueue q; - Queue queue("I don't exist!"); - Channel channel; - con.openChannel(channel); - channel.start(); - //test handling of get (which is a bit odd) - try { - Message msg; - if (channel.get(msg, queue)) { - std::cout << "Received " << msg.getData() << " from " << queue.getName() << std::endl; - } else { - std::cout << "Queue " << queue.getName() << " was empty." << std::endl; - } - con.close(); - return 1; - } catch (const qpid::ChannelException& e) { - std::cout << "get failed as expected: " << e.what() << std::endl; - } - - con.close(); - return 0; - } catch(const std::exception& e) { - std::cout << "got unexpected exception: " << e.what() << std::endl; - return 1; + Fixture() : connection(), + connection2(connection.getBroker()), + session(connection.newSession()), + sub(session) + { + session.queueDeclare(arg::queue="q"); } -} +}; + + +// TODO aconway 2007-11-30: need InProcessBroker to be a more accurate +// simulation of shutdown behaviour. It should override only +// Connector.run() to substitute NetworkQueues for the Dispatcher. +// +// template <class Ex> +// struct Catcher : public sys::Runnable { +// Session_0_10 s; +// boost::function<void ()> f; +// bool caught; +// Catcher(Session_0_10 s_, boost::function<void ()> f_) +// : s(s_), f(f_), caught(false) {} +// void run() { +// try { f(); } catch(const Ex& e) { +// caught=true; +// BOOST_MESSAGE(e.what()); +// } +// } +// }; + +// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, Fixture) { +// Catcher<Exception> get(session, boost::bind(&Session_0_10::get, session)); +// sub.subscribe(q, "q"); +// sys::Thread t(get); +// connection.disconnect(); +// t.join(); +// BOOST_CHECK(get.caught); +// } + +// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, Fixture) { +// struct NullListener : public MessageListener { +// void received(Message&) { BOOST_FAIL("Unexpected message"); } +// } l; +// sub.subscribe(l, "q"); +// connection.disconnect(); +// try { +// sub.run(); +// BOOST_FAIL("Expected exception"); +// } catch (const Exception&e) { BOOST_FAIL(e.what()); } +// try { +// session.queueDeclare(arg::queue="foo"); +// BOOST_FAIL("Expected exception"); +// } catch (const Exception&e) { BOOST_FAIL(e.what()); } +// } + +// TODO aconway 2007-11-30: setSynchronous appears not to work. +// BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, Fixture) { +// session.setSynchronous(true); +// BOOST_CHECK_THROW(sub.subscribe(q, "no such queue"), NotFoundException); +// } + +QPID_AUTO_TEST_SUITE_END() |