diff options
author | Alan Conway <aconway@apache.org> | 2008-05-26 18:10:05 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-05-26 18:10:05 +0000 |
commit | ce7678789fe3e8c5caebb59a26aa418fbb95e5d3 (patch) | |
tree | affd8e2de460cba285e7c25e15f5c3d94444f905 | |
parent | 0b56077cbb8b6e9cdd982cbdeaa3ec6fe1bd5434 (diff) | |
download | qpid-python-ce7678789fe3e8c5caebb59a26aa418fbb95e5d3.tar.gz |
Changes to Session API:
- Session is synchronous, no futures.
- AsyncSession is async, returns futures.
- Conversion functions sync(s) async(s) return a sync/async view of session s.
- Connection::newSession - takes name, no timeout
- SessionBase::getId - returns SessionId not UUID.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@660258 13f79535-47bb-0310-9956-ffa450edef68
44 files changed, 537 insertions, 478 deletions
diff --git a/cpp/examples/examples/direct/declare_queues.cpp b/cpp/examples/examples/direct/declare_queues.cpp index 07c9c63ff6..0cdb472665 100644 --- a/cpp/examples/examples/direct/declare_queues.cpp +++ b/cpp/examples/examples/direct/declare_queues.cpp @@ -60,7 +60,7 @@ int main(int argc, char** argv) { try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- diff --git a/cpp/examples/examples/direct/direct_producer.cpp b/cpp/examples/examples/direct/direct_producer.cpp index 9b40f733c2..348b82efae 100644 --- a/cpp/examples/examples/direct/direct_producer.cpp +++ b/cpp/examples/examples/direct/direct_producer.cpp @@ -67,7 +67,7 @@ int main(int argc, char** argv) { Message message; try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- @@ -85,7 +85,9 @@ int main(int argc, char** argv) { message_data << "Message " << i; message.setData(message_data.str()); - session.messageTransfer(arg::content=message, arg::destination="amq.direct"); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=message, arg::destination="amq.direct"); } // And send a final message to indicate termination. diff --git a/cpp/examples/examples/direct/listener.cpp b/cpp/examples/examples/direct/listener.cpp index 7ee68ebf35..fc2fa96ead 100644 --- a/cpp/examples/examples/direct/listener.cpp +++ b/cpp/examples/examples/direct/listener.cpp @@ -65,7 +65,7 @@ int main(int argc, char** argv) { Message msg; try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- diff --git a/cpp/examples/examples/fanout/fanout_producer.cpp b/cpp/examples/examples/fanout/fanout_producer.cpp index 8ae6bbc242..9a80c86c20 100644 --- a/cpp/examples/examples/fanout/fanout_producer.cpp +++ b/cpp/examples/examples/fanout/fanout_producer.cpp @@ -67,7 +67,7 @@ int main(int argc, char** argv) { Message message; try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- @@ -83,7 +83,9 @@ int main(int argc, char** argv) { message_data << "Message " << i; message.setData(message_data.str()); - session.messageTransfer(arg::content=message, arg::destination="amq.fanout"); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=message, arg::destination="amq.fanout"); } // And send a final message to indicate termination. diff --git a/cpp/examples/examples/fanout/listener.cpp b/cpp/examples/examples/fanout/listener.cpp index c3123bb944..79a115f99f 100644 --- a/cpp/examples/examples/fanout/listener.cpp +++ b/cpp/examples/examples/fanout/listener.cpp @@ -65,7 +65,7 @@ int main(int argc, char** argv) { Message msg; try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- @@ -90,9 +90,6 @@ int main(int argc, char** argv) { Listener listener(subscriptions); subscriptions.subscribe(listener, myQueue); - // Wait for the broker to indicate that our queues have been created. - session.sync(); - // Deliver messages until the subscription is cancelled // by Listener::received() std::cout << "Listening" << std::endl; diff --git a/cpp/examples/examples/pub-sub/topic_listener.cpp b/cpp/examples/examples/pub-sub/topic_listener.cpp index 883d6eba42..4d854e57ff 100644 --- a/cpp/examples/examples/pub-sub/topic_listener.cpp +++ b/cpp/examples/examples/pub-sub/topic_listener.cpp @@ -91,7 +91,7 @@ void Listener::prepareQueue(std::string queue, std::string routing_key) { * the queue name parameter with the Session ID. */ - queue += session.getId().str(); + queue += session.getId().getName(); std::cout << "Declaring queue: " << queue << std::endl; /* Declare an exclusive queue on the broker @@ -138,7 +138,7 @@ int main(int argc, char** argv) { Connection connection; try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- @@ -153,9 +153,6 @@ int main(int argc, char** argv) { listener.prepareQueue("news", "#.news"); listener.prepareQueue("weather", "#.weather"); - // Wait for the broker to indicate that our queues have been created. - session.sync(); - std::cout << "Listening for messages ..." << std::endl; // Give up control and receive messages diff --git a/cpp/examples/examples/pub-sub/topic_publisher.cpp b/cpp/examples/examples/pub-sub/topic_publisher.cpp index 94cd3a0f56..edf6ff28f7 100644 --- a/cpp/examples/examples/pub-sub/topic_publisher.cpp +++ b/cpp/examples/examples/pub-sub/topic_publisher.cpp @@ -72,7 +72,9 @@ void publish_messages(Session& session, string routing_key) message_data << "Message " << i; message.setData(message_data.str()); - session.messageTransfer(arg::content=message, arg::destination="amq.topic"); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=message, arg::destination="amq.topic"); } } @@ -101,7 +103,7 @@ int main(int argc, char** argv) { Message message; try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- diff --git a/cpp/examples/examples/request-response/client.cpp b/cpp/examples/examples/request-response/client.cpp index 073af596bf..eb4c00c2f1 100644 --- a/cpp/examples/examples/request-response/client.cpp +++ b/cpp/examples/examples/request-response/client.cpp @@ -114,7 +114,7 @@ int main(int argc, char** argv) { Message request; try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- @@ -157,7 +157,9 @@ int main(int argc, char** argv) { for (int i=0; i<4; i++) { request.setData(s[i]); - session.messageTransfer(arg::content=request, arg::destination="amq.direct"); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=request, arg::destination="amq.direct"); std::cout << "Request: " << s[i] << std::endl; } diff --git a/cpp/examples/examples/request-response/server.cpp b/cpp/examples/examples/request-response/server.cpp index 6c9bc7ffa6..1946facd0e 100644 --- a/cpp/examples/examples/request-response/server.cpp +++ b/cpp/examples/examples/request-response/server.cpp @@ -120,7 +120,10 @@ void Listener::received(Message& request) { // Send it back to the user response.getDeliveryProperties().setRoutingKey(routingKey); - session.messageTransfer(arg::content=response, arg::destination="amq.direct"); + + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=response, arg::destination="amq.direct"); } @@ -131,7 +134,7 @@ int main(int argc, char** argv) { Message message; try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- diff --git a/cpp/examples/examples/xml-exchange/declare_queues.cpp b/cpp/examples/examples/xml-exchange/declare_queues.cpp index b89f95cd97..1307c473c5 100644 --- a/cpp/examples/examples/xml-exchange/declare_queues.cpp +++ b/cpp/examples/examples/xml-exchange/declare_queues.cpp @@ -39,7 +39,7 @@ int main(int argc, char** argv) { try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- diff --git a/cpp/examples/examples/xml-exchange/listener.cpp b/cpp/examples/examples/xml-exchange/listener.cpp index b5e057c4b1..559cfaf8c9 100644 --- a/cpp/examples/examples/xml-exchange/listener.cpp +++ b/cpp/examples/examples/xml-exchange/listener.cpp @@ -65,7 +65,7 @@ int main(int argc, char** argv) { Message msg; try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- diff --git a/cpp/examples/examples/xml-exchange/xml_producer.cpp b/cpp/examples/examples/xml-exchange/xml_producer.cpp index 9333e20438..42e9ec4356 100644 --- a/cpp/examples/examples/xml-exchange/xml_producer.cpp +++ b/cpp/examples/examples/xml-exchange/xml_producer.cpp @@ -44,7 +44,7 @@ int main(int argc, char** argv) { Message message; try { connection.open(host, port); - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); //--------- Main body of program -------------------------------------------- @@ -66,7 +66,9 @@ int main(int argc, char** argv) { std::cout << "Message data: " << message_data.str() << std::endl; message.setData(message_data.str()); - session.messageTransfer(arg::content=message, arg::destination="xml"); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=message, arg::destination="xml"); } // And send a final message to indicate termination. diff --git a/cpp/rubygen/framing.0-10/Session.rb b/cpp/rubygen/framing.0-10/Session.rb index a2f06bfcd7..d679b1c0bc 100644 --- a/cpp/rubygen/framing.0-10/Session.rb +++ b/cpp/rubygen/framing.0-10/Session.rb @@ -25,6 +25,33 @@ class CppGen end end +# Sync vs. async APIs +module SyncAsync + def sync_prefix() @async ? "Async" : "" end + def sync_adjective() @async ? "asynchronous" : "synchronous" end + def sync_convert() @async ? "async" : "sync" end + + + def decl_ctor_opeq() + genl + genl "#{@classname}();" + genl "#{@classname}(const #{@version_base}& other);" + genl "#{@classname}& operator=(const #{@version_base}& other);" + end + + def defn_ctor_opeq(inline="") + genl + genl "#{inline} #{@classname}::#{@classname}() {}" + scope("#{inline} #{@classname}::#{@classname}(const #{@version_base}& other) {") { + genl "*this = other;" + } + scope("#{inline} #{@classname}& #{@classname}::operator=(const #{@version_base}& other) {") { + genl "impl = static_cast<const #{@classname}&>(other).impl;" + genl "return *this;" + } + end +end + class ContentField # For extra content parameters def cppname() "content" end def signature() "const MethodContent& content" end @@ -49,55 +76,52 @@ class AmqpMethod fields_c.map { |f| "arg::keyword_tags::"+f.cppname }.join(',') + ">" end - def return_type() - return "TypedResult<qpid::framing::#{result.cpptype.ret_by_val}>" if (result) - return "Response" if (not responses().empty?) - return "Completion" + + def return_type(async) + if (async) + return "TypedResult<qpid::framing::#{result.cpptype.ret_by_val}>" if (result) + return "Completion" + else + return "qpid::framing::#{result.cpptype.ret_by_val}" if (result) + return "void" + end end + def session_function() "#{parent.name.lcaps}#{name.caps}"; end end class SessionNoKeywordGen < CppGen - - def initialize(outdir, amqp) + include SyncAsync + + def initialize(outdir, amqp, async) super(outdir, amqp) + @async=async @chassis="server" @namespace,@classname,@file= - parse_classname "qpid::client::no_keyword::Session_#{@amqp.version.bars}" + parse_classname "qpid::client::no_keyword::#{sync_prefix}Session_#{@amqp.version.bars}" + @version_base="SessionBase_#{@amqp.major}_#{@amqp.minor}" end def generate() h_file(@file) { - include "qpid/client/SessionBase.h" - - namespace("qpid::client") { - genl "using std::string;" - genl "using framing::Content;" - genl "using framing::FieldTable;" - genl "using framing::MethodContent;" - genl "using framing::SequenceNumber;" - genl "using framing::SequenceSet;" - genl "using framing::Uuid;" - #the following are nasty... would be better to dynamically - #include such statements based on params required - genl "using framing::Xid;" - genl - namespace("no_keyword") { - doxygen_comment { - genl "AMQP #{@amqp.version} session API." - genl @amqp.class_("session").doc + include "qpid/client/#{@version_base}.h" + namespace(@namespace) { + doxygen_comment { + genl "AMQP #{@amqp.version} #{sync_adjective} session API." + genl @amqp.class_("session").doc + # FIXME aconway 2008-05-23: additional doc on sync/async use. + } + cpp_class(@classname, "public #{@version_base}") { + public + decl_ctor_opeq() + session_methods.each { |m| + genl + doxygen(m) + args=m.sig_c_default.join(", ") + genl "#{m.return_type(@async)} #{m.session_function}(#{args});" } - cpp_class(@classname, "public SessionBase") { - public - genl "Session_#{@amqp.version.bars}() {}" - genl "Session_#{@amqp.version.bars}(shared_ptr<SessionImpl> core) : SessionBase(core) {}" - session_methods.each { |m| - genl - doxygen(m) - args=m.sig_c_default.join(", ") - genl "#{m.return_type} #{m.session_function}(#{args});" - } - }}}} + } + }} cpp_file(@file) { include @classname @@ -108,32 +132,47 @@ class SessionNoKeywordGen < CppGen genl sig=m.signature_c.join(", ") func="#{@classname}::#{m.session_function}" - scope("#{m.return_type} #{func}(#{sig}) {") { - args=(["ProtocolVersion()"]+m.param_names).join(", ") - body="#{m.body_name}(#{args})" - sendargs=body + scope("#{m.return_type(@async)} #{func}(#{sig}) {") { + args=(["ProtocolVersion(#{@amqp.major},#{@amqp.minor})"]+m.param_names).join(", ") + genl "#{m.body_name} body(#{args});"; + genl "body.setSync(#{@async ? 'false':'true'});" + sendargs="body" sendargs << ", content" if m.content - genl "return #{m.return_type}(impl->send(#{sendargs}), impl);" - }}}} + async_retval="#{m.return_type(true)}(impl->send(#{sendargs}), impl)" + if @async then + genl "return #{async_retval};" + else + if m.result + genl "return #{async_retval}.get();" + else + genl "#{async_retval}.wait();" + end + end + }} + defn_ctor_opeq() + }} end end class SessionGen < CppGen + include SyncAsync - def initialize(outdir, amqp) + def initialize(outdir, amqp, async) super(outdir, amqp) + @async=async @chassis="server" - session="Session_#{@amqp.version.bars}" + session="#{sync_prefix}Session_#{@amqp.version.bars}" @base="no_keyword::#{session}" @fqclass=FqClass.new "qpid::client::#{session}" @classname=@fqclass.name @fqbase=FqClass.new("qpid::client::#{@base}") + @version_base="SessionBase_#{@amqp.major}_#{@amqp.minor}" end - def gen_keyword_decl(m, prefix) + def gen_keyword_decl(m) return if m.fields_c.empty? # Inherited function will do. - scope("BOOST_PARAMETER_MEMFUN(#{m.return_type}, #{m.session_function}, 0, #{m.fields_c.size}, #{m.argpack_name}) {") { - scope("return #{prefix}#{m.session_function}(",");") { + scope("BOOST_PARAMETER_MEMFUN(#{m.return_type(@async)}, #{m.session_function}, 0, #{m.fields_c.size}, #{m.argpack_name}) {") { + scope("return #{@base}::#{m.session_function}(",");") { gen m.fields_c.map { |f| f.unpack() }.join(",\n") } } @@ -143,20 +182,21 @@ class SessionGen < CppGen def generate() keyword_methods=session_methods.reject { |m| m.fields_c.empty? } max_arity = keyword_methods.map{ |m| m.fields_c.size }.max + + h_file("qpid/client/arg.h") { + # Generate keyword tag declarations. + genl "#define BOOST_PARAMETER_MAX_ARITY #{max_arity}" + include "<boost/parameter.hpp>" + namespace("qpid::client::arg") { + keyword_methods.map{ |m| m.param_names_c }.flatten.uniq.each { |k| + genl "BOOST_PARAMETER_KEYWORD(keyword_tags, #{k})" + }} + } h_file(@fqclass.file) { include @fqbase.file - genl - genl "#define BOOST_PARAMETER_MAX_ARITY #{max_arity}" - include "<boost/parameter.hpp>" - genl + include "qpid/client/arg" namespace("qpid::client") { - # Generate keyword tag declarations. - namespace("arg") { - keyword_methods.map{ |m| m.param_names_c }.flatten.uniq.each { |k| - genl "BOOST_PARAMETER_KEYWORD(keyword_tags, #{k})" - }} - genl # Doxygen comment. doxygen_comment { genl "AMQP #{@amqp.version} session API with keyword arguments." @@ -183,17 +223,23 @@ EOS } # Session class. cpp_class(@classname,"public #{@base}") { + public + decl_ctor_opeq() private - genl "#{@classname}(shared_ptr<SessionImpl> core) : #{ @base}(core) {}" keyword_methods.each { |m| typedef m.argpack_type, m.argpack_name } genl "friend class Connection;" public - genl "#{@classname}() {}" - keyword_methods.each { |m| gen_keyword_decl(m,@base+"::") } - }}} + keyword_methods.each { |m| gen_keyword_decl(m) } + } + genl "/** Conversion to #{@classname} from another session type */" + genl "inline #{@classname} #{sync_convert}(const #{@version_base}& other) { return #{@clasname}(other); }" + defn_ctor_opeq("inline") + }} end end -SessionNoKeywordGen.new(ARGV[0], $amqp).generate() -SessionGen.new(ARGV[0], $amqp).generate() +SessionNoKeywordGen.new(ARGV[0], $amqp, true).generate() +SessionNoKeywordGen.new(ARGV[0], $amqp, false).generate() +SessionGen.new(ARGV[0], $amqp, true).generate() +SessionGen.new(ARGV[0], $amqp, false).generate() diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 1c8ca9da12..0bd04ec6cb 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -324,7 +324,9 @@ libqpidclient_la_SOURCES = \ qpid/client/MessageListener.cpp \ qpid/client/Queue.cpp \ qpid/client/Results.cpp \ - qpid/client/SessionBase.cpp \ + qpid/client/SessionBase_0_10.cpp \ + qpid/client/SessionBase_0_10.h \ + qpid/client/SessionBase_0_10Access.h \ qpid/client/SessionImpl.cpp \ qpid/client/StateManager.cpp \ qpid/client/SubscriptionManager.cpp @@ -446,8 +448,9 @@ nobase_include_HEADERS = \ qpid/client/MessageQueue.h \ qpid/client/Queue.h \ qpid/client/Results.h \ - qpid/client/SessionBase.h \ + qpid/client/SessionBase_0_10.h \ qpid/client/Session.h \ + qpid/client/AsyncSession.h \ qpid/client/SessionImpl.h \ qpid/client/StateManager.h \ qpid/client/SubscriptionManager.h \ diff --git a/cpp/src/qpid/SessionId.h b/cpp/src/qpid/SessionId.h index 08553e8b1d..291c42a2bb 100644 --- a/cpp/src/qpid/SessionId.h +++ b/cpp/src/qpid/SessionId.h @@ -27,7 +27,17 @@ namespace qpid { -/** Identifier for a session */ +/** Identifier for a session. + * There are two parts to a session identifier: + * + * getUserId() returns the authentication principal associated with + * the session's connection. + * + * getName() returns the session name. + * + * The name must be unique among sessions with the same authentication + * principal. + */ class SessionId : boost::totally_ordered1<SessionId> { std::string userId; std::string name; diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h index 8d62b6f4f2..d00df1ef26 100644 --- a/cpp/src/qpid/client/AckPolicy.h +++ b/cpp/src/qpid/client/AckPolicy.h @@ -22,6 +22,7 @@ */ #include "qpid/framing/SequenceSet.h" +#include "qpid/client/AsyncSession.h" namespace qpid { namespace client { @@ -44,7 +45,7 @@ class AckPolicy */ AckPolicy(size_t n=1) : interval(n), count(n) {} - void ack(const Message& msg, Session& session) { + void ack(const Message& msg, AsyncSession session) { accepted.add(msg.getId()); if (!interval) return; if (--count==0) { @@ -57,7 +58,7 @@ class AckPolicy } } - void ackOutstanding(Session& session) { + void ackOutstanding(AsyncSession session) { if (!accepted.empty()) { session.messageAccept(accepted); accepted.clear(); diff --git a/cpp/src/qpid/client/AsyncSession.h b/cpp/src/qpid/client/AsyncSession.h new file mode 100644 index 0000000000..150aabe191 --- /dev/null +++ b/cpp/src/qpid/client/AsyncSession.h @@ -0,0 +1,38 @@ +#ifndef QPID_CLIENT_ASYNCSESSION_H +#define QPID_CLIENT_ASYNCSESSION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/AsyncSession_0_10.h" + +namespace qpid { +namespace client { + +/** + * AsyncSession is an alias for Session_0_10 + * + * \ingroup clientapi + */ +typedef AsyncSession_0_10 AsyncSession; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_ASYNCSESSION_H*/ diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 82d1eac8b4..bec2b0345d 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -22,6 +22,7 @@ #include "ConnectionSettings.h" #include "Message.h" #include "SessionImpl.h" +#include "SessionBase_0_10Access.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" @@ -72,18 +73,16 @@ void Connection::open(const ConnectionSettings& settings) max_frame_size = impl->getNegotiatedSettings().maxFrameSize; } -Session Connection::newSession(SynchronousMode sync, - uint32_t detachedLifetime) -{ +Session Connection::newSession(const std::string& name) { if (!impl) throw Exception(QPID_MSG("Connection has not yet been opened")); - - shared_ptr<SessionImpl> core( - new SessionImpl(impl, ++channelIdCounter, max_frame_size)); - core->setSync(sync); - impl->addSession(core); - core->open(detachedLifetime); - return Session(core); + shared_ptr<SessionImpl> simpl( + new SessionImpl(name, impl, ++channelIdCounter, max_frame_size)); + impl->addSession(simpl); + simpl->open(0); + Session s; + SessionBase_0_10Access(s).set(simpl); + return s; } void Connection::resume(Session& session) { diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 0c01c77509..5337a20bfa 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -26,7 +26,6 @@ #include "ConnectionImpl.h" #include "qpid/client/Session.h" #include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/Uuid.h" namespace qpid { @@ -111,14 +110,11 @@ class Connection * multiple streams of work to be multiplexed over the same * connection. * - *@param detachedLifetime: A session may be detached from its - * channel, either by calling Session::suspend() or because of a - * network failure. The session state is preserved for - * detachedLifetime seconds to allow a call to resume(). After - * that the broker may discard the session state. Default is 0, - * meaning the session cannot be resumed. + *@param name: A name to identify the session. @see qpid::SessionId + * If the name is empty (the default) then a unique name will be + * chosen using a Universally-unique identifier (UUID) algorithm. */ - Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0); + Session newSession(const std::string& name=std::string()); /** * Resume a suspended session. A session may be resumed diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 2bbe5a122f..0bd0cb9d08 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -37,7 +37,8 @@ using qpid::sys::Thread; namespace qpid { namespace client { -Subscriber::Subscriber(Session& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {} +Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a) + : session(s), listener(l), autoAck(a) {} void Subscriber::received(Message& msg) { @@ -47,7 +48,7 @@ void Subscriber::received(Message& msg) } } -Dispatcher::Dispatcher(Session& s, const std::string& q) +Dispatcher::Dispatcher(const Session& s, const std::string& q) : session(s), running(false), autoStop(true) { queue = q.empty() ? @@ -88,7 +89,7 @@ void Dispatcher::run() } } } - session.sync(); // Make sure all our acks are received before returning. + sync(session).sync(); // Make sure all our acks are received before returning. } catch (const ClosedException&) {} //ignore it and return catch (const std::exception& e) { diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h index e23d0c198c..1b31ddf4cf 100644 --- a/cpp/src/qpid/client/Dispatcher.h +++ b/cpp/src/qpid/client/Dispatcher.h @@ -37,13 +37,13 @@ namespace client { class Subscriber : public MessageListener { - Session& session; + AsyncSession session; MessageListener* const listener; AckPolicy autoAck; public: typedef boost::shared_ptr<Subscriber> shared_ptr; - Subscriber(Session& session, MessageListener* listener, AckPolicy); + Subscriber(const Session& session, MessageListener* listener, AckPolicy); void received(Message& msg); }; @@ -55,7 +55,7 @@ class Dispatcher : public sys::Runnable typedef std::map<std::string, Subscriber::shared_ptr> Listeners; sys::Mutex lock; sys::Thread worker; - Session& session; + Session session; Demux::QueuePtr queue; bool running; bool autoStop; @@ -67,7 +67,7 @@ class Dispatcher : public sys::Runnable bool isStopped(); public: - Dispatcher(Session& session, const std::string& queue = ""); + Dispatcher(const Session& session, const std::string& queue = ""); void start(); void run(); diff --git a/cpp/src/qpid/client/Session.h b/cpp/src/qpid/client/Session.h index 84831ec442..bdabd26c82 100644 --- a/cpp/src/qpid/client/Session.h +++ b/cpp/src/qpid/client/Session.h @@ -33,6 +33,7 @@ namespace client { */ typedef Session_0_10 Session; + }} // namespace qpid::client #endif /*!QPID_CLIENT_SESSION_H*/ diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp deleted file mode 100644 index dfd0f62e7e..0000000000 --- a/cpp/src/qpid/client/SessionBase.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "SessionBase.h" -#include "qpid/framing/all_method_bodies.h" - -namespace qpid { -namespace client { -using namespace framing; - -SessionBase::SessionBase() {} -SessionBase::~SessionBase() {} -SessionBase::SessionBase(shared_ptr<SessionImpl> core) : impl(core) {} -void SessionBase::suspend() { impl->suspend(); } -void SessionBase::close() { impl->close(); } - -void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); } -void SessionBase::setSynchronous(SynchronousMode m) { impl->setSync(m); } -bool SessionBase::isSynchronous() const { return impl->isSync(); } -SynchronousMode SessionBase::getSynchronous() const { - return SynchronousMode(impl->isSync()); -} - -Execution& SessionBase::getExecution() -{ - return *impl; -} - -void SessionBase::flush() -{ - impl->sendFlush(); -} - -// FIXME aconway 2008-04-24: do we need to provide a non-synchronous version -// of sync() or bool paramter to allow setting a sync point for a later wait? -void SessionBase::sync() -{ - ExecutionSyncBody b; - b.setSync(true); - impl->send(b).wait(*impl); -} - -void SessionBase::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) -{ - impl->markCompleted(id, cumulative, notifyPeer); -} - -void SessionBase::sendCompletion() -{ - impl->sendCompletion(); -} - -Uuid SessionBase::getId() const { return impl->getId(); } -framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } - -SessionBase::ScopedSync::ScopedSync(SessionBase& s) : session(s), change(!s.isSynchronous()) -{ - if (change) session.setSynchronous(true); -} - -SessionBase::ScopedSync::~ScopedSync() -{ - if (change) session.setSynchronous(false); -} - - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h deleted file mode 100644 index 7f4a27dc09..0000000000 --- a/cpp/src/qpid/client/SessionBase.h +++ /dev/null @@ -1,148 +0,0 @@ -#ifndef QPID_CLIENT_SESSIONBASE_H -#define QPID_CLIENT_SESSIONBASE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/Uuid.h" -#include "qpid/framing/amqp_structs.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/framing/MethodContent.h" -#include "qpid/framing/TransferContent.h" -#include "qpid/client/Completion.h" -#include "qpid/client/ConnectionImpl.h" -#include "qpid/client/Execution.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/TypedResult.h" -#include "qpid/shared_ptr.h" -#include <string> - -namespace qpid { -namespace client { - -using std::string; -using framing::Content; -using framing::FieldTable; -using framing::MethodContent; -using framing::SequenceNumberSet; -using framing::Uuid; - -enum CreditUnit { MESSAGE=0, BYTE=1 }; - -/** \defgroup clientapi Synchronous mode of a session. - * - * SYNC means that Session functions do not return until the remote - * broker has confirmed that the command was executed. - * - * ASYNC means that the client sends commands asynchronously, Session - * functions return immediately. - * - * ASYNC mode gives better performance for high-volume traffic, but - * requires some additional caution. - * - * Session functions return immediately. If the command causes an - * exception on the broker, the exception will be thrown on a - * <em>later</em> function call. - * - * If you need to notify some extenal agent that some actions have - * been taken (e.g. binding queues to exchanges), you must call - * Session::sync() first to ensure that all the commands are complete. - * - * You can freely switch between modes by calling Session::setSynchronous(). - * - * @see Session::sync(), Session::setSynchronous() - */ -enum SynchronousMode { SYNC=true, ASYNC=false }; - - -/** - * Basic session operations that are not derived from AMQP XML methods. - */ -class SessionBase -{ - public: - /** - * Instances of this class turn synchronous mode on for the - * duration of their scope (and revert back to async if required - * afterwards). - */ - class ScopedSync - { - SessionBase& session; - const bool change; - public: - ScopedSync(SessionBase& s); - ~ScopedSync(); - }; - - - SessionBase(); - ~SessionBase(); - - /** Get the next message frame-set from the session. */ - framing::FrameSet::shared_ptr get(); - - /** Get the session ID */ - Uuid getId() const; - - /** - * In synchronous mode, wait for the broker's response before - * returning. This gives lower throughput than asynchronous - * mode. - * - * In asynchronous mode commands are sent without waiting - * for a response (you can use the returned Completion object - * to wait for completion). - * - * @see SynchronousMode - */ - void setSynchronous(SynchronousMode mode); - void setSynchronous(bool set); - bool isSynchronous() const; - SynchronousMode getSynchronous() const; - - /** - * Suspend the session, which can be resumed on a different connection. - * @see Connection::resume() - */ - void suspend(); - - /** Close the session */ - void close(); - - Execution& getExecution(); - void sync(); - void flush(); - void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); - void sendCompletion(); - - typedef framing::TransferContent DefaultContent; - - protected: - shared_ptr<SessionImpl> impl; - framing::ProtocolVersion version; - friend class Connection; - SessionBase(shared_ptr<SessionImpl>); -}; - -}} // namespace qpid::client - -#endif /*!QPID_CLIENT_SESSIONBASE_H*/ diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp new file mode 100644 index 0000000000..974acbfcf6 --- /dev/null +++ b/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionBase_0_10.h" +#include "qpid/framing/all_method_bodies.h" + +namespace qpid { +namespace client { +using namespace framing; + +SessionBase_0_10::SessionBase_0_10() {} +SessionBase_0_10::~SessionBase_0_10() {} + +void SessionBase_0_10::close() { impl->close(); } + +Execution& SessionBase_0_10::getExecution() +{ + return *impl; +} + +void SessionBase_0_10::flush() +{ + impl->sendFlush(); +} + +void SessionBase_0_10::sync() +{ + ExecutionSyncBody b; + b.setSync(true); + impl->send(b).wait(*impl); +} + +void SessionBase_0_10::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) +{ + impl->markCompleted(id, cumulative, notifyPeer); +} + +void SessionBase_0_10::sendCompletion() +{ + impl->sendCompletion(); +} + +SessionId SessionBase_0_10::getId() const { return impl->getId(); } +framing::FrameSet::shared_ptr SessionBase_0_10::get() { return impl->get(); } + + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionBase_0_10.h b/cpp/src/qpid/client/SessionBase_0_10.h new file mode 100644 index 0000000000..f9ced049a9 --- /dev/null +++ b/cpp/src/qpid/client/SessionBase_0_10.h @@ -0,0 +1,104 @@ +#ifndef QPID_CLIENT_SESSIONBASE_H +#define QPID_CLIENT_SESSIONBASE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/SessionId.h" +#include "qpid/framing/amqp_structs.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/MethodContent.h" +#include "qpid/framing/TransferContent.h" +#include "qpid/client/Completion.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/Execution.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/TypedResult.h" +#include "qpid/shared_ptr.h" +#include <string> + +namespace qpid { +namespace client { + +using std::string; +using framing::Content; +using framing::FieldTable; +using framing::MethodContent; +using framing::SequenceNumber; +using framing::SequenceSet; +using framing::SequenceNumberSet; +using qpid::SessionId; +using framing::Xid; + +enum CreditUnit { MESSAGE=0, BYTE=1 }; + +/** + * Base class for handles to an AMQP session. + * + * Subclasses provide the AMQP commands for a given + * version of the protocol. + */ +class SessionBase_0_10 { + public: + + typedef framing::TransferContent DefaultContent; + + SessionBase_0_10(); + ~SessionBase_0_10(); + + /** Get the next message frame-set from the session. */ + framing::FrameSet::shared_ptr get(); + + /** Get the session ID */ + SessionId getId() const; + + /** Close the session. + * A session is automatically closed when all handles to it are destroyed. + */ + void close(); + + /** Synchronize the session: sync() waits until all commands + * issued on this session have been completed. It is equivalent to + * calling Session::executionSync() + * + * Note sync() is always synchronous, even on an AsyncSession object + * because that's almost always what you want. You can call + * AsyncSession::executionSync() directly in the unusual event + * that you want to do an asynchronous sync. + */ + void sync(); + + /** Set the timeout for this session. */ + uint32_t timeout(uint32_t seconds); + + Execution& getExecution(); + void flush(); + void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); + void sendCompletion(); + + protected: + boost::shared_ptr<SessionImpl> impl; + friend class SessionBase_0_10Access; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SESSIONBASE_H*/ diff --git a/cpp/src/qpid/client/SessionBase_0_10Access.h b/cpp/src/qpid/client/SessionBase_0_10Access.h new file mode 100644 index 0000000000..e2189a53dd --- /dev/null +++ b/cpp/src/qpid/client/SessionBase_0_10Access.h @@ -0,0 +1,42 @@ +#ifndef QPID_CLIENT_SESSIONBASEACCESS_H +#define QPID_CLIENT_SESSIONBASEACCESS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/SessionBase_0_10.h" + +/**@file @internal Internal use only */ + +namespace qpid { +namespace client { + +class SessionBase_0_10Access { + public: + SessionBase_0_10Access(SessionBase_0_10& sb_) : sb(sb_) {} + void set(const boost::shared_ptr<SessionImpl>& si) { sb.impl = si; } + boost::shared_ptr<SessionImpl> get() { return sb.impl; } + private: + SessionBase_0_10& sb; +}; +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SESSIONBASEACCESS_H*/ diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 18b573b464..58f4bc0aa7 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -48,18 +48,16 @@ typedef sys::Monitor::ScopedUnlock UnLock; typedef sys::ScopedLock<sys::Semaphore> Acquire; -SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, +SessionImpl::SessionImpl(const std::string& name, + shared_ptr<ConnectionImpl> conn, uint16_t ch, uint64_t _maxFrameSize) : error(OK), code(NORMAL), text(EMPTY), state(INACTIVE), - syncMode(false), detachedLifetime(0), maxFrameSize(_maxFrameSize), - id(true),//generate unique uuid for each session - name(id.str()), - //TODO: may want to allow application defined names instead + id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name), connection(conn), ioHandler(*this), channel(ch), @@ -81,15 +79,6 @@ SessionImpl::~SessionImpl() { connection->erase(channel); } -void SessionImpl::setSync(bool s) // user thread -{ - syncMode = s; //syncMode is volatile -} - -bool SessionImpl::isSync() // user thread -{ - return syncMode; //syncMode is volatile -} FrameSet::shared_ptr SessionImpl::get() // user thread { @@ -97,7 +86,7 @@ FrameSet::shared_ptr SessionImpl::get() // user thread return demux.getDefault()->pop(); } -const Uuid SessionImpl::getId() const //user thread +const SessionId SessionImpl::getId() const //user thread { return id; //id is immutable } @@ -107,7 +96,7 @@ void SessionImpl::open(uint32_t timeout) // user thread Lock l(state); if (state == INACTIVE) { setState(ATTACHING); - proxy.attach(name, false); + proxy.attach(id.getName(), false); waitFor(ATTACHED); //TODO: timeout will not be set locally until get response to //confirm, should we wait for that? @@ -144,7 +133,7 @@ void SessionImpl::suspend() //user thread void SessionImpl::detach() //call with lock held { if (state == ATTACHED) { - proxy.detach(name); + proxy.detach(id.getName()); setState(DETACHING); } } @@ -285,15 +274,11 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con { Acquire a(sendLock); SequenceNumber id = nextOut++; - bool sync; { Lock l(state); checkOpen(); incompleteOut.add(id); - sync = syncMode; } - - if (sync) command.getMethod()->setSync(true); Future f(id); if (command.getMethod()->resultExpected()) { Lock l(state); @@ -308,9 +293,6 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con if (content) { sendContent(*content); } - if (sync) { - waitForCompletion(id); - } return f; } void SessionImpl::sendContent(const MethodContent& content) @@ -441,27 +423,27 @@ void SessionImpl::attach(const std::string& /*name*/, bool /*force*/) void SessionImpl::attached(const std::string& _name) { Lock l(state); - if (name != _name) throw InternalErrorException("Incorrect session name"); + if (id.getName() != _name) throw InternalErrorException("Incorrect session name"); setState(ATTACHED); } void SessionImpl::detach(const std::string& _name) { Lock l(state); - if (name != _name) throw InternalErrorException("Incorrect session name"); + if (id.getName() != _name) throw InternalErrorException("Incorrect session name"); setState(DETACHED); - QPID_LOG(info, "Session detached by peer: " << name); + QPID_LOG(info, "Session detached by peer: " << id); } void SessionImpl::detached(const std::string& _name, uint8_t _code) { Lock l(state); - if (name != _name) throw InternalErrorException("Incorrect session name"); + if (id.getName() != _name) throw InternalErrorException("Incorrect session name"); setState(DETACHED); if (_code) { //TODO: make sure this works with execution.exception - don't //want to overwrite the code from that - QPID_LOG(error, "Session detached by peer: " << name << " " << code); + QPID_LOG(error, "Session detached by peer: " << id << " " << code); error = SESSION_DETACH; code = _code; text = "Session detached by peer"; @@ -561,8 +543,6 @@ void SessionImpl::gap(const framing::SequenceSet& /*commands*/) throw NotImplementedException("gap not yet supported"); } - - void SessionImpl::sync() {} void SessionImpl::result(const framing::SequenceNumber& commandId, const std::string& value) diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 0bcec4dd0c..7bb7136912 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -26,6 +26,7 @@ #include "Execution.h" #include "Results.h" +#include "qpid/SessionId.h" #include "qpid/shared_ptr.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/ChannelHandler.h" @@ -59,14 +60,14 @@ class SessionImpl : public framing::FrameHandler::InOutHandler, private framing::AMQP_ClientOperations::ExecutionHandler { public: - SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); + SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); ~SessionImpl(); //NOTE: Public functions called in user thread. framing::FrameSet::shared_ptr get(); - const framing::Uuid getId() const; + const SessionId getId() const; uint16_t getChannel() const; void setChannel(uint16_t channel); @@ -76,8 +77,6 @@ public: void resume(shared_ptr<ConnectionImpl>); void suspend(); - void setSync(bool s); - bool isSync(); void assertOpen() const; Future send(const framing::AMQBody& command); @@ -131,7 +130,8 @@ private: Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); void sendContent(const framing::MethodContent&); void waitForCompletionImpl(const framing::SequenceNumber& id); - + void requestTimeout(uint32_t timeout); + void sendCompletionImpl(); // Note: Following methods are called by network thread in @@ -140,7 +140,6 @@ private: void attached(const std::string& name); void detach(const std::string& name); void detached(const std::string& name, uint8_t detachCode); - void requestTimeout(uint32_t timeout); void timeout(uint32_t timeout); void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset); void expected(const framing::SequenceSet& commands, const framing::Array& fragments); @@ -167,11 +166,9 @@ private: std::string text; // Error text mutable StateMonitor state; mutable sys::Semaphore sendLock; - volatile bool syncMode; uint32_t detachedLifetime; const uint64_t maxFrameSize; - const framing::Uuid id; - const std::string name; + const SessionId id; shared_ptr<ConnectionImpl> connection; framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler; diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 2ba3f5fe62..6036f153f6 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -32,23 +32,23 @@ namespace qpid { namespace client { -SubscriptionManager::SubscriptionManager(Session& s) +SubscriptionManager::SubscriptionManager(const Session& s) : dispatcher(s), session(s), messages(UNLIMITED), bytes(UNLIMITED), window(true), acceptMode(0), acquireMode(0), autoStop(true) {} -Completion SubscriptionManager::subscribeInternal( +void SubscriptionManager::subscribeInternal( const std::string& q, const std::string& dest) { - Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest, - arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); - setFlowControl(dest, messages, bytes, window); - return c; + async(session).messageSubscribe( // setFlowControl will sync. + arg::queue=q, arg::destination=dest, + arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); + setFlowControl(dest, messages, bytes, window); } -Completion SubscriptionManager::subscribe( +void SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& d) { std::string dest=d.empty() ? q:d; @@ -56,7 +56,7 @@ Completion SubscriptionManager::subscribe( return subscribeInternal(q, dest); } -Completion SubscriptionManager::subscribe( +void SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const std::string& d) { std::string dest=d.empty() ? q:d; @@ -68,9 +68,9 @@ Completion SubscriptionManager::subscribe( void SubscriptionManager::setFlowControl( const std::string& dest, uint32_t messages, uint32_t bytes, bool window) { - session.messageSetFlowMode(dest, window); - session.messageFlow(dest, 0, messages); - session.messageFlow(dest, 1, bytes); + async(session).messageSetFlowMode(dest, window); + async(session).messageFlow(dest, 0, messages); + session.messageFlow(dest, 1, bytes); // Only need one sync } void SubscriptionManager::setFlowControl( diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 4ccb95c968..4ff962f67b 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -45,10 +45,10 @@ class SubscriptionManager : public sys::Runnable typedef sys::Mutex::ScopedLock Lock; typedef sys::Mutex::ScopedUnlock Unlock; - Completion subscribeInternal(const std::string& q, const std::string& dest); + void subscribeInternal(const std::string& q, const std::string& dest); qpid::client::Dispatcher dispatcher; - qpid::client::Session& session; + qpid::client::Session session; uint32_t messages; uint32_t bytes; bool window; @@ -58,7 +58,7 @@ class SubscriptionManager : public sys::Runnable bool autoStop; public: - SubscriptionManager(Session& session); + SubscriptionManager(const Session& session); /** * Subscribe a MessagesListener to receive messages from queue. @@ -68,7 +68,7 @@ class SubscriptionManager : public sys::Runnable *@param tag Unique destination tag for the listener. * If not specified, the queue name is used. */ - Completion subscribe(MessageListener& listener, + void subscribe(MessageListener& listener, const std::string& queue, const std::string& tag=std::string()); @@ -79,7 +79,7 @@ class SubscriptionManager : public sys::Runnable *@param tag Unique destination tag for the listener. * If not specified, the queue name is used. */ - Completion subscribe(LocalQueue& localQueue, + void subscribe(LocalQueue& localQueue, const std::string& queue, const std::string& tag=std::string()); diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index 83b3f621c7..31f63d71a0 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -92,7 +92,7 @@ struct SessionFixtureT : BrokerFixture { qpid::client::LocalQueue lq; SessionFixtureT() : connection(broker->getPort()), - session(connection.newSession(qpid::client::ASYNC)), + session(connection.newSession("SessionFixture")), subs(session) {} diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 801e33d412..1dade47ee9 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -106,19 +106,19 @@ struct ClientSessionFixture : public ProxySessionFixture QPID_AUTO_TEST_CASE(testQueueQuery) { ClientSessionFixture fix; - fix.session = fix.connection.newSession(ASYNC); + fix.session = fix.connection.newSession(); fix.session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true); - TypedResult<QueueQueryResult> result = fix.session.queueQuery(string("my-queue")); - BOOST_CHECK_EQUAL(false, result.get().getDurable()); - BOOST_CHECK_EQUAL(true, result.get().getExclusive()); + QueueQueryResult result = fix.session.queueQuery(string("my-queue")); + BOOST_CHECK_EQUAL(false, result.getDurable()); + BOOST_CHECK_EQUAL(true, result.getExclusive()); BOOST_CHECK_EQUAL(string("amq.fanout"), - result.get().getAlternateExchange()); + result.getAlternateExchange()); } QPID_AUTO_TEST_CASE(testTransfer) { ClientSessionFixture fix; - fix.session=fix.connection.newSession(ASYNC); + fix.session=fix.connection.newSession(); fix.declareSubscribe(); fix.session.messageTransfer(acceptMode=1, content=TransferContent("my-message", "my-queue")); //get & test the message: @@ -133,7 +133,7 @@ QPID_AUTO_TEST_CASE(testTransfer) QPID_AUTO_TEST_CASE(testDispatcher) { ClientSessionFixture fix; - fix.session =fix.connection.newSession(ASYNC); + fix.session =fix.connection.newSession(); fix.declareSubscribe(); size_t count = 100; for (size_t i = 0; i < count; ++i) @@ -148,7 +148,7 @@ QPID_AUTO_TEST_CASE(testDispatcher) QPID_AUTO_TEST_CASE(testDispatcherThread) { ClientSessionFixture fix; - fix.session =fix.connection.newSession(ASYNC); + fix.session =fix.connection.newSession(); fix.declareSubscribe(); size_t count = 10; DummyListener listener(fix.session, "my-dest", count); @@ -162,40 +162,42 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) BOOST_CHECK_EQUAL(lexical_cast<string>(i), listener.messages[i].getData()); } -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1) -{ - ClientSessionFixture fix; - fix.session =fix.connection.newSession(ASYNC, 0); - fix.session.suspend(); // session has 0 timeout. - try { - fix.connection.resume(fix.session); - BOOST_FAIL("Expected InvalidArgumentException."); - } catch(const InternalErrorException&) {} -} - -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1) -{ - ClientSessionFixture fix; - fix.session =fix.connection.newSession(ASYNC, 60); - fix.session.suspend(); - try { - fix.session.exchangeQuery(name="amq.fanout"); - BOOST_FAIL("Expected session suspended exception"); - } catch(const CommandInvalidException&) {} -} +// FIXME aconway 2008-05-26: Re-enable with final resume implementation. +// +// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1) +// { +// ClientSessionFixture fix; +// fix.session.suspend(); // session has 0 timeout. +// try { +// fix.connection.resume(fix.session); +// BOOST_FAIL("Expected InvalidArgumentException."); +// } catch(const InternalErrorException&) {} +// } + +// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1) +// { +// ClientSessionFixture fix; +// fix.session =fix.session.timeout(60); +// fix.session.suspend(); +// try { +// fix.session.exchangeQuery(name="amq.fanout"); +// BOOST_FAIL("Expected session suspended exception"); +// } catch(const CommandInvalidException&) {} +// } + +// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) +// { +// ClientSessionFixture fix; +// fix.session.timeout(60); +// fix.declareSubscribe(); +// fix.session.suspend(); +// // Make sure we are still subscribed after resume. +// fix.connection.resume(fix.session); +// fix.session.messageTransfer(content=TransferContent("my-message", "my-queue")); +// FrameSet::shared_ptr msg = fix.session.get(); +// BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); +// } -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) -{ - ClientSessionFixture fix; - fix.session =fix.connection.newSession(ASYNC, 60); - fix.declareSubscribe(); - fix.session.suspend(); - // Make sure we are still subscribed after resume. - fix.connection.resume(fix.session); - fix.session.messageTransfer(content=TransferContent("my-message", "my-queue")); - FrameSet::shared_ptr msg = fix.session.get(); - BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); -} QPID_AUTO_TEST_CASE(testSendToSelf) { ClientSessionFixture fix; diff --git a/cpp/src/tests/XmlClientSessionTest.cpp b/cpp/src/tests/XmlClientSessionTest.cpp index dc6cce24fc..9b0c035f37 100644 --- a/cpp/src/tests/XmlClientSessionTest.cpp +++ b/cpp/src/tests/XmlClientSessionTest.cpp @@ -121,7 +121,7 @@ struct ClientSessionFixture : public ProxySessionFixture QPID_AUTO_TEST_CASE(testXmlBinding) { ClientSessionFixture f; - Session session = f.connection.newSession(ASYNC); + Session session = f.connection.newSession(); SubscriptionManager subscriptions(session); SubscribedLocalQueue localQueue(subscriptions); diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index 20e8b21a3a..04269b299d 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -92,7 +92,7 @@ int main(int argc, char** argv) //Create and open a session on the connection through which //most functionality is exposed: - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); if (opts.verbose) std::cout << "Opened session." << std::endl; diff --git a/cpp/src/tests/consume.cpp b/cpp/src/tests/consume.cpp index 6d2f0a7413..43e08a80b7 100644 --- a/cpp/src/tests/consume.cpp +++ b/cpp/src/tests/consume.cpp @@ -62,7 +62,7 @@ struct Client Client() { opts.open(connection); - session = connection.newSession(ASYNC); + session = connection.newSession(); } void consume() diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index f75269c959..a656e0cf1a 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -96,7 +96,6 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) { QPID_AUTO_TEST_CASE(NoSuchQueueTest) { ProxySessionFixture fix; - fix.session.setSynchronous(true); BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException); } diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index 0b343d0243..f4cbade36b 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -30,7 +30,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" using namespace qpid; @@ -99,7 +99,7 @@ class Client : public Runnable { protected: Connection connection; - Session session; + AsyncSession session; Thread thread; string queue; @@ -157,7 +157,7 @@ public: Client::Client(const string& q) : queue(q) { opts.open(connection); - session = connection.newSession(ASYNC); + session = connection.newSession(); } void Client::start() @@ -262,7 +262,7 @@ void Sender::sendByCount() uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables - session.messageTransfer(arg::content=msg, arg::acceptMode=1); + async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); } session.sync(); } @@ -283,7 +283,7 @@ void Sender::sendByRate() uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables - session.messageTransfer(arg::content=msg, arg::acceptMode=1); + async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); } uint64_t timeTaken = (current_time() - start) / TIME_USEC; if (timeTaken < 1000) { diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index 2a8a9ec17c..91ecd83f50 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -21,7 +21,7 @@ #include "TestOptions.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" #include "qpid/client/Completion.h" @@ -194,12 +194,12 @@ Opts opts; struct Client : public Runnable { Connection connection; - Session session; + AsyncSession session; Thread thread; Client() { opts.open(connection); - session = connection.newSession(ASYNC); + session = connection.newSession(); } ~Client() { @@ -431,7 +431,7 @@ struct PublishThread : public Client { offset = 5; data += "data:";//marker (requested for latency testing tool scripts) data += string(sizeof(size_t), 'X');//space for seq no - data += string(reinterpret_cast<const char*>(session.getId().data()), session.getId().size()); + data += session.getId().str(); if (opts.size > data.size()) { data += string(opts.size - data.size(), 'X'); } else if(opts.size < data.size()) { diff --git a/cpp/src/tests/publish.cpp b/cpp/src/tests/publish.cpp index 17e3d4e104..b78f3fdf6d 100644 --- a/cpp/src/tests/publish.cpp +++ b/cpp/src/tests/publish.cpp @@ -28,7 +28,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" using namespace qpid; @@ -61,12 +61,12 @@ Args opts; struct Client { Connection connection; - Session session; + AsyncSession session; Client() { opts.open(connection); - session = connection.newSession(ASYNC); + session = connection.newSession(); } std::string id(uint i) diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index 8f0e290070..6daf928401 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -53,7 +53,7 @@ using namespace std; * defined. */ class Listener : public MessageListener{ - Session& session; + Session session; SubscriptionManager& mgr; const string responseQueue; const bool transactional; @@ -64,7 +64,7 @@ class Listener : public MessageListener{ void shutdown(); void report(); public: - Listener(Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); + Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); virtual void received(Message& msg); }; @@ -101,7 +101,7 @@ int main(int argc, char** argv){ else { Connection connection; args.open(connection); - Session session = connection.newSession(ASYNC); + AsyncSession session = connection.newSession(); if (args.transactional) { session.txSelect(); } @@ -127,7 +127,8 @@ int main(int argc, char** argv){ mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); } mgr.subscribe(listener, control); - + session.sync(); + cout << "topic_listener: listening..." << endl; mgr.run(); if (args.durable) { @@ -144,7 +145,7 @@ int main(int argc, char** argv){ return 1; } -Listener::Listener(Session& s, SubscriptionManager& m, const string& _responseq, bool tx) : +Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) : session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){} void Listener::received(Message& message){ diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index a6a7b4d80d..c8f0d543ec 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -37,7 +37,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Monitor.h" #include <unistd.h> @@ -56,7 +56,7 @@ using namespace std; * back by the subscribers. */ class Publisher { - Session& session; + AsyncSession session; SubscriptionManager mgr; LocalQueue queue; const string controlTopic; @@ -66,7 +66,7 @@ class Publisher { string generateData(int size); public: - Publisher(Session& session, const string& controlTopic, bool tx, bool durable); + Publisher(const AsyncSession& session, const string& controlTopic, bool tx, bool durable); int64_t publish(int msgs, int listeners, int size); void terminate(); }; @@ -107,7 +107,7 @@ int main(int argc, char** argv) { else { Connection connection; args.open(connection); - Session session = connection.newSession(ASYNC); + AsyncSession session = connection.newSession(); if (args.transactional) { session.txSelect(); } @@ -150,7 +150,7 @@ int main(int argc, char** argv) { return 1; } -Publisher::Publisher(Session& _session, const string& _controlTopic, bool tx, bool d) : +Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) : session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d) { mgr.subscribe(queue, "response"); diff --git a/cpp/src/tests/topictest b/cpp/src/tests/topictest index c36aa319ba..ad7c5df693 100755 --- a/cpp/src/tests/topictest +++ b/cpp/src/tests/topictest @@ -36,5 +36,5 @@ for ((i=$SUBSCRIBERS ; i--; )); do subscribe $i & done # FIXME aconway 2007-03-27: Hack around startup race. Fix topic test. -sleep 1 +sleep 2 publish 2>&1 || exit 1 diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp index a8369df759..6eb812738d 100644 --- a/cpp/src/tests/txtest.cpp +++ b/cpp/src/tests/txtest.cpp @@ -28,7 +28,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" using namespace qpid; @@ -96,12 +96,12 @@ Args opts; struct Client { Connection connection; - Session session; + AsyncSession session; Client() { opts.open(connection); - session = connection.newSession(ASYNC); + session = connection.newSession(); } ~Client() |