summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-08 00:36:42 +0000
committerAlan Conway <aconway@apache.org>2008-10-08 00:36:42 +0000
commit83633e01c8c8df0aa23196a68eb29dcdc245fb48 (patch)
treee9c25e373d6ef862777f4f671b5384f27906c5dd /qpid/cpp
parentb56f731ad9d5d4b4b21f953dbd1103662b3d0b06 (diff)
downloadqpid-python-83633e01c8c8df0aa23196a68eb29dcdc245fb48.tar.gz
rubygen/framing.0-10/constants.rb: create functions for all 3 exception subclasses.
client: added session suspend/resume functions, resume not implemented yet. ClientSessionTest: enabled compilation of suspend/resume tests with expected failures. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@702674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rwxr-xr-xqpid/cpp/rubygen/framing.0-10/constants.rb58
-rw-r--r--qpid/cpp/src/qpid/client/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/client/Results.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/Results.h1
-rw-r--r--qpid/cpp/src/qpid/client/SessionBase_0_10.cpp6
-rw-r--r--qpid/cpp/src/qpid/client/SessionBase_0_10.h8
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.cpp30
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.h10
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp64
9 files changed, 105 insertions, 77 deletions
diff --git a/qpid/cpp/rubygen/framing.0-10/constants.rb b/qpid/cpp/rubygen/framing.0-10/constants.rb
index 179e5f9f93..c2b43dbf37 100755
--- a/qpid/cpp/rubygen/framing.0-10/constants.rb
+++ b/qpid/cpp/rubygen/framing.0-10/constants.rb
@@ -50,18 +50,18 @@ class ConstantsGen < CppGen
h_file("#{@dir}/enum") {
# Constants for enum domains.
namespace(@namespace) {
- @amqp.domains.each { |d| define_enum(d.enum) if d.enum }
+ @amqp.domains.each { |d| declare_enum(d.enum) if d.enum }
@amqp.classes.each { |c|
enums=c.collect_all(AmqpEnum)
if !enums.empty? then
- namespace(c.nsname) { enums.each { |e| define_enum(e) } }
+ namespace(c.nsname) { enums.each { |e| declare_enum(e) } }
end
}
}
}
end
- def define_enum(enum)
+ def declare_enum(enum)
# Generated like this: enum containing_class::Foo { FOO_X, FOO_Y; }
name="#{enum.parent.name.caps}"
prefix=enum.parent.name.shout+"_"
@@ -70,7 +70,7 @@ class ConstantsGen < CppGen
}
end
- def define_exception(c, base, package)
+ def declare_exception(c, base, package)
name=c.name.caps+"Exception"
genl
doxygen_comment { genl c.doc }
@@ -80,9 +80,26 @@ class ConstantsGen < CppGen
}
end
- def define_exceptions_for(class_name, domain_name, base)
+ def declare_exceptions(class_name, domain_name, base)
enum = @amqp.class_(class_name).domain(domain_name).enum
- enum.choices.each { |c| define_exception(c, base, class_name) unless c.name == "normal" }
+ enum.choices.each { |c| declare_exception(c, base, class_name) unless c.name == "normal" }
+ genl
+ genl "sys::ExceptionHolder create#{base}(int code, const std::string& text);"
+ end
+
+ def create_exception(class_name, domain_name, base, invalid)
+ scope("sys::ExceptionHolder create#{base}(int code, const std::string& text) {") {
+ genl "sys::ExceptionHolder holder;"
+ scope("switch (code) {") {
+ enum = @amqp.class_(class_name).domain(domain_name).enum
+ enum.choices.each { |c|
+ genl "case #{c.value}: holder = new #{c.name.caps}Exception(text); break;" unless c.name == "normal"
+ }
+ genl "default: assert(0);"
+ genl " holder = new #{invalid}(QPID_MSG(\"Bad exception code: \" << code << \": \" << text));"
+ }
+ genl "return holder;"
+ }
end
def reply_exceptions_h()
@@ -90,12 +107,9 @@ class ConstantsGen < CppGen
include "qpid/Exception"
include "qpid/sys/ExceptionHolder"
namespace(@namespace) {
- define_exceptions_for("execution", "error-code", "SessionException")
- define_exceptions_for("connection", "close-code", "ConnectionException")
- define_exceptions_for("session", "detach-code", "ChannelException")
- genl
- genl "void throwExecutionException(int code, const std::string& text);"
- genl "void setExecutionException(sys::ExceptionHolder& holder, int code, const std::string& text);"
+ declare_exceptions("execution", "error-code", "SessionException")
+ declare_exceptions("connection", "close-code", "ConnectionException")
+ declare_exceptions("session", "detach-code", "ChannelException")
}
}
end
@@ -106,21 +120,11 @@ class ConstantsGen < CppGen
include "<sstream>"
include "<assert.h>"
namespace("qpid::framing") {
- scope("void throwExecutionException(int code, const std::string& text) {"){
- genl "sys::ExceptionHolder h;"
- genl "setExecutionException(h, code, text);"
- genl "h.raise();"
- }
- scope("void setExecutionException(sys::ExceptionHolder& holder, int code, const std::string& text) {"){
- scope("switch (code) {") {
- enum = @amqp.class_("execution").domain("error-code").enum
- enum.choices.each { |c|
- genl "case #{c.value}: holder = new #{c.name.caps}Exception(text); break;"
- }
- genl 'default: assert(0);'
- genl ' holder = new InvalidArgumentException(QPID_MSG("Bad exception code: " << code << ": " << text));'
- }
- }
+ create_exception("execution", "error-code", "SessionException", "InvalidArgumentException")
+ # FIXME aconway 2008-10-07: there are no good exception codes in 0-10 for an invalid code.
+ # The following choices are arbitrary.
+ create_exception("connection", "close-code", "ConnectionException", "FramingErrorException")
+ create_exception("session", "detach-code", "ChannelException", "NotAttachedException")
}
}
end
diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h
index cf5b09b255..a1575dd524 100644
--- a/qpid/cpp/src/qpid/client/Connection.h
+++ b/qpid/cpp/src/qpid/client/Connection.h
@@ -169,6 +169,7 @@ class Connection
friend class ConnectionAccess; ///<@internal
+ friend class SessionBase_0_10; ///<@internal
};
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Results.cpp b/qpid/cpp/src/qpid/client/Results.cpp
index c605af2878..7a2d0b6f71 100644
--- a/qpid/cpp/src/qpid/client/Results.cpp
+++ b/qpid/cpp/src/qpid/client/Results.cpp
@@ -31,6 +31,10 @@ namespace client {
Results::Results() {}
+Results::~Results() {
+ try { close(); } catch (const std::exception& e) { assert(0); }
+}
+
void Results::close()
{
for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) {
diff --git a/qpid/cpp/src/qpid/client/Results.h b/qpid/cpp/src/qpid/client/Results.h
index 667f35089c..4c49f6b05b 100644
--- a/qpid/cpp/src/qpid/client/Results.h
+++ b/qpid/cpp/src/qpid/client/Results.h
@@ -38,6 +38,7 @@ public:
typedef boost::shared_ptr<FutureResult> FutureResultPtr;
Results();
+ ~Results();
void completed(const framing::SequenceSet& set);
void received(const framing::SequenceNumber& id, const std::string& result);
FutureResultPtr listenForResult(const framing::SequenceNumber& point);
diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
index 974acbfcf6..50cfb4b09d 100644
--- a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
+++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
@@ -19,10 +19,12 @@
*
*/
#include "SessionBase_0_10.h"
+#include "Connection.h"
#include "qpid/framing/all_method_bodies.h"
namespace qpid {
namespace client {
+
using namespace framing;
SessionBase_0_10::SessionBase_0_10() {}
@@ -57,6 +59,10 @@ void SessionBase_0_10::sendCompletion()
impl->sendCompletion();
}
+void SessionBase_0_10::suspend() { impl->suspend(); }
+void SessionBase_0_10::resume(Connection c) { impl->resume(c.impl); }
+uint32_t SessionBase_0_10::timeout(uint32_t seconds) { return impl->setTimeout(seconds); }
+
SessionId SessionBase_0_10::getId() const { return impl->getId(); }
framing::FrameSet::shared_ptr SessionBase_0_10::get() { return impl->get(); }
diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.h b/qpid/cpp/src/qpid/client/SessionBase_0_10.h
index 8634164dd1..429f684424 100644
--- a/qpid/cpp/src/qpid/client/SessionBase_0_10.h
+++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.h
@@ -38,6 +38,8 @@
namespace qpid {
namespace client {
+class Connection;
+
using std::string;
using framing::Content;
using framing::FieldTable;
@@ -91,6 +93,12 @@ class SessionBase_0_10 {
/** Set the timeout for this session. */
uint32_t timeout(uint32_t seconds);
+ /** Suspend the session - detach it from its connection */
+ void suspend();
+
+ /** Resume a suspended session with a new connection */
+ void resume(Connection);
+
Execution& getExecution();
void flush();
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp
index 3e1ea8b724..5c61248b5a 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp
@@ -105,7 +105,7 @@ void SessionImpl::open(uint32_t timeout) // user thread
waitFor(ATTACHED);
//TODO: timeout will not be set locally until get response to
//confirm, should we wait for that?
- proxy.requestTimeout(timeout);
+ setTimeout(timeout);
proxy.commandPoint(nextOut, 0);
} else {
throw Exception("Open already called for this session");
@@ -115,11 +115,7 @@ void SessionImpl::open(uint32_t timeout) // user thread
void SessionImpl::close() //user thread
{
Lock l(state);
- if (detachedLifetime) {
- proxy.requestTimeout(0);
- //should we wait for the timeout response?
- detachedLifetime = 0;
- }
+ if (detachedLifetime) setTimeout(0);
detach();
waitFor(DETACHED);
}
@@ -613,11 +609,8 @@ void SessionImpl::exception(uint16_t errorCode,
error = EXCEPTION;
code = errorCode;
text = description;
- if (detachedLifetime) {
- proxy.requestTimeout(0);
- //should we wait for the timeout response?
- detachedLifetime = 0;
- }
+ if (detachedLifetime)
+ setTimeout(0);
}
@@ -639,10 +632,10 @@ inline void SessionImpl::waitFor(State s) //call with lock held
void SessionImpl::check() const //call with lock held.
{
switch (error) {
- case OK: break;
- case CONNECTION_CLOSE: throw ConnectionException(code, text);
- case SESSION_DETACH: throw ChannelException(code, text);
- case EXCEPTION: throwExecutionException(code, text);
+ case OK: break;
+ case CONNECTION_CLOSE: throw ConnectionException(code, text);
+ case SESSION_DETACH: throw ChannelException(code, text);
+ case EXCEPTION: createSessionException(code, text).raise();
}
}
@@ -668,4 +661,11 @@ void SessionImpl::handleClosed()
results.close();
}
+uint32_t SessionImpl::setTimeout(uint32_t seconds) {
+ proxy.requestTimeout(seconds);
+ // FIXME aconway 2008-10-07: wait for timeout response from broker
+ // and use value retured by broker.
+ detachedLifetime = seconds;
+ return detachedLifetime;
+}
}}
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h
index c63774a23a..989294b99e 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/SessionImpl.h
@@ -95,6 +95,12 @@ public:
void connectionClosed(uint16_t code, const std::string& text);
void connectionBroke(uint16_t code, const std::string& text);
+ /** Set timeout in seconds, returns actual timeout allowed by broker */
+ uint32_t setTimeout(uint32_t requestedSeconds);
+
+ /** Get timeout in seconds. */
+ uint32_t getTimeout() const;
+
private:
enum ErrorType {
OK,
@@ -131,7 +137,6 @@ private:
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
void sendContent(const framing::MethodContent&);
void waitForCompletionImpl(const framing::SequenceNumber& id);
- void requestTimeout(uint32_t timeout);
void sendCompletionImpl();
@@ -140,7 +145,8 @@ private:
void attach(const std::string& name, bool force);
void attached(const std::string& name);
void detach(const std::string& name);
- void detached(const std::string& name, uint8_t detachCode);
+ void detached(const std::string& name, uint8_t detachCode);
+ void requestTimeout(uint32_t timeout);
void timeout(uint32_t timeout);
void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 0b46d39047..85497ace5d 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -162,41 +162,39 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData());
}
-// FIXME aconway 2008-05-26: Re-enable with final resume implementation.
-//
-// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1)
-// {
-// ClientSessionFixture fix;
-// fix.session.suspend(); // session has 0 timeout.
-// try {
-// fix.connection.resume(fix.session);
-// BOOST_FAIL("Expected InvalidArgumentException.");
-// } catch(const InternalErrorException&) {}
-// }
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1)
+{
+ ClientSessionFixture fix;
+ fix.session.suspend(); // session has 0 timeout.
+ try {
+ fix.connection.resume(fix.session);
+ BOOST_FAIL("Expected InvalidArgumentException.");
+ } catch(const InternalErrorException&) {}
+}
-// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1)
-// {
-// ClientSessionFixture fix;
-// fix.session =fix.session.timeout(60);
-// fix.session.suspend();
-// try {
-// fix.session.exchangeQuery(name="amq.fanout");
-// BOOST_FAIL("Expected session suspended exception");
-// } catch(const CommandInvalidException&) {}
-// }
+QPID_AUTO_TEST_CASE(testUseSuspendedError)
+{
+ ClientSessionFixture fix;
+ fix.session.timeout(60);
+ fix.session.suspend();
+ try {
+ fix.session.exchangeQuery(arg::exchange="amq.fanout");
+ BOOST_FAIL("Expected session suspended exception");
+ } catch(const NotAttachedException&) {}
+}
-// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
-// {
-// ClientSessionFixture fix;
-// fix.session.timeout(60);
-// fix.declareSubscribe();
-// fix.session.suspend();
-// // Make sure we are still subscribed after resume.
-// fix.connection.resume(fix.session);
-// fix.session.messageTransfer(content=TransferContent("my-message", "my-queue"));
-// FrameSet::shared_ptr msg = fix.session.get();
-// BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
-// }
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
+{
+ ClientSessionFixture fix;
+ fix.session.timeout(60);
+ fix.declareSubscribe();
+ fix.session.suspend();
+ // Make sure we are still subscribed after resume.
+ fix.connection.resume(fix.session);
+ fix.session.messageTransfer(arg::content=TransferContent("my-message", "my-queue"));
+ FrameSet::shared_ptr msg = fix.session.get();
+ BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
+}
QPID_AUTO_TEST_CASE(testSendToSelf) {