summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/client/Demux.cpp9
-rw-r--r--qpid/cpp/src/qpid/client/Demux.h1
-rw-r--r--qpid/cpp/src/qpid/client/SessionCore.cpp9
-rw-r--r--qpid/cpp/src/tests/InProcessBroker.h38
-rw-r--r--qpid/cpp/src/tests/Makefile.am7
-rw-r--r--qpid/cpp/src/tests/exception_test.cpp116
6 files changed, 115 insertions, 65 deletions
diff --git a/qpid/cpp/src/qpid/client/Demux.cpp b/qpid/cpp/src/qpid/client/Demux.cpp
index bd1dda0ae9..e61103981b 100644
--- a/qpid/cpp/src/qpid/client/Demux.cpp
+++ b/qpid/cpp/src/qpid/client/Demux.cpp
@@ -74,6 +74,15 @@ void Demux::close()
defaultQueue->close();
}
+void Demux::open()
+{
+ sys::Mutex::ScopedLock l(lock);
+ for (iterator i = records.begin(); i != records.end(); i++) {
+ i->queue->open();
+ }
+ defaultQueue->open();
+}
+
Demux::QueuePtr Demux::add(const std::string& name, Condition condition)
{
sys::Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/qpid/client/Demux.h b/qpid/cpp/src/qpid/client/Demux.h
index 5aaf75db44..234282a8d2 100644
--- a/qpid/cpp/src/qpid/client/Demux.h
+++ b/qpid/cpp/src/qpid/client/Demux.h
@@ -51,6 +51,7 @@ public:
void handle(framing::FrameSet::shared_ptr);
void close();
+ void open();
QueuePtr add(const std::string& name, Condition);
void remove(const std::string& name);
diff --git a/qpid/cpp/src/qpid/client/SessionCore.cpp b/qpid/cpp/src/qpid/client/SessionCore.cpp
index 3f042bc13a..ee9e9570ed 100644
--- a/qpid/cpp/src/qpid/client/SessionCore.cpp
+++ b/qpid/cpp/src/qpid/client/SessionCore.cpp
@@ -125,26 +125,26 @@ void SessionCore::detach(int c, const std::string& t) {
channel.next = 0;
code=c;
text=t;
+ l3.getDemux().close();
}
void SessionCore::doClose(int code, const std::string& text) {
if (state != CLOSED) {
session.reset();
- l3.getDemux().close();
- l3.getCompletionTracker().close();
detach(code, text);
setState(CLOSED);
+ l3.getCompletionTracker().close();
}
invariant();
}
void SessionCore::doSuspend(int code, const std::string& text) {
- if (state != CLOSED) {
- invariant();
+ if (state != CLOSED && state != SUSPENDED) {
detach(code, text);
session->suspend();
setState(SUSPENDED);
}
+ invariant();
}
ExecutionHandler& SessionCore::getExecution() { // user thread
@@ -221,6 +221,7 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) {
channel.handle(*i); // Direct to channel.
check();
}
+ l3.getDemux().open();
}
}
diff --git a/qpid/cpp/src/tests/InProcessBroker.h b/qpid/cpp/src/tests/InProcessBroker.h
index 9fa0135502..f014941743 100644
--- a/qpid/cpp/src/tests/InProcessBroker.h
+++ b/qpid/cpp/src/tests/InProcessBroker.h
@@ -99,7 +99,8 @@ class InProcessConnector :
}
}
}
- catch (const ClosedException&) {
+ catch (const std::exception& e) {
+ QPID_LOG(debug, QPID_MSG(receiver << " Terminated: " << e.what()));
return;
}
}
@@ -155,7 +156,8 @@ class InProcessConnector :
}
};
- InProcessConnector(shared_ptr<broker::Broker> b,
+
+ InProcessConnector(shared_ptr<broker::Broker> b=broker::Broker::create(),
framing::ProtocolVersion v=framing::ProtocolVersion()) :
Connector(v),
protocolInit(v),
@@ -204,6 +206,8 @@ class InProcessConnector :
clientOut.queue.setConnectionInputHandler(0);
}
+ shared_ptr<broker::Broker> getBroker() { return broker; }
+
private:
sys::Mutex lock;
framing::ProtocolInitiation protocolInit;
@@ -215,29 +219,25 @@ class InProcessConnector :
};
struct InProcessConnection : public client::Connection {
- InProcessConnection(shared_ptr<broker::Broker> b)
+ /** Connect to an existing broker */
+ InProcessConnection(shared_ptr<broker::Broker> b=broker::Broker::create())
: client::Connection(
- shared_ptr<client::Connector>(
- new InProcessConnector(b)))
- {
- open("");
- }
+ shared_ptr<client::Connector>(new InProcessConnector(b)))
+ { open(""); }
- ~InProcessConnection() { }
+ InProcessConnector& getConnector() {
+ return static_cast<InProcessConnector&>(*impl->getConnector());
+ }
/** Simulate disconnected network connection. */
- void disconnect() { impl->getConnector()->close(); }
+ void disconnect() { getConnector().close(); }
- /** Sliently discard frames sent by either party, lost network traffic. */
- void discard() {
- dynamic_pointer_cast<InProcessConnector>(
- impl->getConnector())->discard();
- }
-};
+ /** Discard frames, simulates lost network traffic. */
+ void discard() { getConnector().discard(); }
-/** A connector with its own broker */
-struct InProcessBroker : public InProcessConnector {
- InProcessBroker() : InProcessConnector(broker::Broker::create()) {}
+ shared_ptr<broker::Broker> getBroker() {
+ return getConnector().getBroker();
+ }
};
} // namespace qpid
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index b99819f431..aa72a15c3f 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -26,8 +26,10 @@ CLEANFILES=
TESTS+=unit_test
check_PROGRAMS+=unit_test
-unit_test_LDADD=-lboost_unit_test_framework -lboost_regex $(lib_common)
+unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \
+ $(lib_client) $(lib_broker)
unit_test_SOURCES= unit_test.cpp unit_test.h \
+ exception_test.cpp \
RefCounted.cpp RefCountedMap.cpp \
SessionState.cpp Blob.cpp logging.cpp \
Url.cpp Uuid.cpp \
@@ -91,7 +93,6 @@ unit_tests = \
testprogs= \
client_test \
- exception_test \
topic_listener \
topic_publisher
# echo_service
@@ -100,7 +101,7 @@ check_PROGRAMS += $(testprogs) interop_runner
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test
-system_tests = client_test exception_test quick_perftest quick_topictest
+system_tests = client_test quick_perftest quick_topictest
TESTS += run-unit-tests start_broker $(system_tests) python_tests stop_broker
EXTRA_DIST += \
diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp
index 3783ae6901..d7094c9bfd 100644
--- a/qpid/cpp/src/tests/exception_test.cpp
+++ b/qpid/cpp/src/tests/exception_test.cpp
@@ -19,48 +19,86 @@
*
*/
-#include <iostream>
+#include "unit_test.h"
+#include "InProcessBroker.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/framing/reply_exceptions.h"
-#include "TestOptions.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Message.h"
+QPID_AUTO_TEST_SUITE(exception_test)
-using namespace qpid::client;
-using namespace qpid::sys;
-using std::string;
-int main(int argc, char** argv)
-{
- qpid::TestOptions opts;
- opts.parse(argc, argv);
+using namespace std;
+using namespace qpid;
+using namespace client;
+using namespace framing;
- try {
- Connection con(opts.trace);
- con.open(opts.host, opts.port, opts.username, opts.password, opts.virtualhost);
+struct Fixture {
+ InProcessConnection connection;
+ InProcessConnection connection2;
+ Session_0_10 session;
+ SubscriptionManager sub;
+ LocalQueue q;
- Queue queue("I don't exist!");
- Channel channel;
- con.openChannel(channel);
- channel.start();
- //test handling of get (which is a bit odd)
- try {
- Message msg;
- if (channel.get(msg, queue)) {
- std::cout << "Received " << msg.getData() << " from " << queue.getName() << std::endl;
- } else {
- std::cout << "Queue " << queue.getName() << " was empty." << std::endl;
- }
- con.close();
- return 1;
- } catch (const qpid::ChannelException& e) {
- std::cout << "get failed as expected: " << e.what() << std::endl;
- }
-
- con.close();
- return 0;
- } catch(const std::exception& e) {
- std::cout << "got unexpected exception: " << e.what() << std::endl;
- return 1;
+ Fixture() : connection(),
+ connection2(connection.getBroker()),
+ session(connection.newSession()),
+ sub(session)
+ {
+ session.queueDeclare(arg::queue="q");
}
-}
+};
+
+
+// TODO aconway 2007-11-30: need InProcessBroker to be a more accurate
+// simulation of shutdown behaviour. It should override only
+// Connector.run() to substitute NetworkQueues for the Dispatcher.
+//
+// template <class Ex>
+// struct Catcher : public sys::Runnable {
+// Session_0_10 s;
+// boost::function<void ()> f;
+// bool caught;
+// Catcher(Session_0_10 s_, boost::function<void ()> f_)
+// : s(s_), f(f_), caught(false) {}
+// void run() {
+// try { f(); } catch(const Ex& e) {
+// caught=true;
+// BOOST_MESSAGE(e.what());
+// }
+// }
+// };
+
+// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, Fixture) {
+// Catcher<Exception> get(session, boost::bind(&Session_0_10::get, session));
+// sub.subscribe(q, "q");
+// sys::Thread t(get);
+// connection.disconnect();
+// t.join();
+// BOOST_CHECK(get.caught);
+// }
+
+// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, Fixture) {
+// struct NullListener : public MessageListener {
+// void received(Message&) { BOOST_FAIL("Unexpected message"); }
+// } l;
+// sub.subscribe(l, "q");
+// connection.disconnect();
+// try {
+// sub.run();
+// BOOST_FAIL("Expected exception");
+// } catch (const Exception&e) { BOOST_FAIL(e.what()); }
+// try {
+// session.queueDeclare(arg::queue="foo");
+// BOOST_FAIL("Expected exception");
+// } catch (const Exception&e) { BOOST_FAIL(e.what()); }
+// }
+
+// TODO aconway 2007-11-30: setSynchronous appears not to work.
+// BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, Fixture) {
+// session.setSynchronous(true);
+// BOOST_CHECK_THROW(sub.subscribe(q, "no such queue"), NotFoundException);
+// }
+
+QPID_AUTO_TEST_SUITE_END()