summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-26 18:10:05 +0000
committerAlan Conway <aconway@apache.org>2008-05-26 18:10:05 +0000
commitce7678789fe3e8c5caebb59a26aa418fbb95e5d3 (patch)
treeaffd8e2de460cba285e7c25e15f5c3d94444f905
parent0b56077cbb8b6e9cdd982cbdeaa3ec6fe1bd5434 (diff)
downloadqpid-python-ce7678789fe3e8c5caebb59a26aa418fbb95e5d3.tar.gz
Changes to Session API:
- Session is synchronous, no futures. - AsyncSession is async, returns futures. - Conversion functions sync(s) async(s) return a sync/async view of session s. - Connection::newSession - takes name, no timeout - SessionBase::getId - returns SessionId not UUID. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@660258 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/examples/examples/direct/declare_queues.cpp2
-rw-r--r--cpp/examples/examples/direct/direct_producer.cpp6
-rw-r--r--cpp/examples/examples/direct/listener.cpp2
-rw-r--r--cpp/examples/examples/fanout/fanout_producer.cpp6
-rw-r--r--cpp/examples/examples/fanout/listener.cpp5
-rw-r--r--cpp/examples/examples/pub-sub/topic_listener.cpp7
-rw-r--r--cpp/examples/examples/pub-sub/topic_publisher.cpp6
-rw-r--r--cpp/examples/examples/request-response/client.cpp6
-rw-r--r--cpp/examples/examples/request-response/server.cpp7
-rw-r--r--cpp/examples/examples/xml-exchange/declare_queues.cpp2
-rw-r--r--cpp/examples/examples/xml-exchange/listener.cpp2
-rw-r--r--cpp/examples/examples/xml-exchange/xml_producer.cpp6
-rw-r--r--cpp/rubygen/framing.0-10/Session.rb172
-rw-r--r--cpp/src/Makefile.am7
-rw-r--r--cpp/src/qpid/SessionId.h12
-rw-r--r--cpp/src/qpid/client/AckPolicy.h5
-rw-r--r--cpp/src/qpid/client/AsyncSession.h38
-rw-r--r--cpp/src/qpid/client/Connection.cpp19
-rw-r--r--cpp/src/qpid/client/Connection.h12
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp7
-rw-r--r--cpp/src/qpid/client/Dispatcher.h8
-rw-r--r--cpp/src/qpid/client/Session.h1
-rw-r--r--cpp/src/qpid/client/SessionBase.cpp84
-rw-r--r--cpp/src/qpid/client/SessionBase.h148
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10.cpp64
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10.h104
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10Access.h42
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp42
-rw-r--r--cpp/src/qpid/client/SessionImpl.h15
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp22
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h10
-rw-r--r--cpp/src/tests/BrokerFixture.h2
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp84
-rw-r--r--cpp/src/tests/XmlClientSessionTest.cpp2
-rw-r--r--cpp/src/tests/client_test.cpp2
-rw-r--r--cpp/src/tests/consume.cpp2
-rw-r--r--cpp/src/tests/exception_test.cpp1
-rw-r--r--cpp/src/tests/latencytest.cpp10
-rw-r--r--cpp/src/tests/perftest.cpp8
-rw-r--r--cpp/src/tests/publish.cpp6
-rw-r--r--cpp/src/tests/topic_listener.cpp11
-rw-r--r--cpp/src/tests/topic_publisher.cpp10
-rwxr-xr-xcpp/src/tests/topictest2
-rw-r--r--cpp/src/tests/txtest.cpp6
44 files changed, 537 insertions, 478 deletions
diff --git a/cpp/examples/examples/direct/declare_queues.cpp b/cpp/examples/examples/direct/declare_queues.cpp
index 07c9c63ff6..0cdb472665 100644
--- a/cpp/examples/examples/direct/declare_queues.cpp
+++ b/cpp/examples/examples/direct/declare_queues.cpp
@@ -60,7 +60,7 @@ int main(int argc, char** argv) {
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
diff --git a/cpp/examples/examples/direct/direct_producer.cpp b/cpp/examples/examples/direct/direct_producer.cpp
index 9b40f733c2..348b82efae 100644
--- a/cpp/examples/examples/direct/direct_producer.cpp
+++ b/cpp/examples/examples/direct/direct_producer.cpp
@@ -67,7 +67,7 @@ int main(int argc, char** argv) {
Message message;
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
@@ -85,7 +85,9 @@ int main(int argc, char** argv) {
message_data << "Message " << i;
message.setData(message_data.str());
- session.messageTransfer(arg::content=message, arg::destination="amq.direct");
+ // Asynchronous transfer sends messages as quickly as
+ // possible without waiting for confirmation.
+ async(session).messageTransfer(arg::content=message, arg::destination="amq.direct");
}
// And send a final message to indicate termination.
diff --git a/cpp/examples/examples/direct/listener.cpp b/cpp/examples/examples/direct/listener.cpp
index 7ee68ebf35..fc2fa96ead 100644
--- a/cpp/examples/examples/direct/listener.cpp
+++ b/cpp/examples/examples/direct/listener.cpp
@@ -65,7 +65,7 @@ int main(int argc, char** argv) {
Message msg;
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
diff --git a/cpp/examples/examples/fanout/fanout_producer.cpp b/cpp/examples/examples/fanout/fanout_producer.cpp
index 8ae6bbc242..9a80c86c20 100644
--- a/cpp/examples/examples/fanout/fanout_producer.cpp
+++ b/cpp/examples/examples/fanout/fanout_producer.cpp
@@ -67,7 +67,7 @@ int main(int argc, char** argv) {
Message message;
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
@@ -83,7 +83,9 @@ int main(int argc, char** argv) {
message_data << "Message " << i;
message.setData(message_data.str());
- session.messageTransfer(arg::content=message, arg::destination="amq.fanout");
+ // Asynchronous transfer sends messages as quickly as
+ // possible without waiting for confirmation.
+ async(session).messageTransfer(arg::content=message, arg::destination="amq.fanout");
}
// And send a final message to indicate termination.
diff --git a/cpp/examples/examples/fanout/listener.cpp b/cpp/examples/examples/fanout/listener.cpp
index c3123bb944..79a115f99f 100644
--- a/cpp/examples/examples/fanout/listener.cpp
+++ b/cpp/examples/examples/fanout/listener.cpp
@@ -65,7 +65,7 @@ int main(int argc, char** argv) {
Message msg;
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
@@ -90,9 +90,6 @@ int main(int argc, char** argv) {
Listener listener(subscriptions);
subscriptions.subscribe(listener, myQueue);
- // Wait for the broker to indicate that our queues have been created.
- session.sync();
-
// Deliver messages until the subscription is cancelled
// by Listener::received()
std::cout << "Listening" << std::endl;
diff --git a/cpp/examples/examples/pub-sub/topic_listener.cpp b/cpp/examples/examples/pub-sub/topic_listener.cpp
index 883d6eba42..4d854e57ff 100644
--- a/cpp/examples/examples/pub-sub/topic_listener.cpp
+++ b/cpp/examples/examples/pub-sub/topic_listener.cpp
@@ -91,7 +91,7 @@ void Listener::prepareQueue(std::string queue, std::string routing_key) {
* the queue name parameter with the Session ID.
*/
- queue += session.getId().str();
+ queue += session.getId().getName();
std::cout << "Declaring queue: " << queue << std::endl;
/* Declare an exclusive queue on the broker
@@ -138,7 +138,7 @@ int main(int argc, char** argv) {
Connection connection;
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
@@ -153,9 +153,6 @@ int main(int argc, char** argv) {
listener.prepareQueue("news", "#.news");
listener.prepareQueue("weather", "#.weather");
- // Wait for the broker to indicate that our queues have been created.
- session.sync();
-
std::cout << "Listening for messages ..." << std::endl;
// Give up control and receive messages
diff --git a/cpp/examples/examples/pub-sub/topic_publisher.cpp b/cpp/examples/examples/pub-sub/topic_publisher.cpp
index 94cd3a0f56..edf6ff28f7 100644
--- a/cpp/examples/examples/pub-sub/topic_publisher.cpp
+++ b/cpp/examples/examples/pub-sub/topic_publisher.cpp
@@ -72,7 +72,9 @@ void publish_messages(Session& session, string routing_key)
message_data << "Message " << i;
message.setData(message_data.str());
- session.messageTransfer(arg::content=message, arg::destination="amq.topic");
+ // Asynchronous transfer sends messages as quickly as
+ // possible without waiting for confirmation.
+ async(session).messageTransfer(arg::content=message, arg::destination="amq.topic");
}
}
@@ -101,7 +103,7 @@ int main(int argc, char** argv) {
Message message;
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
diff --git a/cpp/examples/examples/request-response/client.cpp b/cpp/examples/examples/request-response/client.cpp
index 073af596bf..eb4c00c2f1 100644
--- a/cpp/examples/examples/request-response/client.cpp
+++ b/cpp/examples/examples/request-response/client.cpp
@@ -114,7 +114,7 @@ int main(int argc, char** argv) {
Message request;
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
@@ -157,7 +157,9 @@ int main(int argc, char** argv) {
for (int i=0; i<4; i++) {
request.setData(s[i]);
- session.messageTransfer(arg::content=request, arg::destination="amq.direct");
+ // Asynchronous transfer sends messages as quickly as
+ // possible without waiting for confirmation.
+ async(session).messageTransfer(arg::content=request, arg::destination="amq.direct");
std::cout << "Request: " << s[i] << std::endl;
}
diff --git a/cpp/examples/examples/request-response/server.cpp b/cpp/examples/examples/request-response/server.cpp
index 6c9bc7ffa6..1946facd0e 100644
--- a/cpp/examples/examples/request-response/server.cpp
+++ b/cpp/examples/examples/request-response/server.cpp
@@ -120,7 +120,10 @@ void Listener::received(Message& request) {
// Send it back to the user
response.getDeliveryProperties().setRoutingKey(routingKey);
- session.messageTransfer(arg::content=response, arg::destination="amq.direct");
+
+ // Asynchronous transfer sends messages as quickly as
+ // possible without waiting for confirmation.
+ async(session).messageTransfer(arg::content=response, arg::destination="amq.direct");
}
@@ -131,7 +134,7 @@ int main(int argc, char** argv) {
Message message;
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
diff --git a/cpp/examples/examples/xml-exchange/declare_queues.cpp b/cpp/examples/examples/xml-exchange/declare_queues.cpp
index b89f95cd97..1307c473c5 100644
--- a/cpp/examples/examples/xml-exchange/declare_queues.cpp
+++ b/cpp/examples/examples/xml-exchange/declare_queues.cpp
@@ -39,7 +39,7 @@ int main(int argc, char** argv) {
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
diff --git a/cpp/examples/examples/xml-exchange/listener.cpp b/cpp/examples/examples/xml-exchange/listener.cpp
index b5e057c4b1..559cfaf8c9 100644
--- a/cpp/examples/examples/xml-exchange/listener.cpp
+++ b/cpp/examples/examples/xml-exchange/listener.cpp
@@ -65,7 +65,7 @@ int main(int argc, char** argv) {
Message msg;
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
diff --git a/cpp/examples/examples/xml-exchange/xml_producer.cpp b/cpp/examples/examples/xml-exchange/xml_producer.cpp
index 9333e20438..42e9ec4356 100644
--- a/cpp/examples/examples/xml-exchange/xml_producer.cpp
+++ b/cpp/examples/examples/xml-exchange/xml_producer.cpp
@@ -44,7 +44,7 @@ int main(int argc, char** argv) {
Message message;
try {
connection.open(host, port);
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
//--------- Main body of program --------------------------------------------
@@ -66,7 +66,9 @@ int main(int argc, char** argv) {
std::cout << "Message data: " << message_data.str() << std::endl;
message.setData(message_data.str());
- session.messageTransfer(arg::content=message, arg::destination="xml");
+ // Asynchronous transfer sends messages as quickly as
+ // possible without waiting for confirmation.
+ async(session).messageTransfer(arg::content=message, arg::destination="xml");
}
// And send a final message to indicate termination.
diff --git a/cpp/rubygen/framing.0-10/Session.rb b/cpp/rubygen/framing.0-10/Session.rb
index a2f06bfcd7..d679b1c0bc 100644
--- a/cpp/rubygen/framing.0-10/Session.rb
+++ b/cpp/rubygen/framing.0-10/Session.rb
@@ -25,6 +25,33 @@ class CppGen
end
end
+# Sync vs. async APIs
+module SyncAsync
+ def sync_prefix() @async ? "Async" : "" end
+ def sync_adjective() @async ? "asynchronous" : "synchronous" end
+ def sync_convert() @async ? "async" : "sync" end
+
+
+ def decl_ctor_opeq()
+ genl
+ genl "#{@classname}();"
+ genl "#{@classname}(const #{@version_base}& other);"
+ genl "#{@classname}& operator=(const #{@version_base}& other);"
+ end
+
+ def defn_ctor_opeq(inline="")
+ genl
+ genl "#{inline} #{@classname}::#{@classname}() {}"
+ scope("#{inline} #{@classname}::#{@classname}(const #{@version_base}& other) {") {
+ genl "*this = other;"
+ }
+ scope("#{inline} #{@classname}& #{@classname}::operator=(const #{@version_base}& other) {") {
+ genl "impl = static_cast<const #{@classname}&>(other).impl;"
+ genl "return *this;"
+ }
+ end
+end
+
class ContentField # For extra content parameters
def cppname() "content" end
def signature() "const MethodContent& content" end
@@ -49,55 +76,52 @@ class AmqpMethod
fields_c.map { |f| "arg::keyword_tags::"+f.cppname }.join(',') +
">"
end
- def return_type()
- return "TypedResult<qpid::framing::#{result.cpptype.ret_by_val}>" if (result)
- return "Response" if (not responses().empty?)
- return "Completion"
+
+ def return_type(async)
+ if (async)
+ return "TypedResult<qpid::framing::#{result.cpptype.ret_by_val}>" if (result)
+ return "Completion"
+ else
+ return "qpid::framing::#{result.cpptype.ret_by_val}" if (result)
+ return "void"
+ end
end
+
def session_function() "#{parent.name.lcaps}#{name.caps}"; end
end
class SessionNoKeywordGen < CppGen
-
- def initialize(outdir, amqp)
+ include SyncAsync
+
+ def initialize(outdir, amqp, async)
super(outdir, amqp)
+ @async=async
@chassis="server"
@namespace,@classname,@file=
- parse_classname "qpid::client::no_keyword::Session_#{@amqp.version.bars}"
+ parse_classname "qpid::client::no_keyword::#{sync_prefix}Session_#{@amqp.version.bars}"
+ @version_base="SessionBase_#{@amqp.major}_#{@amqp.minor}"
end
def generate()
h_file(@file) {
- include "qpid/client/SessionBase.h"
-
- namespace("qpid::client") {
- genl "using std::string;"
- genl "using framing::Content;"
- genl "using framing::FieldTable;"
- genl "using framing::MethodContent;"
- genl "using framing::SequenceNumber;"
- genl "using framing::SequenceSet;"
- genl "using framing::Uuid;"
- #the following are nasty... would be better to dynamically
- #include such statements based on params required
- genl "using framing::Xid;"
- genl
- namespace("no_keyword") {
- doxygen_comment {
- genl "AMQP #{@amqp.version} session API."
- genl @amqp.class_("session").doc
+ include "qpid/client/#{@version_base}.h"
+ namespace(@namespace) {
+ doxygen_comment {
+ genl "AMQP #{@amqp.version} #{sync_adjective} session API."
+ genl @amqp.class_("session").doc
+ # FIXME aconway 2008-05-23: additional doc on sync/async use.
+ }
+ cpp_class(@classname, "public #{@version_base}") {
+ public
+ decl_ctor_opeq()
+ session_methods.each { |m|
+ genl
+ doxygen(m)
+ args=m.sig_c_default.join(", ")
+ genl "#{m.return_type(@async)} #{m.session_function}(#{args});"
}
- cpp_class(@classname, "public SessionBase") {
- public
- genl "Session_#{@amqp.version.bars}() {}"
- genl "Session_#{@amqp.version.bars}(shared_ptr<SessionImpl> core) : SessionBase(core) {}"
- session_methods.each { |m|
- genl
- doxygen(m)
- args=m.sig_c_default.join(", ")
- genl "#{m.return_type} #{m.session_function}(#{args});"
- }
- }}}}
+ }
+ }}
cpp_file(@file) {
include @classname
@@ -108,32 +132,47 @@ class SessionNoKeywordGen < CppGen
genl
sig=m.signature_c.join(", ")
func="#{@classname}::#{m.session_function}"
- scope("#{m.return_type} #{func}(#{sig}) {") {
- args=(["ProtocolVersion()"]+m.param_names).join(", ")
- body="#{m.body_name}(#{args})"
- sendargs=body
+ scope("#{m.return_type(@async)} #{func}(#{sig}) {") {
+ args=(["ProtocolVersion(#{@amqp.major},#{@amqp.minor})"]+m.param_names).join(", ")
+ genl "#{m.body_name} body(#{args});";
+ genl "body.setSync(#{@async ? 'false':'true'});"
+ sendargs="body"
sendargs << ", content" if m.content
- genl "return #{m.return_type}(impl->send(#{sendargs}), impl);"
- }}}}
+ async_retval="#{m.return_type(true)}(impl->send(#{sendargs}), impl)"
+ if @async then
+ genl "return #{async_retval};"
+ else
+ if m.result
+ genl "return #{async_retval}.get();"
+ else
+ genl "#{async_retval}.wait();"
+ end
+ end
+ }}
+ defn_ctor_opeq()
+ }}
end
end
class SessionGen < CppGen
+ include SyncAsync
- def initialize(outdir, amqp)
+ def initialize(outdir, amqp, async)
super(outdir, amqp)
+ @async=async
@chassis="server"
- session="Session_#{@amqp.version.bars}"
+ session="#{sync_prefix}Session_#{@amqp.version.bars}"
@base="no_keyword::#{session}"
@fqclass=FqClass.new "qpid::client::#{session}"
@classname=@fqclass.name
@fqbase=FqClass.new("qpid::client::#{@base}")
+ @version_base="SessionBase_#{@amqp.major}_#{@amqp.minor}"
end
- def gen_keyword_decl(m, prefix)
+ def gen_keyword_decl(m)
return if m.fields_c.empty? # Inherited function will do.
- scope("BOOST_PARAMETER_MEMFUN(#{m.return_type}, #{m.session_function}, 0, #{m.fields_c.size}, #{m.argpack_name}) {") {
- scope("return #{prefix}#{m.session_function}(",");") {
+ scope("BOOST_PARAMETER_MEMFUN(#{m.return_type(@async)}, #{m.session_function}, 0, #{m.fields_c.size}, #{m.argpack_name}) {") {
+ scope("return #{@base}::#{m.session_function}(",");") {
gen m.fields_c.map { |f| f.unpack() }.join(",\n")
}
}
@@ -143,20 +182,21 @@ class SessionGen < CppGen
def generate()
keyword_methods=session_methods.reject { |m| m.fields_c.empty? }
max_arity = keyword_methods.map{ |m| m.fields_c.size }.max
+
+ h_file("qpid/client/arg.h") {
+ # Generate keyword tag declarations.
+ genl "#define BOOST_PARAMETER_MAX_ARITY #{max_arity}"
+ include "<boost/parameter.hpp>"
+ namespace("qpid::client::arg") {
+ keyword_methods.map{ |m| m.param_names_c }.flatten.uniq.each { |k|
+ genl "BOOST_PARAMETER_KEYWORD(keyword_tags, #{k})"
+ }}
+ }
h_file(@fqclass.file) {
include @fqbase.file
- genl
- genl "#define BOOST_PARAMETER_MAX_ARITY #{max_arity}"
- include "<boost/parameter.hpp>"
- genl
+ include "qpid/client/arg"
namespace("qpid::client") {
- # Generate keyword tag declarations.
- namespace("arg") {
- keyword_methods.map{ |m| m.param_names_c }.flatten.uniq.each { |k|
- genl "BOOST_PARAMETER_KEYWORD(keyword_tags, #{k})"
- }}
- genl
# Doxygen comment.
doxygen_comment {
genl "AMQP #{@amqp.version} session API with keyword arguments."
@@ -183,17 +223,23 @@ EOS
}
# Session class.
cpp_class(@classname,"public #{@base}") {
+ public
+ decl_ctor_opeq()
private
- genl "#{@classname}(shared_ptr<SessionImpl> core) : #{ @base}(core) {}"
keyword_methods.each { |m| typedef m.argpack_type, m.argpack_name }
genl "friend class Connection;"
public
- genl "#{@classname}() {}"
- keyword_methods.each { |m| gen_keyword_decl(m,@base+"::") }
- }}}
+ keyword_methods.each { |m| gen_keyword_decl(m) }
+ }
+ genl "/** Conversion to #{@classname} from another session type */"
+ genl "inline #{@classname} #{sync_convert}(const #{@version_base}& other) { return #{@clasname}(other); }"
+ defn_ctor_opeq("inline")
+ }}
end
end
-SessionNoKeywordGen.new(ARGV[0], $amqp).generate()
-SessionGen.new(ARGV[0], $amqp).generate()
+SessionNoKeywordGen.new(ARGV[0], $amqp, true).generate()
+SessionNoKeywordGen.new(ARGV[0], $amqp, false).generate()
+SessionGen.new(ARGV[0], $amqp, true).generate()
+SessionGen.new(ARGV[0], $amqp, false).generate()
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 1c8ca9da12..0bd04ec6cb 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -324,7 +324,9 @@ libqpidclient_la_SOURCES = \
qpid/client/MessageListener.cpp \
qpid/client/Queue.cpp \
qpid/client/Results.cpp \
- qpid/client/SessionBase.cpp \
+ qpid/client/SessionBase_0_10.cpp \
+ qpid/client/SessionBase_0_10.h \
+ qpid/client/SessionBase_0_10Access.h \
qpid/client/SessionImpl.cpp \
qpid/client/StateManager.cpp \
qpid/client/SubscriptionManager.cpp
@@ -446,8 +448,9 @@ nobase_include_HEADERS = \
qpid/client/MessageQueue.h \
qpid/client/Queue.h \
qpid/client/Results.h \
- qpid/client/SessionBase.h \
+ qpid/client/SessionBase_0_10.h \
qpid/client/Session.h \
+ qpid/client/AsyncSession.h \
qpid/client/SessionImpl.h \
qpid/client/StateManager.h \
qpid/client/SubscriptionManager.h \
diff --git a/cpp/src/qpid/SessionId.h b/cpp/src/qpid/SessionId.h
index 08553e8b1d..291c42a2bb 100644
--- a/cpp/src/qpid/SessionId.h
+++ b/cpp/src/qpid/SessionId.h
@@ -27,7 +27,17 @@
namespace qpid {
-/** Identifier for a session */
+/** Identifier for a session.
+ * There are two parts to a session identifier:
+ *
+ * getUserId() returns the authentication principal associated with
+ * the session's connection.
+ *
+ * getName() returns the session name.
+ *
+ * The name must be unique among sessions with the same authentication
+ * principal.
+ */
class SessionId : boost::totally_ordered1<SessionId> {
std::string userId;
std::string name;
diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h
index 8d62b6f4f2..d00df1ef26 100644
--- a/cpp/src/qpid/client/AckPolicy.h
+++ b/cpp/src/qpid/client/AckPolicy.h
@@ -22,6 +22,7 @@
*/
#include "qpid/framing/SequenceSet.h"
+#include "qpid/client/AsyncSession.h"
namespace qpid {
namespace client {
@@ -44,7 +45,7 @@ class AckPolicy
*/
AckPolicy(size_t n=1) : interval(n), count(n) {}
- void ack(const Message& msg, Session& session) {
+ void ack(const Message& msg, AsyncSession session) {
accepted.add(msg.getId());
if (!interval) return;
if (--count==0) {
@@ -57,7 +58,7 @@ class AckPolicy
}
}
- void ackOutstanding(Session& session) {
+ void ackOutstanding(AsyncSession session) {
if (!accepted.empty()) {
session.messageAccept(accepted);
accepted.clear();
diff --git a/cpp/src/qpid/client/AsyncSession.h b/cpp/src/qpid/client/AsyncSession.h
new file mode 100644
index 0000000000..150aabe191
--- /dev/null
+++ b/cpp/src/qpid/client/AsyncSession.h
@@ -0,0 +1,38 @@
+#ifndef QPID_CLIENT_ASYNCSESSION_H
+#define QPID_CLIENT_ASYNCSESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/AsyncSession_0_10.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * AsyncSession is an alias for Session_0_10
+ *
+ * \ingroup clientapi
+ */
+typedef AsyncSession_0_10 AsyncSession;
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_ASYNCSESSION_H*/
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 82d1eac8b4..bec2b0345d 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -22,6 +22,7 @@
#include "ConnectionSettings.h"
#include "Message.h"
#include "SessionImpl.h"
+#include "SessionBase_0_10Access.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
@@ -72,18 +73,16 @@ void Connection::open(const ConnectionSettings& settings)
max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
}
-Session Connection::newSession(SynchronousMode sync,
- uint32_t detachedLifetime)
-{
+Session Connection::newSession(const std::string& name) {
if (!impl)
throw Exception(QPID_MSG("Connection has not yet been opened"));
-
- shared_ptr<SessionImpl> core(
- new SessionImpl(impl, ++channelIdCounter, max_frame_size));
- core->setSync(sync);
- impl->addSession(core);
- core->open(detachedLifetime);
- return Session(core);
+ shared_ptr<SessionImpl> simpl(
+ new SessionImpl(name, impl, ++channelIdCounter, max_frame_size));
+ impl->addSession(simpl);
+ simpl->open(0);
+ Session s;
+ SessionBase_0_10Access(s).set(simpl);
+ return s;
}
void Connection::resume(Session& session) {
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 0c01c77509..5337a20bfa 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -26,7 +26,6 @@
#include "ConnectionImpl.h"
#include "qpid/client/Session.h"
#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/Uuid.h"
namespace qpid {
@@ -111,14 +110,11 @@ class Connection
* multiple streams of work to be multiplexed over the same
* connection.
*
- *@param detachedLifetime: A session may be detached from its
- * channel, either by calling Session::suspend() or because of a
- * network failure. The session state is preserved for
- * detachedLifetime seconds to allow a call to resume(). After
- * that the broker may discard the session state. Default is 0,
- * meaning the session cannot be resumed.
+ *@param name: A name to identify the session. @see qpid::SessionId
+ * If the name is empty (the default) then a unique name will be
+ * chosen using a Universally-unique identifier (UUID) algorithm.
*/
- Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0);
+ Session newSession(const std::string& name=std::string());
/**
* Resume a suspended session. A session may be resumed
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index 2bbe5a122f..0bd0cb9d08 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -37,7 +37,8 @@ using qpid::sys::Thread;
namespace qpid {
namespace client {
-Subscriber::Subscriber(Session& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
+Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a)
+ : session(s), listener(l), autoAck(a) {}
void Subscriber::received(Message& msg)
{
@@ -47,7 +48,7 @@ void Subscriber::received(Message& msg)
}
}
-Dispatcher::Dispatcher(Session& s, const std::string& q)
+Dispatcher::Dispatcher(const Session& s, const std::string& q)
: session(s), running(false), autoStop(true)
{
queue = q.empty() ?
@@ -88,7 +89,7 @@ void Dispatcher::run()
}
}
}
- session.sync(); // Make sure all our acks are received before returning.
+ sync(session).sync(); // Make sure all our acks are received before returning.
}
catch (const ClosedException&) {} //ignore it and return
catch (const std::exception& e) {
diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h
index e23d0c198c..1b31ddf4cf 100644
--- a/cpp/src/qpid/client/Dispatcher.h
+++ b/cpp/src/qpid/client/Dispatcher.h
@@ -37,13 +37,13 @@ namespace client {
class Subscriber : public MessageListener
{
- Session& session;
+ AsyncSession session;
MessageListener* const listener;
AckPolicy autoAck;
public:
typedef boost::shared_ptr<Subscriber> shared_ptr;
- Subscriber(Session& session, MessageListener* listener, AckPolicy);
+ Subscriber(const Session& session, MessageListener* listener, AckPolicy);
void received(Message& msg);
};
@@ -55,7 +55,7 @@ class Dispatcher : public sys::Runnable
typedef std::map<std::string, Subscriber::shared_ptr> Listeners;
sys::Mutex lock;
sys::Thread worker;
- Session& session;
+ Session session;
Demux::QueuePtr queue;
bool running;
bool autoStop;
@@ -67,7 +67,7 @@ class Dispatcher : public sys::Runnable
bool isStopped();
public:
- Dispatcher(Session& session, const std::string& queue = "");
+ Dispatcher(const Session& session, const std::string& queue = "");
void start();
void run();
diff --git a/cpp/src/qpid/client/Session.h b/cpp/src/qpid/client/Session.h
index 84831ec442..bdabd26c82 100644
--- a/cpp/src/qpid/client/Session.h
+++ b/cpp/src/qpid/client/Session.h
@@ -33,6 +33,7 @@ namespace client {
*/
typedef Session_0_10 Session;
+
}} // namespace qpid::client
#endif /*!QPID_CLIENT_SESSION_H*/
diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp
deleted file mode 100644
index dfd0f62e7e..0000000000
--- a/cpp/src/qpid/client/SessionBase.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "SessionBase.h"
-#include "qpid/framing/all_method_bodies.h"
-
-namespace qpid {
-namespace client {
-using namespace framing;
-
-SessionBase::SessionBase() {}
-SessionBase::~SessionBase() {}
-SessionBase::SessionBase(shared_ptr<SessionImpl> core) : impl(core) {}
-void SessionBase::suspend() { impl->suspend(); }
-void SessionBase::close() { impl->close(); }
-
-void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); }
-void SessionBase::setSynchronous(SynchronousMode m) { impl->setSync(m); }
-bool SessionBase::isSynchronous() const { return impl->isSync(); }
-SynchronousMode SessionBase::getSynchronous() const {
- return SynchronousMode(impl->isSync());
-}
-
-Execution& SessionBase::getExecution()
-{
- return *impl;
-}
-
-void SessionBase::flush()
-{
- impl->sendFlush();
-}
-
-// FIXME aconway 2008-04-24: do we need to provide a non-synchronous version
-// of sync() or bool paramter to allow setting a sync point for a later wait?
-void SessionBase::sync()
-{
- ExecutionSyncBody b;
- b.setSync(true);
- impl->send(b).wait(*impl);
-}
-
-void SessionBase::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer)
-{
- impl->markCompleted(id, cumulative, notifyPeer);
-}
-
-void SessionBase::sendCompletion()
-{
- impl->sendCompletion();
-}
-
-Uuid SessionBase::getId() const { return impl->getId(); }
-framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
-
-SessionBase::ScopedSync::ScopedSync(SessionBase& s) : session(s), change(!s.isSynchronous())
-{
- if (change) session.setSynchronous(true);
-}
-
-SessionBase::ScopedSync::~ScopedSync()
-{
- if (change) session.setSynchronous(false);
-}
-
-
-}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h
deleted file mode 100644
index 7f4a27dc09..0000000000
--- a/cpp/src/qpid/client/SessionBase.h
+++ /dev/null
@@ -1,148 +0,0 @@
-#ifndef QPID_CLIENT_SESSIONBASE_H
-#define QPID_CLIENT_SESSIONBASE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/Uuid.h"
-#include "qpid/framing/amqp_structs.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/MethodContent.h"
-#include "qpid/framing/TransferContent.h"
-#include "qpid/client/Completion.h"
-#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/Execution.h"
-#include "qpid/client/SessionImpl.h"
-#include "qpid/client/TypedResult.h"
-#include "qpid/shared_ptr.h"
-#include <string>
-
-namespace qpid {
-namespace client {
-
-using std::string;
-using framing::Content;
-using framing::FieldTable;
-using framing::MethodContent;
-using framing::SequenceNumberSet;
-using framing::Uuid;
-
-enum CreditUnit { MESSAGE=0, BYTE=1 };
-
-/** \defgroup clientapi Synchronous mode of a session.
- *
- * SYNC means that Session functions do not return until the remote
- * broker has confirmed that the command was executed.
- *
- * ASYNC means that the client sends commands asynchronously, Session
- * functions return immediately.
- *
- * ASYNC mode gives better performance for high-volume traffic, but
- * requires some additional caution.
- *
- * Session functions return immediately. If the command causes an
- * exception on the broker, the exception will be thrown on a
- * <em>later</em> function call.
- *
- * If you need to notify some extenal agent that some actions have
- * been taken (e.g. binding queues to exchanges), you must call
- * Session::sync() first to ensure that all the commands are complete.
- *
- * You can freely switch between modes by calling Session::setSynchronous().
- *
- * @see Session::sync(), Session::setSynchronous()
- */
-enum SynchronousMode { SYNC=true, ASYNC=false };
-
-
-/**
- * Basic session operations that are not derived from AMQP XML methods.
- */
-class SessionBase
-{
- public:
- /**
- * Instances of this class turn synchronous mode on for the
- * duration of their scope (and revert back to async if required
- * afterwards).
- */
- class ScopedSync
- {
- SessionBase& session;
- const bool change;
- public:
- ScopedSync(SessionBase& s);
- ~ScopedSync();
- };
-
-
- SessionBase();
- ~SessionBase();
-
- /** Get the next message frame-set from the session. */
- framing::FrameSet::shared_ptr get();
-
- /** Get the session ID */
- Uuid getId() const;
-
- /**
- * In synchronous mode, wait for the broker's response before
- * returning. This gives lower throughput than asynchronous
- * mode.
- *
- * In asynchronous mode commands are sent without waiting
- * for a response (you can use the returned Completion object
- * to wait for completion).
- *
- * @see SynchronousMode
- */
- void setSynchronous(SynchronousMode mode);
- void setSynchronous(bool set);
- bool isSynchronous() const;
- SynchronousMode getSynchronous() const;
-
- /**
- * Suspend the session, which can be resumed on a different connection.
- * @see Connection::resume()
- */
- void suspend();
-
- /** Close the session */
- void close();
-
- Execution& getExecution();
- void sync();
- void flush();
- void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
- void sendCompletion();
-
- typedef framing::TransferContent DefaultContent;
-
- protected:
- shared_ptr<SessionImpl> impl;
- framing::ProtocolVersion version;
- friend class Connection;
- SessionBase(shared_ptr<SessionImpl>);
-};
-
-}} // namespace qpid::client
-
-#endif /*!QPID_CLIENT_SESSIONBASE_H*/
diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp
new file mode 100644
index 0000000000..974acbfcf6
--- /dev/null
+++ b/cpp/src/qpid/client/SessionBase_0_10.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionBase_0_10.h"
+#include "qpid/framing/all_method_bodies.h"
+
+namespace qpid {
+namespace client {
+using namespace framing;
+
+SessionBase_0_10::SessionBase_0_10() {}
+SessionBase_0_10::~SessionBase_0_10() {}
+
+void SessionBase_0_10::close() { impl->close(); }
+
+Execution& SessionBase_0_10::getExecution()
+{
+ return *impl;
+}
+
+void SessionBase_0_10::flush()
+{
+ impl->sendFlush();
+}
+
+void SessionBase_0_10::sync()
+{
+ ExecutionSyncBody b;
+ b.setSync(true);
+ impl->send(b).wait(*impl);
+}
+
+void SessionBase_0_10::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer)
+{
+ impl->markCompleted(id, cumulative, notifyPeer);
+}
+
+void SessionBase_0_10::sendCompletion()
+{
+ impl->sendCompletion();
+}
+
+SessionId SessionBase_0_10::getId() const { return impl->getId(); }
+framing::FrameSet::shared_ptr SessionBase_0_10::get() { return impl->get(); }
+
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionBase_0_10.h b/cpp/src/qpid/client/SessionBase_0_10.h
new file mode 100644
index 0000000000..f9ced049a9
--- /dev/null
+++ b/cpp/src/qpid/client/SessionBase_0_10.h
@@ -0,0 +1,104 @@
+#ifndef QPID_CLIENT_SESSIONBASE_H
+#define QPID_CLIENT_SESSIONBASE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/SessionId.h"
+#include "qpid/framing/amqp_structs.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Execution.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/TypedResult.h"
+#include "qpid/shared_ptr.h"
+#include <string>
+
+namespace qpid {
+namespace client {
+
+using std::string;
+using framing::Content;
+using framing::FieldTable;
+using framing::MethodContent;
+using framing::SequenceNumber;
+using framing::SequenceSet;
+using framing::SequenceNumberSet;
+using qpid::SessionId;
+using framing::Xid;
+
+enum CreditUnit { MESSAGE=0, BYTE=1 };
+
+/**
+ * Base class for handles to an AMQP session.
+ *
+ * Subclasses provide the AMQP commands for a given
+ * version of the protocol.
+ */
+class SessionBase_0_10 {
+ public:
+
+ typedef framing::TransferContent DefaultContent;
+
+ SessionBase_0_10();
+ ~SessionBase_0_10();
+
+ /** Get the next message frame-set from the session. */
+ framing::FrameSet::shared_ptr get();
+
+ /** Get the session ID */
+ SessionId getId() const;
+
+ /** Close the session.
+ * A session is automatically closed when all handles to it are destroyed.
+ */
+ void close();
+
+ /** Synchronize the session: sync() waits until all commands
+ * issued on this session have been completed. It is equivalent to
+ * calling Session::executionSync()
+ *
+ * Note sync() is always synchronous, even on an AsyncSession object
+ * because that's almost always what you want. You can call
+ * AsyncSession::executionSync() directly in the unusual event
+ * that you want to do an asynchronous sync.
+ */
+ void sync();
+
+ /** Set the timeout for this session. */
+ uint32_t timeout(uint32_t seconds);
+
+ Execution& getExecution();
+ void flush();
+ void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+ void sendCompletion();
+
+ protected:
+ boost::shared_ptr<SessionImpl> impl;
+ friend class SessionBase_0_10Access;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SESSIONBASE_H*/
diff --git a/cpp/src/qpid/client/SessionBase_0_10Access.h b/cpp/src/qpid/client/SessionBase_0_10Access.h
new file mode 100644
index 0000000000..e2189a53dd
--- /dev/null
+++ b/cpp/src/qpid/client/SessionBase_0_10Access.h
@@ -0,0 +1,42 @@
+#ifndef QPID_CLIENT_SESSIONBASEACCESS_H
+#define QPID_CLIENT_SESSIONBASEACCESS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/SessionBase_0_10.h"
+
+/**@file @internal Internal use only */
+
+namespace qpid {
+namespace client {
+
+class SessionBase_0_10Access {
+ public:
+ SessionBase_0_10Access(SessionBase_0_10& sb_) : sb(sb_) {}
+ void set(const boost::shared_ptr<SessionImpl>& si) { sb.impl = si; }
+ boost::shared_ptr<SessionImpl> get() { return sb.impl; }
+ private:
+ SessionBase_0_10& sb;
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SESSIONBASEACCESS_H*/
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 18b573b464..58f4bc0aa7 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -48,18 +48,16 @@ typedef sys::Monitor::ScopedUnlock UnLock;
typedef sys::ScopedLock<sys::Semaphore> Acquire;
-SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
+SessionImpl::SessionImpl(const std::string& name,
+ shared_ptr<ConnectionImpl> conn,
uint16_t ch, uint64_t _maxFrameSize)
: error(OK),
code(NORMAL),
text(EMPTY),
state(INACTIVE),
- syncMode(false),
detachedLifetime(0),
maxFrameSize(_maxFrameSize),
- id(true),//generate unique uuid for each session
- name(id.str()),
- //TODO: may want to allow application defined names instead
+ id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name),
connection(conn),
ioHandler(*this),
channel(ch),
@@ -81,15 +79,6 @@ SessionImpl::~SessionImpl() {
connection->erase(channel);
}
-void SessionImpl::setSync(bool s) // user thread
-{
- syncMode = s; //syncMode is volatile
-}
-
-bool SessionImpl::isSync() // user thread
-{
- return syncMode; //syncMode is volatile
-}
FrameSet::shared_ptr SessionImpl::get() // user thread
{
@@ -97,7 +86,7 @@ FrameSet::shared_ptr SessionImpl::get() // user thread
return demux.getDefault()->pop();
}
-const Uuid SessionImpl::getId() const //user thread
+const SessionId SessionImpl::getId() const //user thread
{
return id; //id is immutable
}
@@ -107,7 +96,7 @@ void SessionImpl::open(uint32_t timeout) // user thread
Lock l(state);
if (state == INACTIVE) {
setState(ATTACHING);
- proxy.attach(name, false);
+ proxy.attach(id.getName(), false);
waitFor(ATTACHED);
//TODO: timeout will not be set locally until get response to
//confirm, should we wait for that?
@@ -144,7 +133,7 @@ void SessionImpl::suspend() //user thread
void SessionImpl::detach() //call with lock held
{
if (state == ATTACHED) {
- proxy.detach(name);
+ proxy.detach(id.getName());
setState(DETACHING);
}
}
@@ -285,15 +274,11 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con
{
Acquire a(sendLock);
SequenceNumber id = nextOut++;
- bool sync;
{
Lock l(state);
checkOpen();
incompleteOut.add(id);
- sync = syncMode;
}
-
- if (sync) command.getMethod()->setSync(true);
Future f(id);
if (command.getMethod()->resultExpected()) {
Lock l(state);
@@ -308,9 +293,6 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con
if (content) {
sendContent(*content);
}
- if (sync) {
- waitForCompletion(id);
- }
return f;
}
void SessionImpl::sendContent(const MethodContent& content)
@@ -441,27 +423,27 @@ void SessionImpl::attach(const std::string& /*name*/, bool /*force*/)
void SessionImpl::attached(const std::string& _name)
{
Lock l(state);
- if (name != _name) throw InternalErrorException("Incorrect session name");
+ if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
setState(ATTACHED);
}
void SessionImpl::detach(const std::string& _name)
{
Lock l(state);
- if (name != _name) throw InternalErrorException("Incorrect session name");
+ if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
setState(DETACHED);
- QPID_LOG(info, "Session detached by peer: " << name);
+ QPID_LOG(info, "Session detached by peer: " << id);
}
void SessionImpl::detached(const std::string& _name, uint8_t _code)
{
Lock l(state);
- if (name != _name) throw InternalErrorException("Incorrect session name");
+ if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
setState(DETACHED);
if (_code) {
//TODO: make sure this works with execution.exception - don't
//want to overwrite the code from that
- QPID_LOG(error, "Session detached by peer: " << name << " " << code);
+ QPID_LOG(error, "Session detached by peer: " << id << " " << code);
error = SESSION_DETACH;
code = _code;
text = "Session detached by peer";
@@ -561,8 +543,6 @@ void SessionImpl::gap(const framing::SequenceSet& /*commands*/)
throw NotImplementedException("gap not yet supported");
}
-
-
void SessionImpl::sync() {}
void SessionImpl::result(const framing::SequenceNumber& commandId, const std::string& value)
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 0bcec4dd0c..7bb7136912 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -26,6 +26,7 @@
#include "Execution.h"
#include "Results.h"
+#include "qpid/SessionId.h"
#include "qpid/shared_ptr.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ChannelHandler.h"
@@ -59,14 +60,14 @@ class SessionImpl : public framing::FrameHandler::InOutHandler,
private framing::AMQP_ClientOperations::ExecutionHandler
{
public:
- SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
+ SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
~SessionImpl();
//NOTE: Public functions called in user thread.
framing::FrameSet::shared_ptr get();
- const framing::Uuid getId() const;
+ const SessionId getId() const;
uint16_t getChannel() const;
void setChannel(uint16_t channel);
@@ -76,8 +77,6 @@ public:
void resume(shared_ptr<ConnectionImpl>);
void suspend();
- void setSync(bool s);
- bool isSync();
void assertOpen() const;
Future send(const framing::AMQBody& command);
@@ -131,7 +130,8 @@ private:
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
void sendContent(const framing::MethodContent&);
void waitForCompletionImpl(const framing::SequenceNumber& id);
-
+ void requestTimeout(uint32_t timeout);
+
void sendCompletionImpl();
// Note: Following methods are called by network thread in
@@ -140,7 +140,6 @@ private:
void attached(const std::string& name);
void detach(const std::string& name);
void detached(const std::string& name, uint8_t detachCode);
- void requestTimeout(uint32_t timeout);
void timeout(uint32_t timeout);
void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
@@ -167,11 +166,9 @@ private:
std::string text; // Error text
mutable StateMonitor state;
mutable sys::Semaphore sendLock;
- volatile bool syncMode;
uint32_t detachedLifetime;
const uint64_t maxFrameSize;
- const framing::Uuid id;
- const std::string name;
+ const SessionId id;
shared_ptr<ConnectionImpl> connection;
framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 2ba3f5fe62..6036f153f6 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -32,23 +32,23 @@
namespace qpid {
namespace client {
-SubscriptionManager::SubscriptionManager(Session& s)
+SubscriptionManager::SubscriptionManager(const Session& s)
: dispatcher(s), session(s),
messages(UNLIMITED), bytes(UNLIMITED), window(true),
acceptMode(0), acquireMode(0),
autoStop(true)
{}
-Completion SubscriptionManager::subscribeInternal(
+void SubscriptionManager::subscribeInternal(
const std::string& q, const std::string& dest)
{
- Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest,
- arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
- setFlowControl(dest, messages, bytes, window);
- return c;
+ async(session).messageSubscribe( // setFlowControl will sync.
+ arg::queue=q, arg::destination=dest,
+ arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
+ setFlowControl(dest, messages, bytes, window);
}
-Completion SubscriptionManager::subscribe(
+void SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& d)
{
std::string dest=d.empty() ? q:d;
@@ -56,7 +56,7 @@ Completion SubscriptionManager::subscribe(
return subscribeInternal(q, dest);
}
-Completion SubscriptionManager::subscribe(
+void SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const std::string& d)
{
std::string dest=d.empty() ? q:d;
@@ -68,9 +68,9 @@ Completion SubscriptionManager::subscribe(
void SubscriptionManager::setFlowControl(
const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
{
- session.messageSetFlowMode(dest, window);
- session.messageFlow(dest, 0, messages);
- session.messageFlow(dest, 1, bytes);
+ async(session).messageSetFlowMode(dest, window);
+ async(session).messageFlow(dest, 0, messages);
+ session.messageFlow(dest, 1, bytes); // Only need one sync
}
void SubscriptionManager::setFlowControl(
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 4ccb95c968..4ff962f67b 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -45,10 +45,10 @@ class SubscriptionManager : public sys::Runnable
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
- Completion subscribeInternal(const std::string& q, const std::string& dest);
+ void subscribeInternal(const std::string& q, const std::string& dest);
qpid::client::Dispatcher dispatcher;
- qpid::client::Session& session;
+ qpid::client::Session session;
uint32_t messages;
uint32_t bytes;
bool window;
@@ -58,7 +58,7 @@ class SubscriptionManager : public sys::Runnable
bool autoStop;
public:
- SubscriptionManager(Session& session);
+ SubscriptionManager(const Session& session);
/**
* Subscribe a MessagesListener to receive messages from queue.
@@ -68,7 +68,7 @@ class SubscriptionManager : public sys::Runnable
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
- Completion subscribe(MessageListener& listener,
+ void subscribe(MessageListener& listener,
const std::string& queue,
const std::string& tag=std::string());
@@ -79,7 +79,7 @@ class SubscriptionManager : public sys::Runnable
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
- Completion subscribe(LocalQueue& localQueue,
+ void subscribe(LocalQueue& localQueue,
const std::string& queue,
const std::string& tag=std::string());
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index 83b3f621c7..31f63d71a0 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -92,7 +92,7 @@ struct SessionFixtureT : BrokerFixture {
qpid::client::LocalQueue lq;
SessionFixtureT() : connection(broker->getPort()),
- session(connection.newSession(qpid::client::ASYNC)),
+ session(connection.newSession("SessionFixture")),
subs(session)
{}
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 801e33d412..1dade47ee9 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -106,19 +106,19 @@ struct ClientSessionFixture : public ProxySessionFixture
QPID_AUTO_TEST_CASE(testQueueQuery) {
ClientSessionFixture fix;
- fix.session = fix.connection.newSession(ASYNC);
+ fix.session = fix.connection.newSession();
fix.session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
- TypedResult<QueueQueryResult> result = fix.session.queueQuery(string("my-queue"));
- BOOST_CHECK_EQUAL(false, result.get().getDurable());
- BOOST_CHECK_EQUAL(true, result.get().getExclusive());
+ QueueQueryResult result = fix.session.queueQuery(string("my-queue"));
+ BOOST_CHECK_EQUAL(false, result.getDurable());
+ BOOST_CHECK_EQUAL(true, result.getExclusive());
BOOST_CHECK_EQUAL(string("amq.fanout"),
- result.get().getAlternateExchange());
+ result.getAlternateExchange());
}
QPID_AUTO_TEST_CASE(testTransfer)
{
ClientSessionFixture fix;
- fix.session=fix.connection.newSession(ASYNC);
+ fix.session=fix.connection.newSession();
fix.declareSubscribe();
fix.session.messageTransfer(acceptMode=1, content=TransferContent("my-message", "my-queue"));
//get & test the message:
@@ -133,7 +133,7 @@ QPID_AUTO_TEST_CASE(testTransfer)
QPID_AUTO_TEST_CASE(testDispatcher)
{
ClientSessionFixture fix;
- fix.session =fix.connection.newSession(ASYNC);
+ fix.session =fix.connection.newSession();
fix.declareSubscribe();
size_t count = 100;
for (size_t i = 0; i < count; ++i)
@@ -148,7 +148,7 @@ QPID_AUTO_TEST_CASE(testDispatcher)
QPID_AUTO_TEST_CASE(testDispatcherThread)
{
ClientSessionFixture fix;
- fix.session =fix.connection.newSession(ASYNC);
+ fix.session =fix.connection.newSession();
fix.declareSubscribe();
size_t count = 10;
DummyListener listener(fix.session, "my-dest", count);
@@ -162,40 +162,42 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
BOOST_CHECK_EQUAL(lexical_cast<string>(i), listener.messages[i].getData());
}
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1)
-{
- ClientSessionFixture fix;
- fix.session =fix.connection.newSession(ASYNC, 0);
- fix.session.suspend(); // session has 0 timeout.
- try {
- fix.connection.resume(fix.session);
- BOOST_FAIL("Expected InvalidArgumentException.");
- } catch(const InternalErrorException&) {}
-}
-
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1)
-{
- ClientSessionFixture fix;
- fix.session =fix.connection.newSession(ASYNC, 60);
- fix.session.suspend();
- try {
- fix.session.exchangeQuery(name="amq.fanout");
- BOOST_FAIL("Expected session suspended exception");
- } catch(const CommandInvalidException&) {}
-}
+// FIXME aconway 2008-05-26: Re-enable with final resume implementation.
+//
+// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1)
+// {
+// ClientSessionFixture fix;
+// fix.session.suspend(); // session has 0 timeout.
+// try {
+// fix.connection.resume(fix.session);
+// BOOST_FAIL("Expected InvalidArgumentException.");
+// } catch(const InternalErrorException&) {}
+// }
+
+// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1)
+// {
+// ClientSessionFixture fix;
+// fix.session =fix.session.timeout(60);
+// fix.session.suspend();
+// try {
+// fix.session.exchangeQuery(name="amq.fanout");
+// BOOST_FAIL("Expected session suspended exception");
+// } catch(const CommandInvalidException&) {}
+// }
+
+// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
+// {
+// ClientSessionFixture fix;
+// fix.session.timeout(60);
+// fix.declareSubscribe();
+// fix.session.suspend();
+// // Make sure we are still subscribed after resume.
+// fix.connection.resume(fix.session);
+// fix.session.messageTransfer(content=TransferContent("my-message", "my-queue"));
+// FrameSet::shared_ptr msg = fix.session.get();
+// BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
+// }
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
-{
- ClientSessionFixture fix;
- fix.session =fix.connection.newSession(ASYNC, 60);
- fix.declareSubscribe();
- fix.session.suspend();
- // Make sure we are still subscribed after resume.
- fix.connection.resume(fix.session);
- fix.session.messageTransfer(content=TransferContent("my-message", "my-queue"));
- FrameSet::shared_ptr msg = fix.session.get();
- BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
-}
QPID_AUTO_TEST_CASE(testSendToSelf) {
ClientSessionFixture fix;
diff --git a/cpp/src/tests/XmlClientSessionTest.cpp b/cpp/src/tests/XmlClientSessionTest.cpp
index dc6cce24fc..9b0c035f37 100644
--- a/cpp/src/tests/XmlClientSessionTest.cpp
+++ b/cpp/src/tests/XmlClientSessionTest.cpp
@@ -121,7 +121,7 @@ struct ClientSessionFixture : public ProxySessionFixture
QPID_AUTO_TEST_CASE(testXmlBinding) {
ClientSessionFixture f;
- Session session = f.connection.newSession(ASYNC);
+ Session session = f.connection.newSession();
SubscriptionManager subscriptions(session);
SubscribedLocalQueue localQueue(subscriptions);
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp
index 20e8b21a3a..04269b299d 100644
--- a/cpp/src/tests/client_test.cpp
+++ b/cpp/src/tests/client_test.cpp
@@ -92,7 +92,7 @@ int main(int argc, char** argv)
//Create and open a session on the connection through which
//most functionality is exposed:
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
if (opts.verbose) std::cout << "Opened session." << std::endl;
diff --git a/cpp/src/tests/consume.cpp b/cpp/src/tests/consume.cpp
index 6d2f0a7413..43e08a80b7 100644
--- a/cpp/src/tests/consume.cpp
+++ b/cpp/src/tests/consume.cpp
@@ -62,7 +62,7 @@ struct Client
Client()
{
opts.open(connection);
- session = connection.newSession(ASYNC);
+ session = connection.newSession();
}
void consume()
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index f75269c959..a656e0cf1a 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -96,7 +96,6 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) {
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
ProxySessionFixture fix;
- fix.session.setSynchronous(true);
BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException);
}
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index 0b343d0243..f4cbade36b 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -30,7 +30,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
using namespace qpid;
@@ -99,7 +99,7 @@ class Client : public Runnable
{
protected:
Connection connection;
- Session session;
+ AsyncSession session;
Thread thread;
string queue;
@@ -157,7 +157,7 @@ public:
Client::Client(const string& q) : queue(q)
{
opts.open(connection);
- session = connection.newSession(ASYNC);
+ session = connection.newSession();
}
void Client::start()
@@ -262,7 +262,7 @@ void Sender::sendByCount()
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
//msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
- session.messageTransfer(arg::content=msg, arg::acceptMode=1);
+ async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
}
session.sync();
}
@@ -283,7 +283,7 @@ void Sender::sendByRate()
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
//msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
- session.messageTransfer(arg::content=msg, arg::acceptMode=1);
+ async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
}
uint64_t timeTaken = (current_time() - start) / TIME_USEC;
if (timeTaken < 1000) {
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index 2a8a9ec17c..91ecd83f50 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -21,7 +21,7 @@
#include "TestOptions.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Completion.h"
@@ -194,12 +194,12 @@ Opts opts;
struct Client : public Runnable {
Connection connection;
- Session session;
+ AsyncSession session;
Thread thread;
Client() {
opts.open(connection);
- session = connection.newSession(ASYNC);
+ session = connection.newSession();
}
~Client() {
@@ -431,7 +431,7 @@ struct PublishThread : public Client {
offset = 5;
data += "data:";//marker (requested for latency testing tool scripts)
data += string(sizeof(size_t), 'X');//space for seq no
- data += string(reinterpret_cast<const char*>(session.getId().data()), session.getId().size());
+ data += session.getId().str();
if (opts.size > data.size()) {
data += string(opts.size - data.size(), 'X');
} else if(opts.size < data.size()) {
diff --git a/cpp/src/tests/publish.cpp b/cpp/src/tests/publish.cpp
index 17e3d4e104..b78f3fdf6d 100644
--- a/cpp/src/tests/publish.cpp
+++ b/cpp/src/tests/publish.cpp
@@ -28,7 +28,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
using namespace qpid;
@@ -61,12 +61,12 @@ Args opts;
struct Client
{
Connection connection;
- Session session;
+ AsyncSession session;
Client()
{
opts.open(connection);
- session = connection.newSession(ASYNC);
+ session = connection.newSession();
}
std::string id(uint i)
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index 8f0e290070..6daf928401 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -53,7 +53,7 @@ using namespace std;
* defined.
*/
class Listener : public MessageListener{
- Session& session;
+ Session session;
SubscriptionManager& mgr;
const string responseQueue;
const bool transactional;
@@ -64,7 +64,7 @@ class Listener : public MessageListener{
void shutdown();
void report();
public:
- Listener(Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
+ Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
virtual void received(Message& msg);
};
@@ -101,7 +101,7 @@ int main(int argc, char** argv){
else {
Connection connection;
args.open(connection);
- Session session = connection.newSession(ASYNC);
+ AsyncSession session = connection.newSession();
if (args.transactional) {
session.txSelect();
}
@@ -127,7 +127,8 @@ int main(int argc, char** argv){
mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
}
mgr.subscribe(listener, control);
-
+ session.sync();
+
cout << "topic_listener: listening..." << endl;
mgr.run();
if (args.durable) {
@@ -144,7 +145,7 @@ int main(int argc, char** argv){
return 1;
}
-Listener::Listener(Session& s, SubscriptionManager& m, const string& _responseq, bool tx) :
+Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) :
session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){}
void Listener::received(Message& message){
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index a6a7b4d80d..c8f0d543ec 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -37,7 +37,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/MessageListener.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Monitor.h"
#include <unistd.h>
@@ -56,7 +56,7 @@ using namespace std;
* back by the subscribers.
*/
class Publisher {
- Session& session;
+ AsyncSession session;
SubscriptionManager mgr;
LocalQueue queue;
const string controlTopic;
@@ -66,7 +66,7 @@ class Publisher {
string generateData(int size);
public:
- Publisher(Session& session, const string& controlTopic, bool tx, bool durable);
+ Publisher(const AsyncSession& session, const string& controlTopic, bool tx, bool durable);
int64_t publish(int msgs, int listeners, int size);
void terminate();
};
@@ -107,7 +107,7 @@ int main(int argc, char** argv) {
else {
Connection connection;
args.open(connection);
- Session session = connection.newSession(ASYNC);
+ AsyncSession session = connection.newSession();
if (args.transactional) {
session.txSelect();
}
@@ -150,7 +150,7 @@ int main(int argc, char** argv) {
return 1;
}
-Publisher::Publisher(Session& _session, const string& _controlTopic, bool tx, bool d) :
+Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) :
session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d)
{
mgr.subscribe(queue, "response");
diff --git a/cpp/src/tests/topictest b/cpp/src/tests/topictest
index c36aa319ba..ad7c5df693 100755
--- a/cpp/src/tests/topictest
+++ b/cpp/src/tests/topictest
@@ -36,5 +36,5 @@ for ((i=$SUBSCRIBERS ; i--; )); do
subscribe $i &
done
# FIXME aconway 2007-03-27: Hack around startup race. Fix topic test.
-sleep 1
+sleep 2
publish 2>&1 || exit 1
diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp
index a8369df759..6eb812738d 100644
--- a/cpp/src/tests/txtest.cpp
+++ b/cpp/src/tests/txtest.cpp
@@ -28,7 +28,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
using namespace qpid;
@@ -96,12 +96,12 @@ Args opts;
struct Client
{
Connection connection;
- Session session;
+ AsyncSession session;
Client()
{
opts.open(connection);
- session = connection.newSession(ASYNC);
+ session = connection.newSession();
}
~Client()