diff options
author | Alan Conway <aconway@apache.org> | 2008-10-08 00:36:42 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-08 00:36:42 +0000 |
commit | 83633e01c8c8df0aa23196a68eb29dcdc245fb48 (patch) | |
tree | e9c25e373d6ef862777f4f671b5384f27906c5dd /qpid/cpp | |
parent | b56f731ad9d5d4b4b21f953dbd1103662b3d0b06 (diff) | |
download | qpid-python-83633e01c8c8df0aa23196a68eb29dcdc245fb48.tar.gz |
rubygen/framing.0-10/constants.rb: create functions for all 3 exception subclasses.
client: added session suspend/resume functions, resume not implemented yet.
ClientSessionTest: enabled compilation of suspend/resume tests with expected failures.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@702674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rwxr-xr-x | qpid/cpp/rubygen/framing.0-10/constants.rb | 58 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connection.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Results.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Results.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionBase_0_10.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionBase_0_10.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ClientSessionTest.cpp | 64 |
9 files changed, 105 insertions, 77 deletions
diff --git a/qpid/cpp/rubygen/framing.0-10/constants.rb b/qpid/cpp/rubygen/framing.0-10/constants.rb index 179e5f9f93..c2b43dbf37 100755 --- a/qpid/cpp/rubygen/framing.0-10/constants.rb +++ b/qpid/cpp/rubygen/framing.0-10/constants.rb @@ -50,18 +50,18 @@ class ConstantsGen < CppGen h_file("#{@dir}/enum") { # Constants for enum domains. namespace(@namespace) { - @amqp.domains.each { |d| define_enum(d.enum) if d.enum } + @amqp.domains.each { |d| declare_enum(d.enum) if d.enum } @amqp.classes.each { |c| enums=c.collect_all(AmqpEnum) if !enums.empty? then - namespace(c.nsname) { enums.each { |e| define_enum(e) } } + namespace(c.nsname) { enums.each { |e| declare_enum(e) } } end } } } end - def define_enum(enum) + def declare_enum(enum) # Generated like this: enum containing_class::Foo { FOO_X, FOO_Y; } name="#{enum.parent.name.caps}" prefix=enum.parent.name.shout+"_" @@ -70,7 +70,7 @@ class ConstantsGen < CppGen } end - def define_exception(c, base, package) + def declare_exception(c, base, package) name=c.name.caps+"Exception" genl doxygen_comment { genl c.doc } @@ -80,9 +80,26 @@ class ConstantsGen < CppGen } end - def define_exceptions_for(class_name, domain_name, base) + def declare_exceptions(class_name, domain_name, base) enum = @amqp.class_(class_name).domain(domain_name).enum - enum.choices.each { |c| define_exception(c, base, class_name) unless c.name == "normal" } + enum.choices.each { |c| declare_exception(c, base, class_name) unless c.name == "normal" } + genl + genl "sys::ExceptionHolder create#{base}(int code, const std::string& text);" + end + + def create_exception(class_name, domain_name, base, invalid) + scope("sys::ExceptionHolder create#{base}(int code, const std::string& text) {") { + genl "sys::ExceptionHolder holder;" + scope("switch (code) {") { + enum = @amqp.class_(class_name).domain(domain_name).enum + enum.choices.each { |c| + genl "case #{c.value}: holder = new #{c.name.caps}Exception(text); break;" unless c.name == "normal" + } + genl "default: assert(0);" + genl " holder = new #{invalid}(QPID_MSG(\"Bad exception code: \" << code << \": \" << text));" + } + genl "return holder;" + } end def reply_exceptions_h() @@ -90,12 +107,9 @@ class ConstantsGen < CppGen include "qpid/Exception" include "qpid/sys/ExceptionHolder" namespace(@namespace) { - define_exceptions_for("execution", "error-code", "SessionException") - define_exceptions_for("connection", "close-code", "ConnectionException") - define_exceptions_for("session", "detach-code", "ChannelException") - genl - genl "void throwExecutionException(int code, const std::string& text);" - genl "void setExecutionException(sys::ExceptionHolder& holder, int code, const std::string& text);" + declare_exceptions("execution", "error-code", "SessionException") + declare_exceptions("connection", "close-code", "ConnectionException") + declare_exceptions("session", "detach-code", "ChannelException") } } end @@ -106,21 +120,11 @@ class ConstantsGen < CppGen include "<sstream>" include "<assert.h>" namespace("qpid::framing") { - scope("void throwExecutionException(int code, const std::string& text) {"){ - genl "sys::ExceptionHolder h;" - genl "setExecutionException(h, code, text);" - genl "h.raise();" - } - scope("void setExecutionException(sys::ExceptionHolder& holder, int code, const std::string& text) {"){ - scope("switch (code) {") { - enum = @amqp.class_("execution").domain("error-code").enum - enum.choices.each { |c| - genl "case #{c.value}: holder = new #{c.name.caps}Exception(text); break;" - } - genl 'default: assert(0);' - genl ' holder = new InvalidArgumentException(QPID_MSG("Bad exception code: " << code << ": " << text));' - } - } + create_exception("execution", "error-code", "SessionException", "InvalidArgumentException") + # FIXME aconway 2008-10-07: there are no good exception codes in 0-10 for an invalid code. + # The following choices are arbitrary. + create_exception("connection", "close-code", "ConnectionException", "FramingErrorException") + create_exception("session", "detach-code", "ChannelException", "NotAttachedException") } } end diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h index cf5b09b255..a1575dd524 100644 --- a/qpid/cpp/src/qpid/client/Connection.h +++ b/qpid/cpp/src/qpid/client/Connection.h @@ -169,6 +169,7 @@ class Connection friend class ConnectionAccess; ///<@internal + friend class SessionBase_0_10; ///<@internal }; }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Results.cpp b/qpid/cpp/src/qpid/client/Results.cpp index c605af2878..7a2d0b6f71 100644 --- a/qpid/cpp/src/qpid/client/Results.cpp +++ b/qpid/cpp/src/qpid/client/Results.cpp @@ -31,6 +31,10 @@ namespace client { Results::Results() {} +Results::~Results() { + try { close(); } catch (const std::exception& e) { assert(0); } +} + void Results::close() { for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) { diff --git a/qpid/cpp/src/qpid/client/Results.h b/qpid/cpp/src/qpid/client/Results.h index 667f35089c..4c49f6b05b 100644 --- a/qpid/cpp/src/qpid/client/Results.h +++ b/qpid/cpp/src/qpid/client/Results.h @@ -38,6 +38,7 @@ public: typedef boost::shared_ptr<FutureResult> FutureResultPtr; Results(); + ~Results(); void completed(const framing::SequenceSet& set); void received(const framing::SequenceNumber& id, const std::string& result); FutureResultPtr listenForResult(const framing::SequenceNumber& point); diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp index 974acbfcf6..50cfb4b09d 100644 --- a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp +++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -19,10 +19,12 @@ * */ #include "SessionBase_0_10.h" +#include "Connection.h" #include "qpid/framing/all_method_bodies.h" namespace qpid { namespace client { + using namespace framing; SessionBase_0_10::SessionBase_0_10() {} @@ -57,6 +59,10 @@ void SessionBase_0_10::sendCompletion() impl->sendCompletion(); } +void SessionBase_0_10::suspend() { impl->suspend(); } +void SessionBase_0_10::resume(Connection c) { impl->resume(c.impl); } +uint32_t SessionBase_0_10::timeout(uint32_t seconds) { return impl->setTimeout(seconds); } + SessionId SessionBase_0_10::getId() const { return impl->getId(); } framing::FrameSet::shared_ptr SessionBase_0_10::get() { return impl->get(); } diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.h b/qpid/cpp/src/qpid/client/SessionBase_0_10.h index 8634164dd1..429f684424 100644 --- a/qpid/cpp/src/qpid/client/SessionBase_0_10.h +++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.h @@ -38,6 +38,8 @@ namespace qpid { namespace client { +class Connection; + using std::string; using framing::Content; using framing::FieldTable; @@ -91,6 +93,12 @@ class SessionBase_0_10 { /** Set the timeout for this session. */ uint32_t timeout(uint32_t seconds); + /** Suspend the session - detach it from its connection */ + void suspend(); + + /** Resume a suspended session with a new connection */ + void resume(Connection); + Execution& getExecution(); void flush(); void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 3e1ea8b724..5c61248b5a 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -105,7 +105,7 @@ void SessionImpl::open(uint32_t timeout) // user thread waitFor(ATTACHED); //TODO: timeout will not be set locally until get response to //confirm, should we wait for that? - proxy.requestTimeout(timeout); + setTimeout(timeout); proxy.commandPoint(nextOut, 0); } else { throw Exception("Open already called for this session"); @@ -115,11 +115,7 @@ void SessionImpl::open(uint32_t timeout) // user thread void SessionImpl::close() //user thread { Lock l(state); - if (detachedLifetime) { - proxy.requestTimeout(0); - //should we wait for the timeout response? - detachedLifetime = 0; - } + if (detachedLifetime) setTimeout(0); detach(); waitFor(DETACHED); } @@ -613,11 +609,8 @@ void SessionImpl::exception(uint16_t errorCode, error = EXCEPTION; code = errorCode; text = description; - if (detachedLifetime) { - proxy.requestTimeout(0); - //should we wait for the timeout response? - detachedLifetime = 0; - } + if (detachedLifetime) + setTimeout(0); } @@ -639,10 +632,10 @@ inline void SessionImpl::waitFor(State s) //call with lock held void SessionImpl::check() const //call with lock held. { switch (error) { - case OK: break; - case CONNECTION_CLOSE: throw ConnectionException(code, text); - case SESSION_DETACH: throw ChannelException(code, text); - case EXCEPTION: throwExecutionException(code, text); + case OK: break; + case CONNECTION_CLOSE: throw ConnectionException(code, text); + case SESSION_DETACH: throw ChannelException(code, text); + case EXCEPTION: createSessionException(code, text).raise(); } } @@ -668,4 +661,11 @@ void SessionImpl::handleClosed() results.close(); } +uint32_t SessionImpl::setTimeout(uint32_t seconds) { + proxy.requestTimeout(seconds); + // FIXME aconway 2008-10-07: wait for timeout response from broker + // and use value retured by broker. + detachedLifetime = seconds; + return detachedLifetime; +} }} diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index c63774a23a..989294b99e 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -95,6 +95,12 @@ public: void connectionClosed(uint16_t code, const std::string& text); void connectionBroke(uint16_t code, const std::string& text); + /** Set timeout in seconds, returns actual timeout allowed by broker */ + uint32_t setTimeout(uint32_t requestedSeconds); + + /** Get timeout in seconds. */ + uint32_t getTimeout() const; + private: enum ErrorType { OK, @@ -131,7 +137,6 @@ private: Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); void sendContent(const framing::MethodContent&); void waitForCompletionImpl(const framing::SequenceNumber& id); - void requestTimeout(uint32_t timeout); void sendCompletionImpl(); @@ -140,7 +145,8 @@ private: void attach(const std::string& name, bool force); void attached(const std::string& name); void detach(const std::string& name); - void detached(const std::string& name, uint8_t detachCode); + void detached(const std::string& name, uint8_t detachCode); + void requestTimeout(uint32_t timeout); void timeout(uint32_t timeout); void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset); void expected(const framing::SequenceSet& commands, const framing::Array& fragments); diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 0b46d39047..85497ace5d 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -162,41 +162,39 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData()); } -// FIXME aconway 2008-05-26: Re-enable with final resume implementation. -// -// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1) -// { -// ClientSessionFixture fix; -// fix.session.suspend(); // session has 0 timeout. -// try { -// fix.connection.resume(fix.session); -// BOOST_FAIL("Expected InvalidArgumentException."); -// } catch(const InternalErrorException&) {} -// } +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1) +{ + ClientSessionFixture fix; + fix.session.suspend(); // session has 0 timeout. + try { + fix.connection.resume(fix.session); + BOOST_FAIL("Expected InvalidArgumentException."); + } catch(const InternalErrorException&) {} +} -// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1) -// { -// ClientSessionFixture fix; -// fix.session =fix.session.timeout(60); -// fix.session.suspend(); -// try { -// fix.session.exchangeQuery(name="amq.fanout"); -// BOOST_FAIL("Expected session suspended exception"); -// } catch(const CommandInvalidException&) {} -// } +QPID_AUTO_TEST_CASE(testUseSuspendedError) +{ + ClientSessionFixture fix; + fix.session.timeout(60); + fix.session.suspend(); + try { + fix.session.exchangeQuery(arg::exchange="amq.fanout"); + BOOST_FAIL("Expected session suspended exception"); + } catch(const NotAttachedException&) {} +} -// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) -// { -// ClientSessionFixture fix; -// fix.session.timeout(60); -// fix.declareSubscribe(); -// fix.session.suspend(); -// // Make sure we are still subscribed after resume. -// fix.connection.resume(fix.session); -// fix.session.messageTransfer(content=TransferContent("my-message", "my-queue")); -// FrameSet::shared_ptr msg = fix.session.get(); -// BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); -// } +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) +{ + ClientSessionFixture fix; + fix.session.timeout(60); + fix.declareSubscribe(); + fix.session.suspend(); + // Make sure we are still subscribed after resume. + fix.connection.resume(fix.session); + fix.session.messageTransfer(arg::content=TransferContent("my-message", "my-queue")); + FrameSet::shared_ptr msg = fix.session.get(); + BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); +} QPID_AUTO_TEST_CASE(testSendToSelf) { |