diff options
author | Gordon Sim <gsim@apache.org> | 2008-04-20 12:10:37 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-04-20 12:10:37 +0000 |
commit | 4780580874e8d6a3e3590fa5fdf8a088310b20ae (patch) | |
tree | a73e247b9821c2429a8e015ddff9cbb5d17a88e8 /qpid | |
parent | e29bb6406ad8d0c0d9d58b7d1d09798829687602 (diff) | |
download | qpid-python-4780580874e8d6a3e3590fa5fdf8a088310b20ae.tar.gz |
QPID-920: converted c++ client to use final 0-10 protocol
* connection handler converted to using invoker & proxy and updated to final method defs
* SessionCore & ExecutionHandler replace by SessionImpl
* simplified handling of completion & results, removed handling of responses
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@649915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
89 files changed, 1587 insertions, 2083 deletions
diff --git a/qpid/cpp/examples/examples/direct/declare_queues.cpp b/qpid/cpp/examples/examples/direct/declare_queues.cpp index de7eff0490..71ed28dac4 100644 --- a/qpid/cpp/examples/examples/direct/declare_queues.cpp +++ b/qpid/cpp/examples/examples/direct/declare_queues.cpp @@ -66,10 +66,10 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- // Create a queue named "message_queue", and route all messages whose - // routing key is "routing_key to this newly created queue. + // routing key is "routing_key" to this newly created queue. session.queueDeclare(arg::queue="message_queue"); - session.queueBind(arg::exchange="amq.direct", arg::queue="message_queue", arg::routingKey="routing_key"); + session.exchangeBind(arg::exchange="amq.direct", arg::queue="message_queue", arg::bindingKey="routing_key"); //----------------------------------------------------------------------------- diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp index 2860528b1f..79809d679e 100644 --- a/qpid/cpp/examples/examples/fanout/listener.cpp +++ b/qpid/cpp/examples/examples/fanout/listener.cpp @@ -71,13 +71,15 @@ int main(int argc, char** argv) { // Unique name for private queue: std::string myQueue=session.getId().str(); - // Declear my queue. + // Declare my queue. session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, arg::autoDelete=true); - // Bind my queue to the fanout exchange. - // Note no routingKey required, the fanout exchange delivers - // all messages to all bound queues unconditionally. - session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue); + // Bind my queue to the fanout exchange. + //Note no the binding key will not affect routing (its just + //used to identify the binding e.g. when unbinding), the + //fanout exchange delivers all messages to all bound queues + //unconditionally. + session.exchangeBind(arg::exchange="amq.fanout", arg::queue=myQueue, arg::bindingKey="my-key"); // Create a listener and subscribe it to my queue. SubscriptionManager subscriptions(session); diff --git a/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp b/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp index e5292db703..c7e9d3877d 100644 --- a/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp +++ b/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp @@ -107,8 +107,8 @@ void Listener::prepareQueue(std::string queue, std::string routing_key) { * "control" routing key, when it is finished. */ - session.queueBind(arg::exchange="amq.topic", arg::queue=queue, arg::routingKey=routing_key); - session.queueBind(arg::exchange="amq.topic", arg::queue=queue, arg::routingKey="control"); + session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey=routing_key); + session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey="control"); /* * subscribe to the queue using the subscription manager. diff --git a/qpid/cpp/examples/examples/request-response/client.cpp b/qpid/cpp/examples/examples/request-response/client.cpp index 9f82bd9d9e..8cec16a461 100644 --- a/qpid/cpp/examples/examples/request-response/client.cpp +++ b/qpid/cpp/examples/examples/request-response/client.cpp @@ -128,7 +128,7 @@ int main(int argc, char** argv) { // Use the name of the response queue as the routing key session.queueDeclare(arg::queue=response_queue.str()); - session.queueBind(arg::exchange="amq.direct", arg::queue=response_queue.str(), arg::routingKey=response_queue.str()); + session.exchangeBind(arg::exchange="amq.direct", arg::queue=response_queue.str(), arg::bindingKey=response_queue.str()); // Create a listener for the response queue and start listening. diff --git a/qpid/cpp/examples/examples/request-response/server.cpp b/qpid/cpp/examples/examples/request-response/server.cpp index 0de2ce5234..6c9bc7ffa6 100644 --- a/qpid/cpp/examples/examples/request-response/server.cpp +++ b/qpid/cpp/examples/examples/request-response/server.cpp @@ -143,7 +143,7 @@ int main(int argc, char** argv) { // Use the name of the request queue as the routing key session.queueDeclare(arg::queue=request_queue); - session.queueBind(arg::exchange="amq.direct", arg::queue=request_queue, arg::routingKey=request_queue); + session.exchangeBind(arg::exchange="amq.direct", arg::queue=request_queue, arg::bindingKey=request_queue); // Create a listener for the request queue and start listening. diff --git a/qpid/cpp/rubygen/99-0/Session.rb b/qpid/cpp/rubygen/99-0/Session.rb index 1ec78f6167..8e14d8daf9 100644 --- a/qpid/cpp/rubygen/99-0/Session.rb +++ b/qpid/cpp/rubygen/99-0/Session.rb @@ -8,7 +8,7 @@ class CppGen def session_methods excludes = ["channel", "connection", "session", "execution"] gen_methods=@amqp.methods_on(@chassis).reject { |m| - excludes.include? m.parent.name or m.body_name.include?("010") + excludes.include? m.classname or !m.parent.name.include?("010") } end @@ -43,7 +43,7 @@ class AmqpMethod def param_names_c() fields_c.map { |f| f.cppname} end def signature_c() fields_c.map { |f| f.signature }; end def sig_c_default() fields_c.map { |f| f.sig_default }; end - def argpack_name() "#{parent.cppname}#{name.caps}Parameters"; end + def argpack_name() "#{classname.lcaps.cppsafe}#{name.caps}Parameters"; end def argpack_type() "boost::parameter::parameters<" + fields_c.map { |f| "arg::keyword_tags::"+f.cppname }.join(',') + @@ -54,7 +54,7 @@ class AmqpMethod return "Response" if (not responses().empty?) return "Completion" end - def session_function() "#{parent.name.lcaps}#{name.caps}"; end + def session_function() "#{classname.lcaps}#{name.caps}"; end end class SessionNoKeywordGen < CppGen @@ -75,6 +75,7 @@ class SessionNoKeywordGen < CppGen genl "using framing::Content;" genl "using framing::FieldTable;" genl "using framing::MethodContent;" + genl "using framing::SequenceSet;" genl "using framing::SequenceNumberSet;" genl "using framing::Uuid;" genl @@ -86,7 +87,7 @@ class SessionNoKeywordGen < CppGen cpp_class(@classname, "public SessionBase") { public genl "Session_#{@amqp.version.bars}() {}" - genl "Session_#{@amqp.version.bars}(shared_ptr<SessionCore> core) : SessionBase(core) {}" + genl "Session_#{@amqp.version.bars}(shared_ptr<SessionImpl> core) : SessionBase(core) {}" session_methods.each { |m| genl doxygen(m) @@ -180,7 +181,7 @@ EOS # Session class. cpp_class(@classname,"public #{@base}") { private - genl "#{@classname}(shared_ptr<SessionCore> core) : #{ @base}(core) {}" + genl "#{@classname}(shared_ptr<SessionImpl> core) : #{ @base}(core) {}" keyword_methods.each { |m| typedef m.argpack_type, m.argpack_name } genl "friend class Connection;" public diff --git a/qpid/cpp/rubygen/99-0/structs.rb b/qpid/cpp/rubygen/99-0/structs.rb index 58e175af0f..b7d230d528 100644 --- a/qpid/cpp/rubygen/99-0/structs.rb +++ b/qpid/cpp/rubygen/99-0/structs.rb @@ -35,7 +35,7 @@ class StructGen < CppGen def is_packed(s) #return true - s.kind_of?(AmqpStruct) or s.body_name.include?("010") + s.kind_of?(AmqpStruct) or s.parent.name.include?("010") end def execution_header?(s) diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb index bf473506d4..76685aa45b 100755 --- a/qpid/cpp/rubygen/amqpgen.rb +++ b/qpid/cpp/rubygen/amqpgen.rb @@ -341,7 +341,7 @@ class AmqpClass < AmqpElement end def l4?() # preview - !["connection", "session", "execution"].include?(name) + !["connection", "session", "execution"].include?(name) && !control? end def control?() diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb index 757894163d..c1121e9bfe 100755 --- a/qpid/cpp/rubygen/cppgen.rb +++ b/qpid/cpp/rubygen/cppgen.rb @@ -88,6 +88,12 @@ class CppType def passcref() @param="const #{name}&"; self; end def code(str) @code=str; self; end def defval(str) @defval=str; self; end + def fq(namespace) + @param="const #{namespace}::#{name}&" + @ret="const #{namespace}::#{name}&" + @defval="#{namespace}::#{name}()" + self + end def encode(value, buffer) @code ? "#{buffer}.put#{@code}(#{value});" : "#{value}.encode(#{buffer});" @@ -143,7 +149,19 @@ class AmqpMethod def cppname() name.lcaps.cppsafe; end def param_names() fields.map { |f| f.cppname }; end def signature() fields.map { |f| f.signature }; end - def body_name() parent.name.caps+name.caps+"Body"; end + def classname() + #TODO: remove name mangling after preview path is dropped + if (parent.name.include?("010")) + return parent.name.delete("010") + elsif (parent.name == "cluster") + return parent.name + else + return parent.name + "X" + end + end + def body_name() + classname().caps+name.caps+"Body" + end def cpp_pack_type() # preview CppType.new("uint16_t").code("Short").defval("0"); @@ -211,7 +229,16 @@ class AmqpDomain "uuid"=>CppType.new("Uuid").passcref.retcref } - def cppname() name.caps; end + def cppname() + #TODO: remove name mangling after preview path is dropped + if (name.include?("010")) + return name.caps.delete("010") + elsif (name.include?("properties")) + return "Preview" + name.caps + else + return name.caps + end + end def fqtypename() return containing_class.nsname+"::"+name.typename if containing_class @@ -221,7 +248,7 @@ class AmqpDomain def cpptype() d=unalias @cpptype ||= @@typemap[d.type_] or - CppType.new(d.cppname).passcref.retcref or + CppType.new(d.cppname).fq("qpid::framing") or raise "Invalid type #{self}" end @@ -232,7 +259,7 @@ end class AmqpResult def cpptype() - @cpptype=CppType.new(parent.parent.name.caps+parent.name.caps+"Result").passcref + @cpptype=CppType.new(parent.classname.caps+parent.name.caps+"Result").passcref end end diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 82e6f4e6a4..cf4630cdf0 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -265,26 +265,24 @@ libqpidbroker_la_SOURCES = \ libqpidclient_la_LIBADD = libqpidcommon.la libqpidclient_la_SOURCES = \ $(rgen_client_srcs) \ - qpid/client/SessionBase.cpp \ qpid/client/Connection.cpp \ qpid/client/Channel.cpp \ qpid/client/Exchange.cpp \ - qpid/broker/PersistableMessage.cpp \ qpid/client/Queue.cpp \ qpid/client/ConnectionImpl.cpp \ qpid/client/Connector.cpp \ qpid/client/Demux.cpp \ qpid/client/Dispatcher.cpp \ qpid/client/LocalQueue.cpp \ + qpid/client/Message.cpp \ qpid/client/MessageListener.cpp \ - qpid/client/Correlator.cpp \ - qpid/client/CompletionTracker.cpp \ qpid/client/ConnectionHandler.cpp \ - qpid/client/ExecutionHandler.cpp \ + qpid/client/Future.cpp \ qpid/client/FutureCompletion.cpp \ - qpid/client/FutureResponse.cpp \ qpid/client/FutureResult.cpp \ - qpid/client/SessionCore.cpp \ + qpid/client/Results.cpp \ + qpid/client/SessionBase.cpp \ + qpid/client/SessionImpl.cpp \ qpid/client/StateManager.cpp \ qpid/client/SubscriptionManager.cpp @@ -392,27 +390,23 @@ nobase_include_HEADERS = \ qpid/client/Queue.h \ qpid/client/AckPolicy.h \ qpid/client/Completion.h \ - qpid/client/CompletionTracker.h \ qpid/client/Connection.h \ qpid/client/ConnectionHandler.h \ qpid/client/ConnectionImpl.h \ qpid/client/Connector.h \ - qpid/client/Correlator.h \ qpid/client/Demux.h \ qpid/client/Dispatcher.h \ qpid/client/LocalQueue.h \ qpid/client/Execution.h \ - qpid/client/ExecutionHandler.h \ qpid/client/Future.h \ qpid/client/FutureCompletion.h \ - qpid/client/FutureResponse.h \ qpid/client/FutureResult.h \ qpid/client/MessageListener.h \ qpid/client/MessageQueue.h \ - qpid/client/Response.h \ + qpid/client/Results.h \ qpid/client/SessionBase.h \ qpid/client/Session.h \ - qpid/client/SessionCore.h \ + qpid/client/SessionImpl.h \ qpid/client/StateManager.h \ qpid/client/SubscriptionManager.h \ qpid/client/TypedResult.h \ diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp index ea964ef3a3..b83a275959 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -106,17 +106,17 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const stri getBroker().getExchanges().destroy(name); } -ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) +ExchangeXQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) { try { Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); + return ExchangeXQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { - return ExchangeQueryResult("", false, true, FieldTable()); + return ExchangeXQueryResult("", false, true, FieldTable()); } } -BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, +BindingXQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, const std::string& exchangeName, const std::string& queueName, const std::string& key, @@ -133,27 +133,27 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/ } if (!exchange) { - return BindingQueryResult(true, false, false, false, false); + return BindingXQueryResult(true, false, false, false, false); } else if (!queueName.empty() && !queue) { - return BindingQueryResult(false, true, false, false, false); + return BindingXQueryResult(false, true, false, false, false); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - return BindingQueryResult(false, false, false, false, false); + return BindingXQueryResult(false, false, false, false, false); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - return BindingQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched); + return BindingXQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched); } } -QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) +QueueXQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) { Queue::shared_ptr queue = state.getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); - return QueueQueryResult(queue->getName(), + return QueueXQueryResult(queue->getName(), alternateExchange ? alternateExchange->getName() : "", queue->isDurable(), queue->hasExclusiveOwner(), diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h index b28c4ebdcc..26dfe802e1 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h @@ -111,7 +111,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations const qpid::framing::FieldTable& arguments); void delete_(uint16_t ticket, const std::string& exchange, bool ifUnused); - framing::ExchangeQueryResult query(u_int16_t ticket, + framing::ExchangeXQueryResult query(u_int16_t ticket, const std::string& name); private: void checkType(shared_ptr<Exchange> exchange, const std::string& type); @@ -127,7 +127,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public: BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {} - framing::BindingQueryResult query(u_int16_t ticket, + framing::BindingXQueryResult query(u_int16_t ticket, const std::string& exchange, const std::string& queue, const std::string& routingKey, @@ -154,7 +154,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments ); - framing::QueueQueryResult query(const std::string& queue); + framing::QueueXQueryResult query(const std::string& queue); void purge(uint16_t ticket, const std::string& queue); void delete_(uint16_t ticket, const std::string& queue, bool ifUnused, bool ifEmpty); diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 0e91c081c0..79f9064b9d 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -22,8 +22,6 @@ #include "ConnectionHandler.h" #include "Connection.h" -#include "qpid/framing/ConnectionStartBody.h" -#include "qpid/framing/Connection010StartBody.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" diff --git a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 533872e849..61ab856fa9 100644 --- a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -36,7 +36,7 @@ void DtxHandlerImpl::select() state.selectDtx(); } -DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, +DtxDemarcationXEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, const string& xid, bool fail, bool suspend) @@ -47,7 +47,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, if (suspend) { throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { - return DtxDemarcationEndResult(XA_RBROLLBACK); + return DtxDemarcationXEndResult(XA_RBROLLBACK); } } else { if (suspend) { @@ -55,14 +55,14 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, } else { state.endDtx(xid, false); } - return DtxDemarcationEndResult(XA_OK); + return DtxDemarcationXEndResult(XA_OK); } } catch (const DtxTimeoutException& e) { - return DtxDemarcationEndResult(XA_RBTIMEOUT); + return DtxDemarcationXEndResult(XA_RBTIMEOUT); } } -DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, +DtxDemarcationXStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, const string& xid, bool join, bool resume) @@ -76,50 +76,50 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, } else { state.startDtx(xid, getBroker().getDtxManager(), join); } - return DtxDemarcationStartResult(XA_OK); + return DtxDemarcationXStartResult(XA_OK); } catch (const DtxTimeoutException& e) { - return DtxDemarcationStartResult(XA_RBTIMEOUT); + return DtxDemarcationXStartResult(XA_RBTIMEOUT); } } // DtxCoordinationHandler: -DtxCoordinationPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/, +DtxCoordinationXPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/, const string& xid) { try { bool ok = getBroker().getDtxManager().prepare(xid); - return DtxCoordinationPrepareResult(ok ? XA_OK : XA_RBROLLBACK); + return DtxCoordinationXPrepareResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return DtxCoordinationPrepareResult(XA_RBTIMEOUT); + return DtxCoordinationXPrepareResult(XA_RBTIMEOUT); } } -DtxCoordinationCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/, +DtxCoordinationXCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/, const string& xid, bool onePhase) { try { bool ok = getBroker().getDtxManager().commit(xid, onePhase); - return DtxCoordinationCommitResult(ok ? XA_OK : XA_RBROLLBACK); + return DtxCoordinationXCommitResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return DtxCoordinationCommitResult(XA_RBTIMEOUT); + return DtxCoordinationXCommitResult(XA_RBTIMEOUT); } } -DtxCoordinationRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/, +DtxCoordinationXRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/, const string& xid ) { try { getBroker().getDtxManager().rollback(xid); - return DtxCoordinationRollbackResult(XA_OK); + return DtxCoordinationXRollbackResult(XA_OK); } catch (const DtxTimeoutException& e) { - return DtxCoordinationRollbackResult(XA_RBTIMEOUT); + return DtxCoordinationXRollbackResult(XA_RBTIMEOUT); } } -DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/, +DtxCoordinationXRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/, bool /*startscan*/, bool /*endscan*/ ) { @@ -144,7 +144,7 @@ DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/, data.push_back(*i); } Array indoubt(data); - return DtxCoordinationRecoverResult(indoubt); + return DtxCoordinationXRecoverResult(indoubt); } void DtxHandlerImpl::forget(u_int16_t /*ticket*/, @@ -154,10 +154,10 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/, throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); } -DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid) +DtxCoordinationXGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid) { uint32_t timeout = getBroker().getDtxManager().getTimeout(xid); - return DtxCoordinationGetTimeoutResult(timeout); + return DtxCoordinationXGetTimeoutResult(timeout); } diff --git a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h index 5bc9d5142a..efb56dba95 100644 --- a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h +++ b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -36,27 +36,27 @@ public: // DtxCoordinationHandler: - framing::DtxCoordinationCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase); + framing::DtxCoordinationXCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase); void forget(u_int16_t ticket, const std::string& xid); - framing::DtxCoordinationGetTimeoutResult getTimeout(const std::string& xid); + framing::DtxCoordinationXGetTimeoutResult getTimeout(const std::string& xid); - framing::DtxCoordinationPrepareResult prepare(u_int16_t ticket, const std::string& xid); + framing::DtxCoordinationXPrepareResult prepare(u_int16_t ticket, const std::string& xid); - framing::DtxCoordinationRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan); + framing::DtxCoordinationXRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan); - framing::DtxCoordinationRollbackResult rollback(u_int16_t ticket, const std::string& xid); + framing::DtxCoordinationXRollbackResult rollback(u_int16_t ticket, const std::string& xid); void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout); // DtxDemarcationHandler: - framing::DtxDemarcationEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend); + framing::DtxDemarcationXEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend); void select(); - framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume); + framing::DtxDemarcationXStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume); }; diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index b60a95228d..297e610418 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -22,9 +22,9 @@ #include "Message.h" #include "ExchangeRegistry.h" #include "qpid/framing/frame_functors.h" -#include "qpid/framing/BasicPublishBody.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/MessageXTransferBody.h" #include "qpid/framing/SendContent.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/TypeFilter.h" @@ -225,9 +225,9 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) MessageAdapter& Message::getAdapter() const { if (!adapter) { - if(frames.isA<MessageTransferBody>()) { + if(frames.isA<MessageXTransferBody>()) { adapter = &TRANSFER_99_0; - } else if(frames.isA<Message010TransferBody>()) { + } else if(frames.isA<MessageTransferBody>()) { adapter = &TRANSFER; } else { const AMQMethodBody* method = frames.getMethod(); diff --git a/qpid/cpp/src/qpid/broker/MessageAdapter.cpp b/qpid/cpp/src/qpid/broker/MessageAdapter.cpp index 0e99d923d4..013e2c91ac 100644 --- a/qpid/cpp/src/qpid/broker/MessageAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/MessageAdapter.cpp @@ -21,6 +21,11 @@ #include "MessageAdapter.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/MessageXTransferBody.h" + namespace { const std::string empty; } @@ -30,13 +35,13 @@ namespace broker{ std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f) { - const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>(); + const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); return p ? p->getRoutingKey() : empty; } std::string TransferAdapter::getExchange(const framing::FrameSet& f) { - return f.as<framing::Message010TransferBody>()->getDestination(); + return f.as<framing::MessageTransferBody>()->getDestination(); } bool TransferAdapter::isImmediate(const framing::FrameSet&) @@ -47,42 +52,42 @@ namespace broker{ const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f) { - const framing::MessageProperties010* p = f.getHeaders()->get<framing::MessageProperties010>(); + const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>(); return p ? &(p->getApplicationHeaders()) : 0; } bool TransferAdapter::isPersistent(const framing::FrameSet& f) { - const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>(); + const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); return p && p->getDeliveryMode() == 2; } bool TransferAdapter::requiresAccept(const framing::FrameSet& f) { - const framing::Message010TransferBody* b = f.as<framing::Message010TransferBody>(); + const framing::MessageTransferBody* b = f.as<framing::MessageTransferBody>(); return b && b->getAcceptMode() == 0/*EXPLICIT == 0*/; } std::string PreviewAdapter::getExchange(const framing::FrameSet& f) { - return f.as<framing::MessageTransferBody>()->getDestination(); + return f.as<framing::MessageXTransferBody>()->getDestination(); } std::string PreviewAdapter::getRoutingKey(const framing::FrameSet& f) { - const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); + const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>(); return p ? p->getRoutingKey() : empty; } const framing::FieldTable* PreviewAdapter::getApplicationHeaders(const framing::FrameSet& f) { - const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>(); + const framing::PreviewMessageProperties* p = f.getHeaders()->get<framing::PreviewMessageProperties>(); return p ? &(p->getApplicationHeaders()) : 0; } bool PreviewAdapter::isPersistent(const framing::FrameSet& f) { - const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); + const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>(); return p && p->getDeliveryMode() == 2; } diff --git a/qpid/cpp/src/qpid/broker/MessageAdapter.h b/qpid/cpp/src/qpid/broker/MessageAdapter.h index 9759f320ac..4c13e756e9 100644 --- a/qpid/cpp/src/qpid/broker/MessageAdapter.h +++ b/qpid/cpp/src/qpid/broker/MessageAdapter.h @@ -23,13 +23,8 @@ */ #include <string> -#include "qpid/framing/BasicPublishBody.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FrameSet.h" -#include "qpid/framing/DeliveryProperties.h" -#include "qpid/framing/MessageProperties.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/Message010TransferBody.h" namespace qpid { namespace broker { diff --git a/qpid/cpp/src/qpid/broker/MessageDelivery.cpp b/qpid/cpp/src/qpid/broker/MessageDelivery.cpp index 9ef7090cd9..36862edf37 100644 --- a/qpid/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/qpid/cpp/src/qpid/broker/MessageDelivery.cpp @@ -24,9 +24,10 @@ #include "Message.h" #include "Queue.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/BasicDeliverBody.h" -#include "qpid/framing/BasicGetOkBody.h" +#include "qpid/framing/BasicXDeliverBody.h" +#include "qpid/framing/BasicXGetOkBody.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/MessageXTransferBody.h" using namespace boost; @@ -52,7 +53,7 @@ struct BasicGetToken : BaseToken AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) { - return AMQFrame(in_place<BasicGetOkBody>( + return AMQFrame(in_place<BasicXGetOkBody>( ProtocolVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey(), queue->getMessageCount())); @@ -69,7 +70,7 @@ struct BasicConsumeToken : BaseToken AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) { - return AMQFrame(in_place<BasicDeliverBody>( + return AMQFrame(in_place<BasicXDeliverBody>( ProtocolVersion(), consumer, id.getValue(), msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey())); @@ -92,16 +93,16 @@ struct MessageDeliveryToken : BaseToken //may need to set the redelivered flag: if (isPreview) { if (msg->getRedelivered()){ - msg->getProperties<DeliveryProperties>()->setRedelivered(true); + msg->getProperties<PreviewDeliveryProperties>()->setRedelivered(true); } - return AMQFrame(in_place<MessageTransferBody>( + return AMQFrame(in_place<MessageXTransferBody>( ProtocolVersion(), 0, destination, confirmMode, acquireMode)); } else { if (msg->getRedelivered()){ - msg->getProperties<DeliveryProperties010>()->setRedelivered(true); + msg->getProperties<DeliveryProperties>()->setRedelivered(true); } - return AMQFrame(in_place<Message010TransferBody>( + return AMQFrame(in_place<MessageTransferBody>( ProtocolVersion(), destination, confirmMode, acquireMode)); } } diff --git a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 64c0282963..5e0e759dfb 100644 --- a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -23,8 +23,6 @@ #include "Connection.h" #include "Broker.h" #include "MessageDelivery.h" -#include "qpid/framing/MessageAppendBody.h" -#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/reply_exceptions.h" #include "BrokerAdapter.h" diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp index eb45ff1492..411e0ce3c0 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp @@ -24,8 +24,7 @@ #include "SessionContext.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" -#include "qpid/framing/ExecutionCompleteBody.h" -#include "qpid/framing/ExecutionResultBody.h" +#include "qpid/framing/ExecutionXCompleteBody.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" @@ -182,7 +181,7 @@ SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) classId = frame.castBody<AMQMethodBody>()->amqpClassId(); switch (classId) { - case ExecutionCompleteBody::CLASS_ID: + case ExecutionXCompleteBody::CLASS_ID: return EXECUTION_CONTROL_TRACK; } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 2251901340..c2f6e3c307 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -34,7 +34,7 @@ #include "TxPublish.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/Message010TransferBody.h" +#include "qpid/framing/MessageXTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" @@ -351,10 +351,11 @@ void SemanticState::handle(intrusive_ptr<Message> msg) { void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); - if (msg->isA<MessageTransferBody>()) { + //TODO: the following should be hidden behind message (using MessageAdapter or similar) + if (msg->isA<MessageXTransferBody>()) { + msg->getProperties<PreviewDeliveryProperties>()->setExchange(exchangeName); + } else if (msg->isA<MessageTransferBody>()) { msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); - } else if (msg->isA<Message010TransferBody>()) { - msg->getProperties<DeliveryProperties010>()->setExchange(exchangeName); } if (!cacheExchange || cacheExchange->getName() != exchangeName){ cacheExchange = session.getBroker().getExchanges().get(exchangeName); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index b7985e9ed8..3ad29e6271 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -107,13 +107,13 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU getBroker().getExchanges().destroy(name); } -Exchange010QueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) +ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) { try { Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - return Exchange010QueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); + return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { - return Exchange010QueryResult("", false, true, FieldTable()); + return ExchangeQueryResult("", false, true, FieldTable()); } } void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, @@ -154,7 +154,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, } -Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, +ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, const std::string& queueName, const std::string& key, const framing::FieldTable& args) @@ -170,18 +170,18 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str } if (!exchange) { - return Exchange010BoundResult(true, false, false, false, false); + return ExchangeBoundResult(true, false, false, false, false); } else if (!queueName.empty() && !queue) { - return Exchange010BoundResult(false, true, false, false, false); + return ExchangeBoundResult(false, true, false, false, false); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - return Exchange010BoundResult(false, false, false, false, false); + return ExchangeBoundResult(false, false, false, false, false); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - return Exchange010BoundResult(false, false, !queueMatched, !keyMatched, !argsMatched); + return ExchangeBoundResult(false, false, !queueMatched, !keyMatched, !argsMatched); } } @@ -191,6 +191,11 @@ SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : Han SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() { + destroyExclusiveQueues(); +} + +void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() +{ while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -200,6 +205,7 @@ SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() exclusiveQueues.erase(exclusiveQueues.begin()); } } + bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const { @@ -207,12 +213,12 @@ bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const } -Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) +QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { Queue::shared_ptr queue = getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); - return Queue010QueryResult(queue->getName(), + return QueueQueryResult(queue->getName(), alternateExchange ? alternateExchange->getName() : "", queue->isDurable(), queue->hasExclusiveOwner(), @@ -313,6 +319,7 @@ void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/, uint8_t /*acquireMode*/) { //not yet used (content containing assemblies treated differently at present + std::cout << "SessionAdapter::MessageHandlerImpl::transfer() called" << std::endl; } void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool setRedelivered) @@ -396,7 +403,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm commands.for_each(acceptOp); } -framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) +framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) { //TODO: change this when SequenceNumberSet is deleted along with preview code SequenceNumberSet results; @@ -408,7 +415,7 @@ framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(con RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2); results.processRanges(g); - return Message010AcquireResult(acquisitions); + return MessageAcquireResult(acquisitions); } @@ -450,7 +457,7 @@ void SessionAdapter::TxHandlerImpl::rollback() state.rollback(); } -std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid) +std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid) { std::string encoded; encode(xid, encoded); @@ -462,7 +469,7 @@ void SessionAdapter::DtxHandlerImpl::select() state.selectDtx(); } -Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, +DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, bool fail, bool suspend) { @@ -472,7 +479,7 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, if (suspend) { throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { - return Dtx010EndResult(XA_RBROLLBACK); + return DtxEndResult(XA_RBROLLBACK); } } else { if (suspend) { @@ -480,14 +487,14 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, } else { state.endDtx(convert(xid), false); } - return Dtx010EndResult(XA_OK); + return DtxEndResult(XA_OK); } } catch (const DtxTimeoutException& e) { - return Dtx010EndResult(XA_RBTIMEOUT); + return DtxEndResult(XA_RBTIMEOUT); } } -Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid, +DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, bool join, bool resume) { @@ -500,45 +507,45 @@ Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid, } else { state.startDtx(convert(xid), getBroker().getDtxManager(), join); } - return Dtx010StartResult(XA_OK); + return DtxStartResult(XA_OK); } catch (const DtxTimeoutException& e) { - return Dtx010StartResult(XA_RBTIMEOUT); + return DtxStartResult(XA_RBTIMEOUT); } } -Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid) +DtxPrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid) { try { bool ok = getBroker().getDtxManager().prepare(convert(xid)); - return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK); + return DtxPrepareResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return Dtx010PrepareResult(XA_RBTIMEOUT); + return DtxPrepareResult(XA_RBTIMEOUT); } } -Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid, +DtxCommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, bool onePhase) { try { bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); - return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK); + return DtxCommitResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return Dtx010CommitResult(XA_RBTIMEOUT); + return DtxCommitResult(XA_RBTIMEOUT); } } -Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid) +DtxRollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) { try { getBroker().getDtxManager().rollback(convert(xid)); - return Dtx010RollbackResult(XA_OK); + return DtxRollbackResult(XA_OK); } catch (const DtxTimeoutException& e) { - return Dtx010RollbackResult(XA_RBTIMEOUT); + return DtxRollbackResult(XA_RBTIMEOUT); } } -Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover() +DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover() { std::set<std::string> xids; getBroker().getStore().collectPreparedXids(xids); @@ -550,23 +557,23 @@ Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover() boost::shared_ptr<FieldValue> xid(new Struct32Value(*i)); indoubt.add(xid); } - return Dtx010RecoverResult(indoubt); + return DtxRecoverResult(indoubt); } -void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid) +void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid) { //Currently no heuristic completion is supported, so this should never be used. throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); } -Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid) +DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) { uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid)); - return Dtx010GetTimeoutResult(timeout); + return DtxGetTimeoutResult(timeout); } -void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid, +void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid, u_int32_t timeout) { getBroker().getDtxManager().setTimeout(convert(xid), timeout); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.h b/qpid/cpp/src/qpid/broker/SessionAdapter.h index 0cbbd13777..a80e2b0776 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.h +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.h @@ -83,6 +83,8 @@ class Queue; Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); } Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); } + void destroyExclusiveQueues() { queueImpl.destroyExclusiveQueues(); } + private: //common base for utility methods etc that are specific to this adapter struct HandlerHelper : public HandlerImpl @@ -105,14 +107,14 @@ class Queue; bool passive, bool durable, bool autoDelete, const qpid::framing::FieldTable& arguments); void delete_(const std::string& exchange, bool ifUnused); - framing::Exchange010QueryResult query(const std::string& name); + framing::ExchangeQueryResult query(const std::string& name); void bind(const std::string& queue, const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments); void unbind(const std::string& queue, const std::string& exchange, const std::string& routingKey); - framing::Exchange010BoundResult bound(const std::string& exchange, + framing::ExchangeBoundResult bound(const std::string& exchange, const std::string& queue, const std::string& routingKey, const framing::FieldTable& arguments); @@ -141,8 +143,10 @@ class Queue; void delete_(const std::string& queue, bool ifUnused, bool ifEmpty); void purge(const std::string& queue); - framing::Queue010QueryResult query(const std::string& queue); + framing::QueueQueryResult query(const std::string& queue); bool isLocal(const ConnectionToken* t) const; + + void destroyExclusiveQueues(); }; class MessageHandlerImpl : @@ -170,7 +174,7 @@ class Queue; void release(const framing::SequenceSet& commands, bool setRedelivered); - framing::Message010AcquireResult acquire(const framing::SequenceSet&); + framing::MessageAcquireResult acquire(const framing::SequenceSet&); void subscribe(const string& queue, const string& destination, @@ -225,35 +229,35 @@ class Queue; class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper, private framing::StructHelper { - std::string convert(const framing::Xid010& xid); + std::string convert(const framing::Xid& xid); public: DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {} void select(); - framing::Dtx010StartResult start(const framing::Xid010& xid, + framing::DtxStartResult start(const framing::Xid& xid, bool join, bool resume); - framing::Dtx010EndResult end(const framing::Xid010& xid, + framing::DtxEndResult end(const framing::Xid& xid, bool fail, bool suspend); - framing::Dtx010CommitResult commit(const framing::Xid010& xid, + framing::DtxCommitResult commit(const framing::Xid& xid, bool onePhase); - void forget(const framing::Xid010& xid); + void forget(const framing::Xid& xid); - framing::Dtx010GetTimeoutResult getTimeout(const framing::Xid010& xid); + framing::DtxGetTimeoutResult getTimeout(const framing::Xid& xid); - framing::Dtx010PrepareResult prepare(const framing::Xid010& xid); + framing::DtxPrepareResult prepare(const framing::Xid& xid); - framing::Dtx010RecoverResult recover(); + framing::DtxRecoverResult recover(); - framing::Dtx010RollbackResult rollback(const framing::Xid010& xid); + framing::DtxRollbackResult rollback(const framing::Xid& xid); - void setTimeout(const framing::Xid010& xid, uint32_t timeout); + void setTimeout(const framing::Xid& xid, uint32_t timeout); }; ExchangeHandlerImpl exchangeImpl; diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 3baa3a89a7..0b1e744e25 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -77,12 +77,6 @@ void SessionHandler::handleIn(AMQFrame& f) { } } -void SessionHandler::destroy() { - ignoring=true; // Ignore trailing frames sent by client. - session->detach(); - session.reset(); -} - void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); // Send it. if (session->sent(f)) @@ -160,12 +154,12 @@ void SessionHandler::detached(const std::string& name, uint8_t code) void SessionHandler::requestTimeout(uint32_t t) { session->setTimeout(t); - //proxy.timeout(t); + peerSession.timeout(t); } -void SessionHandler::timeout(uint32_t) +void SessionHandler::timeout(uint32_t t) { - //not sure what we need to do on the server for this... + session->setTimeout(t); } void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset) diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index fa013a1c15..4b031f2951 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -70,7 +70,6 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, void localSuspend(); void detach() { localSuspend(); } void sendCompletion(); - void destroy(); protected: void handleIn(framing::AMQFrame&); diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 64d62934b9..3c6bed4344 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -25,6 +25,9 @@ #include "SemanticHandler.h" #include "SessionManager.h" #include "SessionHandler.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/ServerInvoker.h" @@ -182,7 +185,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& completed.add(id); if (!invocation.wasHandled()) { - throw NotImplementedException("Not implemented"); + throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { nextOut++;//execution result is now a command, so the counter must be incremented getProxy().getExecution010().result(id, invocation.getResult()); @@ -206,6 +209,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) } msgBuilder.handle(frame); if (frame.getEof() && frame.getEos()) {//end of frameset + if (frame.getBof()) { + //i.e this is a just a command frame, add a dummy header + AMQFrame header; + header.setBody(AMQHeaderBody()); + header.setBof(false); + header.setEof(false); + msg->getFrames().append(header); + } msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); @@ -242,13 +253,14 @@ void SessionState::handle(AMQFrame& frame) SequenceNumber commandId; try { //TODO: make command handling more uniform, regardless of whether - //commands carry content. (For now, assume all single frame - //assemblies are non-content bearing and all content-bearing - //assemblies will have more than one frame): - if (frame.getBof() && frame.getEof()) { - handleCommand(frame.getMethod(), commandId); - } else { + //commands carry content. + AMQMethodBody* m = frame.getMethod(); + if (m == 0 || m->isContentBearing()) { handleContent(frame, commandId); + } else if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod(), commandId); + } else { + throw InternalErrorException("Cannot handle multi-frame command segments yet"); } } catch(const SessionException& e) { //TODO: better implementation of new exception handling mechanism @@ -263,7 +275,11 @@ void SessionState::handle(AMQFrame& frame) } else { getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); } - handler->destroy(); + timeout = 0; + //The python client doesn't currently detach on receiving an exception + //so the session state isn't destroyed. This is a temporary workaround + //until that is addressed + adapter.destroyExclusiveQueues(); } } diff --git a/qpid/cpp/src/qpid/client/Channel.cpp b/qpid/cpp/src/qpid/client/Channel.cpp index 4af69c8552..ae9f78483d 100644 --- a/qpid/cpp/src/qpid/client/Channel.cpp +++ b/qpid/cpp/src/qpid/client/Channel.cpp @@ -26,7 +26,6 @@ #include "Message.h" #include "Connection.h" #include "Demux.h" -#include "FutureResponse.h" #include "MessageListener.h" #include "MessageQueue.h" #include <boost/format.hpp> @@ -47,9 +46,17 @@ const std::string empty; class ScopedSync { Session& session; + const bool change; + const bool value; public: - ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } - ~ScopedSync() { session.setSynchronous(false); } + ScopedSync(Session& s, bool desired = true) : session(s), change(s.isSynchronous() != desired), value(desired) + { + if (change) session.setSynchronous(value); + } + ~ScopedSync() + { + if (change) session.setSynchronous(!value); + } }; Channel::Channel(bool _transactional, u_int16_t _prefetch) : @@ -116,7 +123,7 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri string e = exchange.getName(); string q = queue.getName(); ScopedSync s(session, synch); - session.queueBind(0, q, e, key, args); + session.exchangeBind(q, e, key, args); } void Channel::commit(){ @@ -129,7 +136,7 @@ void Channel::rollback(){ void Channel::consume( Queue& _queue, const std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { + AckMode ackMode, bool noLocal, bool synch, FieldTable* fields) { if (tag.empty()) { throw Exception("A tag must be specified for a consumer."); @@ -144,13 +151,18 @@ void Channel::consume( c.ackMode = ackMode; c.count = 0; } - uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; + uint8_t confirmMode = ackMode == NO_ACK ? 1 : 0; ScopedSync s(session, synch); - session.messageSubscribe(0, _queue.getName(), tag, noLocal, + FieldTable ft; + FieldTable* ftptr = fields ? fields : &ft; + if (noLocal) { + ftptr->setString("qpid.no-local","yes"); + } + session.messageSubscribe(_queue.getName(), tag, confirmMode, 0/*pre-acquire*/, - false, fields ? *fields : FieldTable()); + false, "", 0, *ftptr); if (!prefetch) { - session.messageFlowMode(tag, 0/*credit based*/); + session.messageSetFlowMode(tag, 0/*credit based*/); } //allocate some credit: @@ -177,17 +189,22 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { ScopedDivert handler(tag, session.getExecution().getDemux()); Demux::QueuePtr incoming = handler.getQueue(); - session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); + session.messageSubscribe(destination=tag, queue=_queue.getName(), acceptMode=(ackMode == NO_ACK ? 1 : 0)); session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); session.messageFlow(tag, 0/*MESSAGES*/, 1); - Completion status = session.messageFlush(tag); - status.sync(); + { + ScopedSync s(session); + session.messageFlush(tag); + } session.messageCancel(tag); FrameSet::shared_ptr p; if (incoming->tryPop(p)) { msg.populate(*p); - if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); + if (ackMode == AUTO_ACK) { + msg.setSession(session); + msg.acknowledge(false, true); + } return true; } else @@ -243,7 +260,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination) bool send = i->second.ackMode == AUTO_ACK || (prefetch && ++(i->second.count) > (prefetch / 2)); if (send) i->second.count = 0; - session.getExecution().completed(content.getId(), true, send); + session.getExecution().markCompleted(content.getId(), true, send); } } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); diff --git a/qpid/cpp/src/qpid/client/Channel.h b/qpid/cpp/src/qpid/client/Channel.h index 2cda97dc63..1c3c2c9ae8 100644 --- a/qpid/cpp/src/qpid/client/Channel.h +++ b/qpid/cpp/src/qpid/client/Channel.h @@ -256,7 +256,7 @@ class Channel : private sys::Runnable void consume( Queue& queue, const std::string& tag, MessageListener* listener, AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); + framing::FieldTable* fields = 0); /** * Cancels a subscription previously set up through a call to consume(). diff --git a/qpid/cpp/src/qpid/client/Completion.h b/qpid/cpp/src/qpid/client/Completion.h index 4d324aaf28..19d5b31777 100644 --- a/qpid/cpp/src/qpid/client/Completion.h +++ b/qpid/cpp/src/qpid/client/Completion.h @@ -24,7 +24,7 @@ #include <boost/shared_ptr.hpp> #include "Future.h" -#include "SessionCore.h" +#include "SessionImpl.h" namespace qpid { namespace client { @@ -33,17 +33,12 @@ class Completion { protected: Future future; - shared_ptr<SessionCore> session; + shared_ptr<SessionImpl> session; public: Completion() {} - Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {} - - void sync() - { - future.sync(*session); - } + Completion(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {} void wait() { @@ -53,10 +48,6 @@ public: bool isComplete() { return future.isComplete(*session); } - - bool isCompleteUpTo() { - return future.isCompleteUpTo(*session); - } }; }} diff --git a/qpid/cpp/src/qpid/client/CompletionTracker.cpp b/qpid/cpp/src/qpid/client/CompletionTracker.cpp deleted file mode 100644 index 76ea9dec51..0000000000 --- a/qpid/cpp/src/qpid/client/CompletionTracker.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "CompletionTracker.h" -#include <algorithm> - -using qpid::client::CompletionTracker; -using namespace qpid::framing; -using namespace boost; - -namespace -{ -const std::string empty; -} - -CompletionTracker::CompletionTracker() : closed(false) {} -CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {} - -void CompletionTracker::close() -{ - sys::Mutex::ScopedLock l(lock); - closed=true; - while (!listeners.empty()) { - Record r(listeners.front()); - { - sys::Mutex::ScopedUnlock u(lock); - r.completed(); - } - listeners.pop_front(); - } -} - - -void CompletionTracker::completed(const SequenceNumber& _mark) -{ - sys::Mutex::ScopedLock l(lock); - mark = _mark; - while (!listeners.empty() && !(listeners.front().id > mark)) { - Record r(listeners.front()); - listeners.pop_front(); - { - sys::Mutex::ScopedUnlock u(lock); - r.completed(); - } - } -} - -void CompletionTracker::received(const SequenceNumber& id, const std::string& result) -{ - sys::Mutex::ScopedLock l(lock); - Listeners::iterator i = seek(id); - if (i != listeners.end() && i->id == id) { - i->received(result); - listeners.erase(i); - } -} - -void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener) -{ - if (!add(Record(point, listener))) { - listener(); - } -} - -void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener) -{ - if (!add(Record(point, listener))) { - listener(empty); - } -} - -bool CompletionTracker::add(const Record& record) -{ - sys::Mutex::ScopedLock l(lock); - if (record.id <= mark || closed) { - return false; - } else { - //insert at the correct position - Listeners::iterator i = seek(record.id); - if (i == listeners.end()) i = listeners.begin(); - listeners.insert(i, record); - return true; - } -} - -CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point) -{ - Listeners::iterator i = listeners.begin(); - while (i != listeners.end() && i->id < point) i++; - return i; -} - - -void CompletionTracker::Record::completed() -{ - if (f) f(); - else if(g) g(empty);//won't get a result if command is now complete -} - -void CompletionTracker::Record::received(const std::string& result) -{ - if (g) g(result); -} diff --git a/qpid/cpp/src/qpid/client/CompletionTracker.h b/qpid/cpp/src/qpid/client/CompletionTracker.h deleted file mode 100644 index 55f7ff7531..0000000000 --- a/qpid/cpp/src/qpid/client/CompletionTracker.h +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <list> -#include <boost/function.hpp> -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/sys/Mutex.h" - -#ifndef _CompletionTracker_ -#define _CompletionTracker_ - -namespace qpid { -namespace client { - -class CompletionTracker -{ -public: - typedef boost::function<void()> CompletionListener; - typedef boost::function<void(const std::string&)> ResultListener; - - CompletionTracker(); - CompletionTracker(const framing::SequenceNumber& mark); - void completed(const framing::SequenceNumber& mark); - void received(const framing::SequenceNumber& id, const std::string& result); - void listenForCompletion(const framing::SequenceNumber& point, CompletionListener l); - void listenForResult(const framing::SequenceNumber& point, ResultListener l); - void close(); - -private: - struct Record - { - framing::SequenceNumber id; - CompletionListener f; - ResultListener g; - - Record(const framing::SequenceNumber& _id, CompletionListener l) : id(_id), f(l) {} - Record(const framing::SequenceNumber& _id, ResultListener l) : id(_id), g(l) {} - void completed(); - void received(const std::string& result); - - }; - - typedef std::list<Record> Listeners; - bool closed; - - sys::Mutex lock; - framing::SequenceNumber mark; - Listeners listeners; - - bool add(const Record& r); - Listeners::iterator seek(const framing::SequenceNumber&); -}; - -} -} - - -#endif diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp index 872e04b3b5..25d1c510c8 100644 --- a/qpid/cpp/src/qpid/client/Connection.cpp +++ b/qpid/cpp/src/qpid/client/Connection.cpp @@ -25,7 +25,7 @@ #include "Connection.h" #include "Channel.h" #include "Message.h" -#include "SessionCore.h" +#include "SessionImpl.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" @@ -76,8 +76,8 @@ void Connection::openChannel(Channel& channel) { Session Connection::newSession(SynchronousMode sync, uint32_t detachedLifetime) { - shared_ptr<SessionCore> core( - new SessionCore(impl, ++channelIdCounter, max_frame_size)); + shared_ptr<SessionImpl> core( + new SessionImpl(impl, ++channelIdCounter, max_frame_size)); core->setSync(sync); impl->addSession(core); core->open(detachedLifetime); diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h index 81d9b972b6..d24809b31e 100644 --- a/qpid/cpp/src/qpid/client/Connection.h +++ b/qpid/cpp/src/qpid/client/Connection.h @@ -137,7 +137,7 @@ class Connection Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0); /** - * Resume a suspendded session. A session may be resumed + * Resume a suspended session. A session may be resumed * on a different connection to the one that created it. */ void resume(Session& session); diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index e1c50c14fc..13de271e3b 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -24,6 +24,7 @@ #include "qpid/framing/amqp_framing.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/ClientInvoker.h" using namespace qpid::client; using namespace qpid::framing; @@ -31,14 +32,21 @@ using namespace boost; namespace { const std::string OK("OK"); +const std::string PLAIN("PLAIN"); +const std::string en_US("en_US"); + +const std::string INVALID_STATE_START("start received in invalid state"); +const std::string INVALID_STATE_TUNE("tune received in invalid state"); +const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state"); +const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state"); } ConnectionHandler::ConnectionHandler() - : StateManager(NOT_STARTED) + : StateManager(NOT_STARTED), outHandler(*this), proxy(outHandler) { - mechanism = "PLAIN"; - locale = "en_US"; + mechanism = PLAIN; + locale = en_US; heartbeat = 0; maxChannels = 32767; maxFrameSize = 65535; @@ -52,34 +60,29 @@ ConnectionHandler::ConnectionHandler() void ConnectionHandler::incoming(AMQFrame& frame) { if (getState() == CLOSED) { - throw Exception("Connection is closed."); + throw Exception("Received frame on closed connection"); } + AMQBody* body = frame.getBody(); - if (frame.getChannel() == 0) { - if (body->getMethod()) { - handle(body->getMethod()); - } else { - error(503, "Cannot send content on channel zero."); - } - } else { - switch(getState()) { - case OPEN: - try { + try { + if (frame.getChannel() != 0 || !invoke(static_cast<ConnectionOperations&>(*this), *body)) { + switch(getState()) { + case OPEN: in(frame); - }catch(ConnectionException& e){ - error(e.code, e.what(), body); - }catch(std::exception& e){ - error(541/*internal error*/, e.what(), body); + break; + case CLOSING: + QPID_LOG(warning, "Ignoring frame while closing connection: " << frame); + break; + default: + throw Exception("Cannot receive frames on non-zero channel until connection is established."); } - break; - case CLOSING: - QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored."); - break; - default: - //must be in connection initialisation: - fail("Cannot receive frames on non-zero channel until connection is established."); } + }catch(std::exception& e){ + QPID_LOG(warning, "Closing connection due to " << e.what()); + setState(CLOSING); + proxy.close(501, e.what()); + if (onError) onError(501, e.what()); } } @@ -109,101 +112,77 @@ void ConnectionHandler::close() break; case OPEN: setState(CLOSING); - send(ConnectionCloseBody(version, 200, OK, 0, 0)); + proxy.close(200, OK); waitFor(CLOSED); break; // Nothing to do for CLOSING, CLOSED, FAILED or NOT_STARTED } } -void ConnectionHandler::send(const framing::AMQBody& body) +void ConnectionHandler::checkState(STATES s, const std::string& msg) { - AMQFrame f(body); - out(f); + if (getState() != s) { + throw CommandInvalidException(msg); + } } -void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) +void ConnectionHandler::fail(const std::string& message) { - setState(CLOSING); - send(ConnectionCloseBody(version, code, message, classId, methodId)); + QPID_LOG(warning, message); + setState(FAILED); } -void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body) +void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /*mechanisms*/, const Array& /*locales*/) { - if (onError) - onError(code, message); - AMQMethodBody* method = body->getMethod(); - if (method) - error(code, message, method->amqpClassId(), method->amqpMethodId()); - else - error(code, message); + checkState(NOT_STARTED, INVALID_STATE_START); + setState(NEGOTIATING); + //TODO: verify that desired mechanism and locale are supported + string response = ((char)0) + uid + ((char)0) + pwd; + proxy.startOk(properties, mechanism, response, locale); } +void ConnectionHandler::secure(const std::string& /*challenge*/) +{ + throw NotImplementedException("Challenge-response cycle not yet implemented in client"); +} -void ConnectionHandler::fail(const std::string& message) +void ConnectionHandler::tune(uint16_t channelMax, uint16_t /*frameMax*/, uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/) { - QPID_LOG(warning, message); - setState(FAILED); + checkState(NEGOTIATING, INVALID_STATE_TUNE); + //TODO: verify that desired heartbeat and max frame size are valid + maxChannels = channelMax; + proxy.tuneOk(maxChannels, maxFrameSize, heartbeat); + setState(OPENING); + proxy.open(vhost, capabilities, insist); } -void ConnectionHandler::handle(AMQMethodBody* method) +void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/) { - switch (getState()) { - case NOT_STARTED: - if (method->isA<ConnectionStartBody>()) { - setState(NEGOTIATING); - string response = ((char)0) + uid + ((char)0) + pwd; - send(ConnectionStartOkBody(version, properties, mechanism, response, locale)); - } else { - fail("Bad method sequence, expected connection-start."); - } - break; - case NEGOTIATING: - if (method->isA<ConnectionTuneBody>()) { - ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method); - heartbeat = proposal->getHeartbeat(); - maxChannels = proposal->getChannelMax(); - send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)); - setState(OPENING); - send(ConnectionOpenBody(version, vhost, capabilities, insist)); - //TODO: support for further security challenges - //} else if (method->isA<ConnectionSecureBody>()) { - } else { - fail("Unexpected method sequence, expected connection-tune."); - } - break; - case OPENING: - if (method->isA<ConnectionOpenOkBody>()) { - setState(OPEN); - //TODO: support for redirection - //} else if (method->isA<ConnectionRedirectBody>()) { - } else { - fail("Unexpected method sequence, expected connection-open-ok."); - } - break; - case OPEN: - if (method->isA<ConnectionCloseBody>()) { - send(ConnectionCloseOkBody(version)); - setState(CLOSED); - ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method); - QPID_LOG(warning, "Broker closed connection: " << c->getReplyCode() - << ", " << c->getReplyText()); - if (onError) { - onError(c->getReplyCode(), c->getReplyText()); - } - } else { - error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId()); - } - break; - case CLOSING: - if (method->isA<ConnectionCloseOkBody>()) { - if (onClose) { - onClose(); - } - setState(CLOSED); - } else { - QPID_LOG(warning, "Received frame on channel zero while closing connection; frame ignored."); - } - break; + checkState(OPENING, INVALID_STATE_OPEN_OK); + //TODO: store knownHosts for reconnection etc + setState(OPEN); +} + +void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/) +{ + throw NotImplementedException("Redirection received from broker; not yet implemented in client"); +} + +void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText) +{ + proxy.closeOk(); + setState(CLOSED); + QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText); + if (onError) { + onError(replyCode, replyText); + } +} + +void ConnectionHandler::closeOk() +{ + checkState(CLOSING, INVALID_STATE_CLOSE_OK); + if (onClose) { + onClose(); } + setState(CLOSED); } diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h index bb50495c06..b298b02701 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h @@ -21,12 +21,16 @@ #ifndef _ConnectionHandler_ #define _ConnectionHandler_ +#include "ChainableFrameHandler.h" #include "Connector.h" #include "StateManager.h" -#include "ChainableFrameHandler.h" -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/FieldTable.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQP_ClientOperations.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/InputHandler.h" namespace qpid { namespace client { @@ -39,7 +43,7 @@ struct ConnectionProperties framing::FieldTable properties; std::string mechanism; std::string locale; - std::string capabilities; + framing::Array capabilities; uint16_t heartbeat; uint16_t maxChannels; uint64_t maxFrameSize; @@ -48,17 +52,42 @@ struct ConnectionProperties }; class ConnectionHandler : private StateManager, - public ConnectionProperties, - public ChainableFrameHandler, - public framing::InputHandler + public ConnectionProperties, + public ChainableFrameHandler, + public framing::InputHandler, + private framing::AMQP_ClientOperations::Connection010Handler { + typedef framing::AMQP_ClientOperations::Connection010Handler ConnectionOperations; enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED}; std::set<int> ESTABLISHED; - void handle(framing::AMQMethodBody* method); - void send(const framing::AMQBody& body); - void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0); - void error(uint16_t code, const std::string& message, framing::AMQBody* body); + class Adapter : public framing::FrameHandler + { + ConnectionHandler& handler; + public: + Adapter(ConnectionHandler& h) : handler(h) {} + void handle(framing::AMQFrame& f) { handler.out(f); } + }; + + Adapter outHandler; + framing::AMQP_ServerProxy::Connection010 proxy; + + void checkState(STATES s, const std::string& msg); + + //methods corresponding to connection controls: + void start(const framing::FieldTable& serverProperties, + const framing::Array& mechanisms, + const framing::Array& locales); + void secure(const std::string& challenge); + void tune(uint16_t channelMax, + uint16_t frameMax, + uint16_t heartbeatMin, + uint16_t heartbeatMax); + void openOk(const framing::Array& knownHosts); + void redirect(const std::string& host, + const framing::Array& knownHosts); + void close(uint16_t replyCode, const std::string& replyText); + void closeOk(); public: using InputHandler::handle; diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index b248de8744..d1fd66ff26 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -23,7 +23,7 @@ #include "qpid/framing/reply_exceptions.h" #include "ConnectionImpl.h" -#include "SessionCore.h" +#include "SessionImpl.h" #include <boost/bind.hpp> #include <boost/format.hpp> @@ -32,6 +32,7 @@ using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; + ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), isClosed(false), isClosing(false) { @@ -52,10 +53,10 @@ ConnectionImpl::~ConnectionImpl() { connector->close(); } -void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session) +void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) { Mutex::ScopedLock l(lock); - boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()]; + boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()]; if (s.lock()) throw ChannelBusyException(); s = session; } @@ -67,7 +68,7 @@ void ConnectionImpl::handle(framing::AMQFrame& frame) void ConnectionImpl::incoming(framing::AMQFrame& frame) { - boost::shared_ptr<SessionCore> s; + boost::shared_ptr<SessionImpl> s; { Mutex::ScopedLock l(lock); s = sessions[frame.getChannel()].lock(); @@ -122,7 +123,7 @@ ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedL connector->close(); SessionVector save; for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { - boost::shared_ptr<SessionCore> s = i->second.lock(); + boost::shared_ptr<SessionImpl> s = i->second.lock(); if (s) save.push_back(s); } sessions.clear(); @@ -135,7 +136,7 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) if (isClosed) return; SessionVector save(closeInternal(l)); Mutex::ScopedUnlock u(lock); - std::for_each(save.begin(), save.end(), boost::bind(&SessionCore::connectionClosed, _1, code, text)); + std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } static const std::string CONN_CLOSED("Connection closed by broker"); @@ -148,7 +149,7 @@ void ConnectionImpl::shutdown() handler.fail(CONN_CLOSED); Mutex::ScopedUnlock u(lock); std::for_each(save.begin(), save.end(), - boost::bind(&SessionCore::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED)); + boost::bind(&SessionImpl::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.h b/qpid/cpp/src/qpid/client/ConnectionImpl.h index bf8226a776..d0df9238f2 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.h @@ -35,15 +35,15 @@ namespace qpid { namespace client { -class SessionCore; +class SessionImpl; class ConnectionImpl : public framing::FrameHandler, public sys::TimeoutHandler, public sys::ShutdownHandler { - typedef std::map<uint16_t, boost::weak_ptr<SessionCore> > SessionMap; - typedef std::vector<boost::shared_ptr<SessionCore> > SessionVector; + typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap; + typedef std::vector<boost::shared_ptr<SessionImpl> > SessionVector; SessionMap sessions; ConnectionHandler handler; @@ -69,7 +69,7 @@ class ConnectionImpl : public framing::FrameHandler, ConnectionImpl(boost::shared_ptr<Connector> c); ~ConnectionImpl(); - void addSession(const boost::shared_ptr<SessionCore>&); + void addSession(const boost::shared_ptr<SessionImpl>&); void open(const std::string& host, int port = 5672, const std::string& uid = "guest", diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index 11aff6184b..7fb4997f5a 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -46,6 +46,7 @@ Connector::Connector( receive_buffer_size(buffer_size), send_buffer_size(buffer_size), version(ver), + initiated(false), closed(true), joined(true), timeout(0), @@ -240,6 +241,14 @@ void Connector::Writer::write(sys::AsynchIO&) { void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + if (!initiated) { + framing::ProtocolInitiation protocolInit; + if (protocolInit.decode(in)) { + //TODO: check the version is correct + QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); + } + initiated = true; + } AMQFrame frame; while(frame.decode(in)){ QPID_LOG(trace, "RECV " << identifier << ": " << frame); diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h index 78aad0b60a..366f82acbd 100644 --- a/qpid/cpp/src/qpid/client/Connector.h +++ b/qpid/cpp/src/qpid/client/Connector.h @@ -77,8 +77,9 @@ class Connector : public framing::OutputHandler, const int receive_buffer_size; const int send_buffer_size; framing::ProtocolVersion version; + bool initiated; - sys::Mutex closedLock; + sys::Mutex closedLock; bool closed; bool joined; diff --git a/qpid/cpp/src/qpid/client/Correlator.cpp b/qpid/cpp/src/qpid/client/Correlator.cpp deleted file mode 100644 index f30c92b992..0000000000 --- a/qpid/cpp/src/qpid/client/Correlator.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Correlator.h" - -using qpid::client::Correlator; -using namespace qpid::framing; -using namespace boost; - -bool Correlator::receive(const AMQMethodBody* response) -{ - if (listeners.empty()) { - return false; - } else { - Listener l = listeners.front(); - if (l) l(response); - listeners.pop(); - return true; - } -} - -void Correlator::listen(Listener l) -{ - listeners.push(l); -} - - diff --git a/qpid/cpp/src/qpid/client/Correlator.h b/qpid/cpp/src/qpid/client/Correlator.h deleted file mode 100644 index 45b22fb2fe..0000000000 --- a/qpid/cpp/src/qpid/client/Correlator.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <memory> -#include <queue> -#include <set> -#include <boost/function.hpp> -#include "qpid/framing/AMQMethodBody.h" -#include "qpid/sys/Monitor.h" - -#ifndef _Correlator_ -#define _Correlator_ - -namespace qpid { -namespace client { - - -class Correlator -{ -public: - typedef boost::function<void(const framing::AMQMethodBody*)> Listener; - - bool receive(const framing::AMQMethodBody*); - void listen(Listener l); - -private: - std::queue<Listener> listeners; -}; - -} -} - - -#endif diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp index 2484dabf1f..cc67701748 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.cpp +++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp @@ -84,7 +84,7 @@ void Dispatcher::run() if (handler.get()) { handler->handle(*content); } else { - QPID_LOG(error, "No handler found for " << *(content->getMethod())); + QPID_LOG(warning, "No handler found for " << *(content->getMethod())); } } } diff --git a/qpid/cpp/src/qpid/client/Execution.h b/qpid/cpp/src/qpid/client/Execution.h index 5f717de586..e4b2db23e1 100644 --- a/qpid/cpp/src/qpid/client/Execution.h +++ b/qpid/cpp/src/qpid/client/Execution.h @@ -28,21 +28,27 @@ namespace qpid { namespace client { /** - * Provides more detailed access to the amqp 'execution layer'. + * Provides access to more detailed aspects of the session + * implementation. */ class Execution { public: virtual ~Execution() {} - virtual void sendSyncRequest() = 0; - virtual void sendFlushRequest() = 0; - virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0; + /** + * Mark the incoming command with the specified id as completed + */ + virtual void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) = 0; + /** + * Provides access to the demultiplexing function within the + * session implementation + */ virtual Demux& getDemux() = 0; - virtual bool isComplete(const framing::SequenceNumber& id) = 0; - virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0; - virtual void setCompletionListener(boost::function<void()>) = 0; - virtual void syncWait(const framing::SequenceNumber& id) = 0; - virtual framing::SequenceNumber lastSent() const = 0; + /** + * Wait until notification has been received of completion of the + * outgoing command with the specified id. + */ + void waitForCompletion(const framing::SequenceNumber& id); }; }} diff --git a/qpid/cpp/src/qpid/client/ExecutionHandler.cpp b/qpid/cpp/src/qpid/client/ExecutionHandler.cpp deleted file mode 100644 index afdd13c9e9..0000000000 --- a/qpid/cpp/src/qpid/client/ExecutionHandler.cpp +++ /dev/null @@ -1,267 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ExecutionHandler.h" -#include "qpid/Exception.h" -#include "qpid/framing/BasicDeliverBody.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/all_method_bodies.h" -#include "qpid/framing/ServerInvoker.h" -#include "qpid/client/FutureCompletion.h" -#include <boost/bind.hpp> - -using namespace qpid::client; -using namespace qpid::framing; -using namespace boost; -using qpid::sys::Mutex; - -bool isMessageMethod(AMQMethodBody* method) -{ - return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>(); -} - -bool isMessageMethod(AMQBody* body) -{ - AMQMethodBody* method=body->getMethod(); - return method && isMessageMethod(method); -} - -bool isContentFrame(AMQFrame& frame) -{ - AMQBody* body = frame.getBody(); - uint8_t type = body->type(); - return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); -} - -ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : - version(framing::highestProtocolVersion), maxFrameSize(_maxFrameSize) {} - -//incoming: -void ExecutionHandler::handle(AMQFrame& frame) -{ - if (!invoke(*this, *frame.getBody())) { - if (!arriving) { - arriving = FrameSet::shared_ptr(new FrameSet(++incomingCounter)); - } - arriving->append(frame); - if (arriving->isComplete()) { - if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) { - demux.handle(arriving); - } - arriving.reset(); - } - } -} - -void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) -{ - if (range.size() % 2) { //must be even number - throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark")); - } else { - SequenceNumber mark(cumulative); - { - Mutex::ScopedLock l(lock); - outgoingCompletionStatus.update(mark, range); - } - if (completionListener) completionListener(); - completion.completed(outgoingCompletionStatus.mark); - //TODO: signal listeners of early notification? - } -} - -void ExecutionHandler::flush() -{ - sendCompletion(); -} - -void ExecutionHandler::noop() -{ - //do nothing -} - -void ExecutionHandler::result(uint32_t command, const std::string& data) -{ - completion.received(command, data); -} - -void ExecutionHandler::sync() -{ - //TODO: implement - need to note the mark requested and then - //remember to send a response when that point is reached -} - -void ExecutionHandler::flushTo(const framing::SequenceNumber& point) -{ - Mutex::ScopedLock l(lock); - if (point > outgoingCompletionStatus.mark) { - sendFlushRequest(); - } -} - -void ExecutionHandler::sendFlushRequest() -{ - Mutex::ScopedLock l(lock); - AMQFrame frame(in_place<ExecutionFlushBody>()); - out(frame); -} - -void ExecutionHandler::syncTo(const framing::SequenceNumber& point) -{ - Mutex::ScopedLock l(lock); - if (point > outgoingCompletionStatus.mark) { - sendSyncRequest(); - } -} - - -void ExecutionHandler::sendSyncRequest() -{ - Mutex::ScopedLock l(lock); - AMQFrame frame(in_place<ExecutionSyncBody>()); - out(frame); -} - -void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send) -{ - { - Mutex::ScopedLock l(lock); - if (id > incomingCompletionStatus.mark) { - if (cumulative) { - incomingCompletionStatus.update(incomingCompletionStatus.mark, id); - } else { - incomingCompletionStatus.update(id, id); - } - } - } - if (send) { - sendCompletion(); - } -} - - -void ExecutionHandler::sendCompletion() -{ - Mutex::ScopedLock l(lock); - SequenceNumberSet range; - incomingCompletionStatus.collectRanges(range); - AMQFrame frame( - in_place<ExecutionCompleteBody>( - version, incomingCompletionStatus.mark.getValue(), range)); - out(frame); -} - -SequenceNumber ExecutionHandler::lastSent() const { return outgoingCounter; } - -SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener) -{ - Mutex::ScopedLock l(lock); - return send(command, listener, false); -} - -SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent) -{ - SequenceNumber id = ++outgoingCounter; - if(l) { - completion.listenForResult(id, l); - } - AMQFrame frame(command); - if (hasContent) { - frame.setEof(false); - } - out(frame); - return id; -} - -SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, - CompletionTracker::ResultListener listener) -{ - Mutex::ScopedLock l(lock); - SequenceNumber id = send(command, listener, true); - sendContent(content); - return id; -} - -void ExecutionHandler::sendContent(const MethodContent& content) -{ - AMQFrame header(content.getHeader()); - header.setBof(false); - u_int64_t data_length = content.getData().length(); - if(data_length > 0){ - header.setEof(false); - out(header); - const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1 /*end of frame marker included in overhead but not in size*/); - if(data_length < frag_size){ - AMQFrame frame(in_place<AMQContentBody>(content.getData())); - frame.setBof(false); - out(frame); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(content.getData().substr(offset, length)); - AMQFrame frame(in_place<AMQContentBody>(frag)); - frame.setBof(false); - if (offset > 0) { - frame.setBos(false); - } - offset += length; - remaining = data_length - offset; - if (remaining) { - frame.setEos(false); - frame.setEof(false); - } - out(frame); - } - } - } else { - out(header); - } -} - -bool ExecutionHandler::isComplete(const SequenceNumber& id) -{ - Mutex::ScopedLock l(lock); - return outgoingCompletionStatus.covers(id); -} - -bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id) -{ - Mutex::ScopedLock l(lock); - return outgoingCompletionStatus.mark >= id; -} - -void ExecutionHandler::setCompletionListener(boost::function<void()> l) -{ - completionListener = l; -} - - -void ExecutionHandler::syncWait(const SequenceNumber& id) { - syncTo(id); - FutureCompletion fc; - completion.listenForCompletion( - id, boost::bind(&FutureCompletion::completed, &fc) - ); - fc.waitForCompletion(); - assert(isCompleteUpTo(id)); -} diff --git a/qpid/cpp/src/qpid/client/ExecutionHandler.h b/qpid/cpp/src/qpid/client/ExecutionHandler.h deleted file mode 100644 index d9113b683b..0000000000 --- a/qpid/cpp/src/qpid/client/ExecutionHandler.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ExecutionHandler_ -#define _ExecutionHandler_ - -#include <queue> -#include "qpid/framing/AccumulatedAck.h" -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/MethodContent.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/sys/Mutex.h" -#include "ChainableFrameHandler.h" -#include "CompletionTracker.h" -#include "Correlator.h" -#include "Demux.h" -#include "Execution.h" - -namespace qpid { -namespace client { - -class ExecutionHandler : - public framing::AMQP_ServerOperations::ExecutionHandler, - public framing::FrameHandler, - public Execution -{ - framing::SequenceNumber incomingCounter; - framing::AccumulatedAck incomingCompletionStatus; - framing::SequenceNumber outgoingCounter; - framing::AccumulatedAck outgoingCompletionStatus; - framing::FrameSet::shared_ptr arriving; - Correlator correlation; - CompletionTracker completion; - Demux demux; - sys::Mutex lock; - framing::ProtocolVersion version; - uint64_t maxFrameSize; - boost::function<void()> completionListener; - - void complete(uint32_t mark, const framing::SequenceNumberSet& range); - void flush(); - void noop(); - void result(uint32_t command, const std::string& data); - void sync(); - - void sendCompletion(); - - framing::SequenceNumber send(const framing::AMQBody&, CompletionTracker::ResultListener, bool hasContent); - void sendContent(const framing::MethodContent&); - -public: - typedef CompletionTracker::ResultListener ResultListener; - - // Allow other classes to set the out handler. - framing::FrameHandler::Chain out; - - ExecutionHandler(uint64_t maxFrameSize = 65535); - - // Incoming handler. - void handle(framing::AMQFrame& frame); - - framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener()); - framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, - ResultListener=ResultListener()); - framing::SequenceNumber lastSent() const; - void sendSyncRequest(); - void sendFlushRequest(); - void completed(const framing::SequenceNumber& id, bool cumulative, bool send); - void syncTo(const framing::SequenceNumber& point); - void flushTo(const framing::SequenceNumber& point); - void syncWait(const framing::SequenceNumber& id); - - bool isComplete(const framing::SequenceNumber& id); - bool isCompleteUpTo(const framing::SequenceNumber& id); - - void setMaxFrameSize(uint64_t size) { maxFrameSize = size; } - Correlator& getCorrelator() { return correlation; } - CompletionTracker& getCompletionTracker() { return completion; } - Demux& getDemux() { return demux; } - - void setCompletionListener(boost::function<void()>); -}; - -}} - -#endif diff --git a/qpid/cpp/src/qpid/client/FutureResponse.h b/qpid/cpp/src/qpid/client/Future.cpp index 534ca01bb7..6a0c78ae4b 100644 --- a/qpid/cpp/src/qpid/client/FutureResponse.h +++ b/qpid/cpp/src/qpid/client/Future.cpp @@ -19,28 +19,27 @@ * */ -#ifndef _FutureResponse_ -#define _FutureResponse_ - -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/BodyHolder.h" -#include "FutureCompletion.h" +#include "Future.h" namespace qpid { namespace client { -class SessionCore; - -class FutureResponse : public FutureCompletion +void Future::wait(SessionImpl& session) { - framing::BodyHolder response; -public: - framing::AMQMethodBody* getResponse(SessionCore& session); - void received(const framing::AMQMethodBody* response); -}; - -}} + if (!complete) { + session.waitForCompletion(command); + } + complete = true; +} +bool Future::isComplete(SessionImpl& session) +{ + return complete || session.isComplete(command); +} +void Future::setFutureResult(boost::shared_ptr<FutureResult> r) +{ + result = r; +} -#endif +}} diff --git a/qpid/cpp/src/qpid/client/Future.h b/qpid/cpp/src/qpid/client/Future.h index d07f9f149c..faf68c9104 100644 --- a/qpid/cpp/src/qpid/client/Future.h +++ b/qpid/cpp/src/qpid/client/Future.h @@ -28,9 +28,8 @@ #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/StructHelper.h" #include "FutureCompletion.h" -#include "FutureResponse.h" #include "FutureResult.h" -#include "SessionCore.h" +#include "SessionImpl.h" namespace qpid { namespace client { @@ -38,7 +37,6 @@ namespace client { class Future : private framing::StructHelper { framing::SequenceNumber command; - boost::shared_ptr<FutureResponse> response; boost::shared_ptr<FutureResult> result; bool complete; @@ -46,42 +44,7 @@ public: Future() : complete(false) {} Future(const framing::SequenceNumber& id) : command(id), complete(false) {} - void sync(SessionCore& session) - { - if (!isComplete(session)) { - session.getExecution().syncTo(command); - wait(session); - } - } - - void wait(SessionCore& session) - { - if (!isComplete(session)) { - FutureCompletion callback; - session.getExecution().getCompletionTracker().listenForCompletion( - command, - boost::bind(&FutureCompletion::completed, &callback) - ); - callback.waitForCompletion(); - session.assertOpen(); - complete = true; - } - } - - framing::AMQMethodBody* getResponse(SessionCore& session) - { - if (response) { - session.getExecution().getCompletionTracker().listenForCompletion( - command, - boost::bind(&FutureResponse::completed, response) - ); - return response->getResponse(session); - } else { - throw Exception("Response not expected"); - } - } - - template <class T> void decodeResult(T& value, SessionCore& session) + template <class T> void decodeResult(T& value, SessionImpl& session) { if (result) { decode(value, result->getResult(session)); @@ -90,17 +53,9 @@ public: } } - bool isComplete(SessionCore& session) { - return complete || session.getExecution().isComplete(command); - } - - bool isCompleteUpTo(SessionCore& session) { - return complete || session.getExecution().isCompleteUpTo(command); - } - - void setCommandId(const framing::SequenceNumber& id) { command = id; } - void setFutureResponse(boost::shared_ptr<FutureResponse> r) { response = r; } - void setFutureResult(boost::shared_ptr<FutureResult> r) { result = r; } + void wait(SessionImpl& session); + bool isComplete(SessionImpl& session); + void setFutureResult(boost::shared_ptr<FutureResult> r); }; }} diff --git a/qpid/cpp/src/qpid/client/FutureFactory.cpp b/qpid/cpp/src/qpid/client/FutureFactory.cpp deleted file mode 100644 index 7f9d51e77f..0000000000 --- a/qpid/cpp/src/qpid/client/FutureFactory.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "FutureFactory.h" - -using namespace qpid::client; -using namespace boost; - -shared_ptr<FutureCompletion> FutureFactory::createCompletion() -{ - shared_ptr<FutureCompletion> f(new FutureCompletion()); - weak_ptr<FutureCompletion> w(f); - set.push_back(w); - return f; -} - -shared_ptr<FutureResponse> FutureFactory::createResponse() -{ - shared_ptr<FutureResponse> f(new FutureResponse()); - weak_ptr<FutureCompletion> w(static_pointer_cast<FutureCompletion>(f)); - set.push_back(w); - return f; -} - -void FutureFactory::close(uint16_t code, const std::string& text) -{ - for (WeakPtrSet::iterator i = set.begin(); i != set.end(); i++) { - shared_ptr<FutureCompletion> p = i->lock(); - if (p) { - p->close(code, text); - } - } -} diff --git a/qpid/cpp/src/qpid/client/FutureFactory.h b/qpid/cpp/src/qpid/client/FutureFactory.h deleted file mode 100644 index b126e296fd..0000000000 --- a/qpid/cpp/src/qpid/client/FutureFactory.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef _FutureFactory_ -#define _FutureFactory_ - -#include <vector> -#include <boost/shared_ptr.hpp> -#include <boost/weak_ptr.hpp> -#include "FutureCompletion.h" -#include "FutureResponse.h" - -namespace qpid { -namespace client { - -class FutureFactory -{ - typedef std::vector< boost::weak_ptr<FutureCompletion> > WeakPtrSet; - WeakPtrSet set; - -public: - boost::shared_ptr<FutureCompletion> createCompletion(); - boost::shared_ptr<FutureResponse> createResponse(); - void close(uint16_t code, const std::string& text); -}; - -}} - - -#endif diff --git a/qpid/cpp/src/qpid/client/FutureResponse.cpp b/qpid/cpp/src/qpid/client/FutureResponse.cpp deleted file mode 100644 index 32d99531fa..0000000000 --- a/qpid/cpp/src/qpid/client/FutureResponse.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "FutureResponse.h" - -#include "SessionCore.h" - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; - - -AMQMethodBody* FutureResponse::getResponse(SessionCore& session) -{ - waitForCompletion(); - session.assertOpen(); - return response.getMethod(); -} - -void FutureResponse::received(const AMQMethodBody* r) -{ - Monitor::ScopedLock l(lock); - response.setBody(*r); - complete = true; - lock.notifyAll(); -} - diff --git a/qpid/cpp/src/qpid/client/FutureResult.cpp b/qpid/cpp/src/qpid/client/FutureResult.cpp index 681202edea..007f278051 100644 --- a/qpid/cpp/src/qpid/client/FutureResult.cpp +++ b/qpid/cpp/src/qpid/client/FutureResult.cpp @@ -21,13 +21,13 @@ #include "FutureResult.h" -#include "SessionCore.h" +#include "SessionImpl.h" using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -const std::string& FutureResult::getResult(SessionCore& session) const +const std::string& FutureResult::getResult(SessionImpl& session) const { waitForCompletion(); session.assertOpen(); diff --git a/qpid/cpp/src/qpid/client/FutureResult.h b/qpid/cpp/src/qpid/client/FutureResult.h index 3117b63802..f889706493 100644 --- a/qpid/cpp/src/qpid/client/FutureResult.h +++ b/qpid/cpp/src/qpid/client/FutureResult.h @@ -29,13 +29,13 @@ namespace qpid { namespace client { -class SessionCore; +class SessionImpl; class FutureResult : public FutureCompletion { std::string result; public: - const std::string& getResult(SessionCore& session) const; + const std::string& getResult(SessionImpl& session) const; void received(const std::string& result); }; diff --git a/qpid/cpp/src/qpid/client/Message.cpp b/qpid/cpp/src/qpid/client/Message.cpp new file mode 100644 index 0000000000..3d4b9da9fa --- /dev/null +++ b/qpid/cpp/src/qpid/client/Message.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Message.h" + +namespace qpid { +namespace client { + + Message::Message(const std::string& data_, + const std::string& routingKey, + const std::string& exchange) : TransferContent(data_, routingKey, exchange) {} + + std::string Message::getDestination() const + { + return method.getDestination(); + } + + bool Message::isRedelivered() const + { + return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); + } + + void Message::setRedelivered(bool redelivered) + { + getDeliveryProperties().setRedelivered(redelivered); + } + + framing::FieldTable& Message::getHeaders() + { + return getMessageProperties().getApplicationHeaders(); + } + + void Message::acknowledge(bool cumulative, bool notifyPeer) const + { + const_cast<Session&>(session).getExecution().markCompleted(id, cumulative, notifyPeer); + } + + const framing::MessageTransferBody& Message::getMethod() const + { + return method; + } + + const framing::SequenceNumber& Message::getId() const + { + return id; + } + + /**@internal for incoming messages */ + Message::Message(const framing::FrameSet& frameset, Session s) : + method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s) + { + populate(frameset); + } + + /**@internal use for incoming messages. */ + void Message::setSession(Session s) { session=s; } + +}} diff --git a/qpid/cpp/src/qpid/client/Message.h b/qpid/cpp/src/qpid/client/Message.h index daac30ba36..977cc89146 100644 --- a/qpid/cpp/src/qpid/client/Message.h +++ b/qpid/cpp/src/qpid/client/Message.h @@ -30,7 +30,7 @@ namespace qpid { namespace client { /** - * A representation of messages for sent or recived through the + * A representation of messages for sent or received through the * client api. * * \ingroup clientapi @@ -38,60 +38,21 @@ namespace client { class Message : public framing::TransferContent { public: - Message(const std::string& data_=std::string(), + Message(const std::string& data=std::string(), const std::string& routingKey=std::string(), - const std::string& exchange=std::string() - ) : TransferContent(data_, routingKey, exchange) {} - - std::string getDestination() const - { - return method.getDestination(); - } - - bool isRedelivered() const - { - return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); - } - - void setRedelivered(bool redelivered) - { - getDeliveryProperties().setRedelivered(redelivered); - } - - framing::FieldTable& getHeaders() - { - return getMessageProperties().getApplicationHeaders(); - } - - void acknowledge(Session& session, bool cumulative = true, bool send = true) const - { - session.getExecution().completed(id, cumulative, send); - } - - void acknowledge(bool cumulative = true, bool send = true) const - { - const_cast<Session&>(session).getExecution().completed(id, cumulative, send); - } + const std::string& exchange=std::string()); + std::string getDestination() const; + bool isRedelivered() const; + void setRedelivered(bool redelivered); + framing::FieldTable& getHeaders(); + void acknowledge(bool cumulative = true, bool notifyPeer = true) const; + const framing::MessageTransferBody& getMethod() const; + const framing::SequenceNumber& getId() const; /**@internal for incoming messages */ - Message(const framing::FrameSet& frameset, Session s) : - method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s) - { - populate(frameset); - } - - const framing::MessageTransferBody& getMethod() const - { - return method; - } - - const framing::SequenceNumber& getId() const - { - return id; - } - + Message(const framing::FrameSet& frameset, Session s); /**@internal use for incoming messages. */ - void setSession(Session s) { session=s; } + void setSession(Session s); private: //method and id are only set for received messages: framing::MessageTransferBody method; diff --git a/qpid/cpp/src/qpid/client/Results.cpp b/qpid/cpp/src/qpid/client/Results.cpp new file mode 100644 index 0000000000..1fb3a6fec9 --- /dev/null +++ b/qpid/cpp/src/qpid/client/Results.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Results.h" +#include "FutureResult.h" + +using namespace qpid::framing; +using namespace boost; + +namespace qpid { +namespace client { + +Results::Results() {} + +void Results::close() +{ + for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) { + i->second->completed(); + } + listeners.clear(); +} + +void Results::completed(const SequenceSet& set) +{ + //call complete on those listeners whose ids fall within the set + Listeners::iterator i = listeners.begin(); + while (i != listeners.end()) { + if (set.contains(i->first)) { + i->second->completed(); + listeners.erase(i++); + } else { + i++; + } + } +} + +void Results::received(const SequenceNumber& id, const std::string& result) +{ + Listeners::iterator i = listeners.find(id); + if (i != listeners.end()) { + i->second->received(result); + listeners.erase(i); + } +} + +Results::FutureResultPtr Results::listenForResult(const SequenceNumber& id) +{ + FutureResultPtr l(new FutureResult()); + listeners[id] = l; + return l; +} + +}} diff --git a/qpid/cpp/src/qpid/client/Response.h b/qpid/cpp/src/qpid/client/Results.h index 2b7d55ec1f..e17021b327 100644 --- a/qpid/cpp/src/qpid/client/Response.h +++ b/qpid/cpp/src/qpid/client/Results.h @@ -19,34 +19,37 @@ * */ -#ifndef _Response_ -#define _Response_ - +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceSet.h" +#include <map> #include <boost/shared_ptr.hpp> -#include "qpid/framing/amqp_framing.h" -#include "Completion.h" + +#ifndef _Results_ +#define _Results_ namespace qpid { namespace client { -class Response : public Completion +class FutureResult; + +class Results { public: - Response(Future f, shared_ptr<SessionCore> s) : Completion(f, s) {} - - template <class T> T& as() - { - framing::AMQMethodBody* response(future.getResponse(*session)); - return *boost::polymorphic_downcast<T*>(response); - } - - template <class T> bool isA() - { - framing::AMQMethodBody* response(future.getResponse(*session)); - return response && response->isA<T>(); - } + typedef boost::shared_ptr<FutureResult> FutureResultPtr; + + Results(); + void completed(const framing::SequenceSet& set); + void received(const framing::SequenceNumber& id, const std::string& result); + FutureResultPtr listenForResult(const framing::SequenceNumber& point); + void close(); + +private: + typedef std::map<framing::SequenceNumber, FutureResultPtr> Listeners; + Listeners listeners; }; -}} +} +} + #endif diff --git a/qpid/cpp/src/qpid/client/SessionBase.cpp b/qpid/cpp/src/qpid/client/SessionBase.cpp index 0e1fa67bda..d6a7571e9f 100644 --- a/qpid/cpp/src/qpid/client/SessionBase.cpp +++ b/qpid/cpp/src/qpid/client/SessionBase.cpp @@ -19,6 +19,7 @@ * */ #include "SessionBase.h" +#include "qpid/framing/all_method_bodies.h" namespace qpid { namespace client { @@ -26,7 +27,7 @@ using namespace framing; SessionBase::SessionBase() {} SessionBase::~SessionBase() {} -SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {} +SessionBase::SessionBase(shared_ptr<SessionImpl> core) : impl(core) {} void SessionBase::suspend() { impl->suspend(); } void SessionBase::close() { impl->close(); } @@ -37,14 +38,30 @@ SynchronousMode SessionBase::getSynchronous() const { return SynchronousMode(impl->isSync()); } -Execution& SessionBase::getExecution() { return impl->getExecution(); } +Execution& SessionBase::getExecution() +{ + return *impl; +} + +void SessionBase::sync() +{ + ExecutionSyncBody b; + b.setSync(true); + impl->send(b).wait(*impl); +} + Uuid SessionBase::getId() const { return impl->getId(); } framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } -void SessionBase::sync() { - Execution& ex = getExecution(); - ex.syncWait(ex.lastSent()); - impl->assertOpen(); +SessionBase::ScopedSync::ScopedSync(SessionBase& s) : session(s), change(!s.isSynchronous()) +{ + if (change) session.setSynchronous(true); } +SessionBase::ScopedSync::~ScopedSync() +{ + if (change) session.setSynchronous(false); +} + + }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/SessionBase.h b/qpid/cpp/src/qpid/client/SessionBase.h index 3565145bb9..54484113b1 100644 --- a/qpid/cpp/src/qpid/client/SessionBase.h +++ b/qpid/cpp/src/qpid/client/SessionBase.h @@ -29,8 +29,8 @@ #include "qpid/framing/TransferContent.h" #include "qpid/client/Completion.h" #include "qpid/client/ConnectionImpl.h" -#include "qpid/client/Response.h" -#include "qpid/client/SessionCore.h" +#include "qpid/client/Execution.h" +#include "qpid/client/SessionImpl.h" #include "qpid/client/TypedResult.h" #include "qpid/shared_ptr.h" #include <string> @@ -61,8 +61,11 @@ using framing::Uuid; * <em>later</em> function call. * * If you need to notify some extenal agent that some actions have - * been taken (e.g. binding queues to exchanages), you must call - * Session::sync() first, to ensure that all the commands are complete. + * been taken (e.g. binding queues to exchanges), you must ensure that + * the broker has completed the command. In synchronous mode this is + * when the session method for the command returns. In asynchronous + * mode you can call Session::sync(), to ensure that all the commands + * are complete. * * You can freely switch between modes by calling Session::setSynchronous() * @@ -77,6 +80,21 @@ enum SynchronousMode { SYNC=true, ASYNC=false }; class SessionBase { public: + /** + * Instances of this class turn synchronous mode on for the + * duration of their scope (and revert back to async if required + * afterwards). + */ + class ScopedSync + { + SessionBase& session; + const bool change; + public: + ScopedSync(SessionBase& s); + ~ScopedSync(); + }; + + SessionBase(); ~SessionBase(); @@ -110,23 +128,17 @@ class SessionBase /** Close the session */ void close(); - - /** - * Synchronize with the broker. Wait for all commands issued so far in - * the session to complete. - * @see SynchronousMode - */ - void sync(); Execution& getExecution(); + void sync(); typedef framing::TransferContent DefaultContent; protected: - shared_ptr<SessionCore> impl; + shared_ptr<SessionImpl> impl; framing::ProtocolVersion version; friend class Connection; - SessionBase(shared_ptr<SessionCore>); + SessionBase(shared_ptr<SessionImpl>); }; }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/SessionCore.cpp b/qpid/cpp/src/qpid/client/SessionCore.cpp deleted file mode 100644 index 5079c47b5e..0000000000 --- a/qpid/cpp/src/qpid/client/SessionCore.cpp +++ /dev/null @@ -1,440 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "SessionCore.h" -#include "Future.h" -#include "FutureResponse.h" -#include "FutureResult.h" -#include "ConnectionImpl.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/constants.h" -#include "qpid/framing/ClientInvoker.h" -#include "qpid/log/Statement.h" - -#include <boost/bind.hpp> - -namespace qpid { -namespace client { - -using namespace qpid::framing; - -namespace { const std::string OK="ok"; } - -typedef sys::Monitor::ScopedLock Lock; -typedef sys::Monitor::ScopedUnlock UnLock; - -inline void SessionCore::invariant() const { - switch (state.get()) { - case OPENING: - assert(!session); - assert(code==REPLY_SUCCESS); - assert(connection); - assert(channel.get()); - assert(channel.next == connection.get()); - break; - case RESUMING: - assert(session); - assert(session->getState() == SessionState::RESUMING); - assert(code==REPLY_SUCCESS); - assert(connection); - assert(channel.get()); - assert(channel.next == connection.get()); - break; - case OPEN: - case CLOSING: - case SUSPENDING: - assert(session); - assert(connection); - assert(channel.get()); - assert(channel.next == connection.get()); - break; - case SUSPENDED: - assert(session); - assert(!connection); - break; - case CLOSED: - assert(!session); - assert(!connection); - break; - } -} - -inline void SessionCore::setState(State s) { - state = s; - invariant(); -} - -inline void SessionCore::waitFor(State s) { - invariant(); - // We can be CLOSED or SUSPENDED by error at any time. - state.waitFor(States(s, CLOSED, SUSPENDED)); - check(); - invariant(); -} - -SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, - uint16_t ch, uint64_t maxFrameSize) - : l3(maxFrameSize), - sync(false), - channel(ch), - proxy(channel), - state(OPENING), - detachedLifetime(0) -{ - l3.out = &out; - attaching(conn); -} - -void SessionCore::attaching(shared_ptr<ConnectionImpl> c) { - assert(c); - assert(channel.get()); - connection = c; - channel.next = connection.get(); - code = REPLY_SUCCESS; - text = OK; - state = session ? RESUMING : OPENING; - invariant(); -} - -SessionCore::~SessionCore() { - Lock l(state); - detach(COMMAND_INVALID, "Session deleted"); - state.waitWaiters(); -} - -void SessionCore::detach(int c, const std::string& t) { - connection.reset(); - channel.next = 0; - code=c; - text=t; - l3.getDemux().close(); -} - -void SessionCore::doClose(int code, const std::string& text) { - if (state != CLOSED) { - session.reset(); - detach(code, text); - setState(CLOSED); - l3.getCompletionTracker().close(); - } - invariant(); -} - -void SessionCore::doSuspend(int code, const std::string& text) { - if (state != CLOSED && state != SUSPENDED) { - detach(code, text); - session->suspend(); - setState(SUSPENDED); - } - invariant(); -} - -ExecutionHandler& SessionCore::getExecution() { // user thread - return l3; -} - -void SessionCore::setSync(bool s) { // user thread - sync = s; -} - -bool SessionCore::isSync() { // user thread - return sync; -} - -FrameSet::shared_ptr SessionCore::get() { // user thread - // No lock here: pop does a blocking wait. - return l3.getDemux().getDefault()->pop(); -} - -static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session."; - -void SessionCore::open(uint32_t timeout) { // user thread - Lock l(state); - check(state==OPENING && !session, - COMMAND_INVALID, CANNOT_REOPEN_SESSION); - detachedLifetime=timeout; - proxy.open(detachedLifetime); - waitFor(OPEN); -} - -void SessionCore::close() { // user thread - Lock l(state); - check(); - if (state==OPEN) { - setState(CLOSING); - proxy.close(); - waitFor(CLOSED); - } - else - doClose(REPLY_SUCCESS, OK); -} - -void SessionCore::suspend() { // user thread - Lock l(state); - checkOpen(); - setState(SUSPENDING); - proxy.suspend(); - waitFor(SUSPENDED); -} - -void SessionCore::setChannel(uint16_t ch) { channel=ch; } - -static const std::string CANNOT_RESUME_SESSION("Session cannot be resumed."); - -void SessionCore::resume(shared_ptr<ConnectionImpl> c) { - // user thread - { - Lock l(state); - if (state==SUSPENDED) { // Clear error that caused suspend - code=REPLY_SUCCESS; - text=OK; - } - check(state==SUSPENDED, COMMAND_INVALID, CANNOT_RESUME_SESSION); - SequenceNumber sendAck=session->resuming(); - attaching(c); - proxy.resume(getId()); - waitFor(OPEN); - proxy.ack(sendAck, SequenceNumberSet()); - // TODO aconway 2007-10-23: Replay inside the lock might be a prolem - // for large replay sets. - SessionState::Replay replay=session->replay(); - for (SessionState::Replay::iterator i = replay.begin(); - i != replay.end(); ++i) - { - invariant(); - channel.handle(*i); // Direct to channel. - check(); - } - l3.getDemux().open(); - } -} - -void SessionCore::assertOpen() const { - Lock l(state); - checkOpen(); -} - -static const std::string UNEXPECTED_SESSION_ATTACHED( - "Received unexpected session.attached"); - -static const std::string INVALID_SESSION_RESUME_ID( - "session.resumed has invalid ID."); - -// network thread -void SessionCore::attached(const Uuid& sessionId, - uint32_t /*detachedLifetime*/) -{ - Lock l(state); - invariant(); - check(state == OPENING || state == RESUMING, - COMMAND_INVALID, UNEXPECTED_SESSION_ATTACHED); - if (state==OPENING) { // New session - // TODO aconway 2007-10-17: 0 disables sesskon.ack for now. - // If AMQP WG decides to keep it, we need to add configuration - // for the ack rate. - session=in_place<SessionState>(0, detachedLifetime > 0, sessionId); - setState(OPEN); - } - else { // RESUMING - check(sessionId == session->getId(), - INVALID_ARGUMENT, INVALID_SESSION_RESUME_ID); - // Don't setState yet, wait for first incoming ack. - } -} - -static const std::string UNEXPECTED_SESSION_DETACHED( - "Received unexpected session.detached."); - -static const std::string UNEXPECTED_SESSION_ACK( - "Received unexpected session.ack"); - -void SessionCore::detached() { // network thread - Lock l(state); - check(state == SUSPENDING, - COMMAND_INVALID, UNEXPECTED_SESSION_DETACHED); - doSuspend(REPLY_SUCCESS, OK); -} - -void SessionCore::ack(uint32_t ack, const SequenceNumberSet&) { - Lock l(state); - invariant(); - check(state==OPEN || state==RESUMING, - COMMAND_INVALID, UNEXPECTED_SESSION_ACK); - session->receivedAck(ack); - if (state==RESUMING) { - setState(OPEN); - } - invariant(); -} - -void SessionCore::closed(uint16_t code, const std::string& text) -{ // network thread - Lock l(state); - invariant(); - doClose(code, text); -} - -// closed by connection -void SessionCore::connectionClosed(uint16_t code, const std::string& text) { - Lock l(state); - try { - doClose(code, text); - } catch(...) { assert (0); } -} - -void SessionCore::connectionBroke(uint16_t code, const std::string& text) { - Lock l(state); - try { - doSuspend(code, text); - } catch (...) { assert(0); } -} - -void SessionCore::check() const { // Called with lock held. - invariant(); - if (code != REPLY_SUCCESS) - throwReplyException(code, text); -} - -void SessionCore::check(bool cond, int newCode, const std::string& msg) const { - check(); - if (!cond) { - const_cast<SessionCore*>(this)->doClose(newCode, msg); - throwReplyException(code, text); - } -} - -static const std::string SESSION_NOT_OPEN("Session is not open"); - -void SessionCore::checkOpen() const { - if (state==SUSPENDED) { - std::string cause; - if (code != REPLY_SUCCESS) - cause=" by :"+text; - throw CommandInvalidException(QPID_MSG("Session is suspended" << cause)); - } - check(state==OPEN, COMMAND_INVALID, SESSION_NOT_OPEN); -} - -Future SessionCore::send(const AMQBody& command) -{ - Lock l(state); - checkOpen(); - command.getMethod()->setSync(sync); - Future f; - //any result/response listeners must be set before the command is sent - if (command.getMethod()->resultExpected()) { - boost::shared_ptr<FutureResult> r(new FutureResult()); - f.setFutureResult(r); - //result listener is tied to command id, and is set when that - //is allocated by the execution handler, so pass it to send - f.setCommandId(l3.send(command, boost::bind(&FutureResult::received, r, _1))); - } else { - if (command.getMethod()->responseExpected()) { - boost::shared_ptr<FutureResponse> r(new FutureResponse()); - f.setFutureResponse(r); - l3.getCorrelator().listen(boost::bind(&FutureResponse::received, r, _1)); - } - - f.setCommandId(l3.send(command)); - } - return f; -} - -Future SessionCore::send(const AMQBody& command, const MethodContent& content) -{ - Lock l(state); - checkOpen(); - //content bearing methods don't currently have responses or - //results, if that changes should follow procedure for the other - //send method impl: - return Future(l3.send(command, content)); -} - -namespace { -bool isCloseResponse(const AMQFrame& frame) { - return frame.getMethod() && - frame.getMethod()->amqpClassId() == SESSION_CLASS_ID && - frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID; -} -} - -// Network thread. -void SessionCore::handleIn(AMQFrame& frame) { - ConnectionImpl::shared_ptr save; - { - Lock l(state); - save=connection; - // Ignore frames received while closing other than closed response. - if (state==CLOSING && !isCloseResponse(frame)) - return; - } - try { - // Cast to expose private SessionHandler functions. - if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { - // If we were detached by a session command, tell the connection. - if (!connection) save->erase(channel); - } - else { - session->received(frame); - l3.handle(frame); - } - } catch (const ChannelException& e) { - QPID_LOG(error, "Channel exception:" << e.what()); - doClose(e.code, e.what()); - } -} - -void SessionCore::handleOut(AMQFrame& frame) -{ - Lock l(state); - if (state==OPEN) { - if (detachedLifetime > 0 && session->sent(frame)) - proxy.solicitAck(); - channel.handle(frame); - } -} - -void SessionCore::solicitAck( ) { - Lock l(state); - checkOpen(); - proxy.ack(session->sendingAck(), SequenceNumberSet()); -} - -void SessionCore::flow(bool) { - assert(0); throw NotImplementedException("session.flow"); -} - -void SessionCore::flowOk(bool /*active*/) { - assert(0); throw NotImplementedException("session.flow"); -} - -void SessionCore::highWaterMark(uint32_t /*lastSentMark*/) { - // TODO aconway 2007-10-02: may be removed from spec. - assert(0); throw NotImplementedException("session.highWaterMark"); -} - -const Uuid SessionCore::getId() const { - if (session) - return session->getId(); - throw Exception(QPID_MSG("Closed session, no ID.")); -} - -}} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/SessionCore.h b/qpid/cpp/src/qpid/client/SessionCore.h deleted file mode 100644 index 2bb0f41fbf..0000000000 --- a/qpid/cpp/src/qpid/client/SessionCore.h +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef _SessionCore_ -#define _SessionCore_ - -#include "qpid/shared_ptr.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/ChannelHandler.h" -#include "qpid/framing/SessionState.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/framing/AMQP_ClientOperations.h" -#include "qpid/framing/AMQP_ServerProxy.h" -#include "qpid/sys/StateMonitor.h" -#include "ExecutionHandler.h" - -#include <boost/optional.hpp> - -namespace qpid { -namespace framing { -class FrameSet; -class MethodContent; -class SequenceNumberSet; -} - -namespace client { - -class Future; -class ConnectionImpl; - -/** - * Session implementation, sets up handler chains. - * Attaches to a SessionHandler when active, detaches - * when closed. - */ -class SessionCore : public framing::FrameHandler::InOutHandler, - private framing::AMQP_ClientOperations::SessionHandler -{ - public: - SessionCore(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); - ~SessionCore(); - - framing::FrameSet::shared_ptr get(); - const framing::Uuid getId() const; - uint16_t getChannel() const { return channel; } - void assertOpen() const; - - // NOTE: Public functions called in user thread. - void open(uint32_t detachedLifetime); - void close(); - void resume(shared_ptr<ConnectionImpl>); - void suspend(); - void setChannel(uint16_t channel); - - void setSync(bool s); - bool isSync(); - ExecutionHandler& getExecution(); - - Future send(const framing::AMQBody& command); - - Future send(const framing::AMQBody& command, const framing::MethodContent& content); - - void connectionClosed(uint16_t code, const std::string& text); - void connectionBroke(uint16_t code, const std::string& text); - - private: - enum State { - OPENING, - RESUMING, - OPEN, - CLOSING, - SUSPENDING, - SUSPENDED, - CLOSED - }; - typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler; - typedef sys::StateMonitor<State, CLOSED> StateMonitor; - typedef StateMonitor::Set States; - - inline void invariant() const; - inline void setState(State s); - inline void waitFor(State); - void doClose(int code, const std::string& text); - void doSuspend(int code, const std::string& text); - - /** If there is an error, throw the exception */ - void check(bool condition, int code, const std::string& text) const; - /** Throw if *error */ - void check() const; - - void handleIn(framing::AMQFrame& frame); - void handleOut(framing::AMQFrame& frame); - - // Private functions are called by broker in network thread. - void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); - void flow(bool active); - void flowOk(bool active); - void detached(); - void ack(uint32_t cumulativeSeenMark, - const framing::SequenceNumberSet& seenFrameSet); - void highWaterMark(uint32_t lastSentMark); - void solicitAck(); - void closed(uint16_t code, const std::string& text); - - void attaching(shared_ptr<ConnectionImpl>); - void detach(int code, const std::string& text); - void checkOpen() const; - - int code; // Error code - std::string text; // Error text - boost::optional<framing::SessionState> session; - shared_ptr<ConnectionImpl> connection; - ExecutionHandler l3; - volatile bool sync; - framing::ChannelHandler channel; - framing::AMQP_ServerProxy::Session proxy; - mutable StateMonitor state; - uint32_t detachedLifetime; -}; - -}} // namespace qpid::client - -#endif diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp new file mode 100644 index 0000000000..57f12cf28e --- /dev/null +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -0,0 +1,605 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionImpl.h" + +#include "ConnectionImpl.h" +#include "Future.h" + +#include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/ClientInvoker.h" +#include "qpid/framing/constants.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MethodContent.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/log/Statement.h" + +#include <boost/bind.hpp> + +namespace { const std::string OK="ok"; } + +namespace qpid { +namespace client { + +using namespace qpid::framing; + +typedef sys::Monitor::ScopedLock Lock; +typedef sys::Monitor::ScopedUnlock UnLock; + + +SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, + uint16_t ch, uint64_t _maxFrameSize) + : code(REPLY_SUCCESS), + text(OK), + state(INACTIVE), + syncMode(false), + detachedLifetime(0), + maxFrameSize(_maxFrameSize), + id(true),//generate unique uuid for each session + name(id.str()), + //TODO: may want to allow application defined names instead + connection(conn), + channel(ch), + proxy(channel), + nextIn(0), + nextOut(0) +{ + channel.next = connection.get(); +} + +SessionImpl::~SessionImpl() { + Lock l(state); + if (state != DETACHED) { + QPID_LOG(warning, "Detaching deleted session"); + setState(DETACHED); + handleClosed(); + state.waitWaiters(); + } + connection->erase(channel); +} + +void SessionImpl::setSync(bool s) // user thread +{ + syncMode = s; //syncMode is volatile +} + +bool SessionImpl::isSync() // user thread +{ + return syncMode; //syncMode is volatile +} + +FrameSet::shared_ptr SessionImpl::get() // user thread +{ + // No lock here: pop does a blocking wait. + return demux.getDefault()->pop(); +} + +const Uuid SessionImpl::getId() const //user thread +{ + return id; //id is immutable +} + +void SessionImpl::open(uint32_t timeout) // user thread +{ + Lock l(state); + if (state == INACTIVE) { + setState(ATTACHING); + proxy.attach(name, false); + waitFor(ATTACHED); + //TODO: timeout will not be set locally until get response to + //confirm, should we wait for that? + proxy.requestTimeout(timeout); + proxy.commandPoint(nextOut, 0); + } else { + throw Exception("Open already called for this session"); + } +} + +void SessionImpl::close() //user thread +{ + Lock l(state); + if (detachedLifetime) { + proxy.requestTimeout(0); + //should we wait for the timeout response? + detachedLifetime = 0; + } + detach(); + waitFor(DETACHED); +} + +void SessionImpl::resume(shared_ptr<ConnectionImpl>) // user thread +{ + throw NotImplementedException("Resume not yet implemented by client!"); +} + +void SessionImpl::suspend() //user thread +{ + Lock l(state); + detach(); +} + +void SessionImpl::detach() //call with lock held +{ + if (state == ATTACHED) { + proxy.detach(name); + setState(DETACHING); + } +} + + +uint16_t SessionImpl::getChannel() const // user thread +{ + return channel; +} + +void SessionImpl::setChannel(uint16_t c) // user thread +{ + //channel will only ever be set when session is detached (and + //about to be resumed) + channel = c; +} + +Demux& SessionImpl::getDemux() +{ + return demux; +} + +void SessionImpl::waitForCompletion(const SequenceNumber& id) +{ + Lock l(state); + waitForCompletionImpl(id); +} + +void SessionImpl::waitForCompletionImpl(const SequenceNumber& id) //call with lock held +{ + while (incompleteOut.contains(id)) { + checkOpen(); + state.wait(); + } +} + +bool SessionImpl::isComplete(const SequenceNumber& id) +{ + Lock l(state); + return !incompleteOut.contains(id); +} + +struct IsCompleteUpTo +{ + const SequenceNumber& id; + bool result; + + IsCompleteUpTo(const SequenceNumber& _id) : id(_id), result(true) {} + void operator()(const SequenceNumber& start, const SequenceNumber&) + { + if (start <= id) result = false; + } + +}; + +bool SessionImpl::isCompleteUpTo(const SequenceNumber& id) +{ + Lock l(state); + //return false if incompleteOut contains anything less than id, + //true otherwise + IsCompleteUpTo f(id); + incompleteIn.for_each(f); + return f.result; +} + +struct MarkCompleted +{ + const SequenceNumber& id; + SequenceSet& completedIn; + + MarkCompleted(const SequenceNumber& _id, SequenceSet& set) : id(_id), completedIn(set) {} + + void operator()(const SequenceNumber& start, const SequenceNumber& end) + { + if (id >= end) { + completedIn.add(start, end); + } else if (id >= start) { + completedIn.add(start, id); + } + } + +}; + +void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer) +{ + Lock l(state); + if (cumulative) { + //everything in incompleteIn less than or equal to id is now complete + MarkCompleted f(id, completedIn); + incompleteIn.for_each(f); + //make sure id itself is in + completedIn.add(id); + //then remove anything thats completed from the incomplete set + incompleteIn.remove(completedIn); + } else if (incompleteIn.contains(id)) { + incompleteIn.remove(id); + completedIn.add(id); + } + if (notifyPeer) { + sendCompletion(); + } +} + +/** + * Called by ConnectionImpl to notify active sessions when connection + * is explictly closed + */ +void SessionImpl::connectionClosed(uint16_t _code, const std::string& _text) +{ + Lock l(state); + code = _code; + text = _text; + setState(DETACHED); + handleClosed(); +} + +/** + * Called by ConnectionImpl to notify active sessions when connection + * is disconnected + */ +void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) +{ + connectionClosed(_code, _text); +} + +Future SessionImpl::send(const AMQBody& command) +{ + return sendCommand(command); +} + +Future SessionImpl::send(const AMQBody& command, const MethodContent& content) +{ + return sendCommand(command, &content); +} + +Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) +{ + Lock l(state); + checkOpen(); + SequenceNumber id = nextOut++; + incompleteOut.add(id); + + if (syncMode) command.getMethod()->setSync(syncMode); + Future f(id); + if (command.getMethod()->resultExpected()) { + //result listener must be set before the command is sent + f.setFutureResult(results.listenForResult(id)); + } + AMQFrame frame(command); + if (content) { + frame.setEof(false); + } + handleOut(frame); + if (content) { + sendContent(*content); + } + if (syncMode) { + waitForCompletionImpl(id); + } + return f; +} + +void SessionImpl::sendContent(const MethodContent& content) +{ + AMQFrame header(content.getHeader()); + header.setBof(false); + u_int64_t data_length = content.getData().length(); + if(data_length > 0){ + header.setEof(false); + handleOut(header); + /*Note: end of frame marker included in overhead but not in size*/ + const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1); + + if(data_length < frag_size){ + AMQFrame frame(in_place<AMQContentBody>(content.getData())); + frame.setBof(false); + handleOut(frame); + }else{ + u_int32_t offset = 0; + u_int32_t remaining = data_length - offset; + while (remaining > 0) { + u_int32_t length = remaining > frag_size ? frag_size : remaining; + string frag(content.getData().substr(offset, length)); + AMQFrame frame(in_place<AMQContentBody>(frag)); + frame.setBof(false); + if (offset > 0) { + frame.setBos(false); + } + offset += length; + remaining = data_length - offset; + if (remaining) { + frame.setEos(false); + frame.setEof(false); + } + handleOut(frame); + } + } + } else { + handleOut(header); + } +} + + +bool isMessageMethod(AMQMethodBody* method) +{ + return method->isA<MessageTransferBody>(); +} + +bool isMessageMethod(AMQBody* body) +{ + AMQMethodBody* method=body->getMethod(); + return method && isMessageMethod(method); +} + +bool isContentFrame(AMQFrame& frame) +{ + AMQBody* body = frame.getBody(); + uint8_t type = body->type(); + return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); +} + +void SessionImpl::handleIn(AMQFrame& frame) // network thread +{ + try { + if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) { + //make sure the command id sequence and completion + //tracking takes account of execution commands + Lock l(state); + completedIn.add(nextIn++); + } else { + //if not handled by this class, its for the application: + deliver(frame); + } + } + } catch (const SessionException& e) { + //TODO: proper 0-10 exception handling + QPID_LOG(error, "Session exception:" << e.what()); + Lock l(state); + code = e.code; + text = e.what(); + } +} + +void SessionImpl::handleOut(AMQFrame& frame) // user thread +{ + channel.handle(frame); +} + +void SessionImpl::deliver(AMQFrame& frame) // network thread +{ + if (!arriving) { + arriving = FrameSet::shared_ptr(new FrameSet(nextIn++)); + } + arriving->append(frame); + if (arriving->isComplete()) { + //message.transfers will be marked completed only when 'acked' + //as completion affects flow control; other commands will be + //considered completed as soon as processed here + if (arriving->isA<MessageTransferBody>()) { + Lock l(state); + incompleteIn.add(arriving->getId()); + } else { + Lock l(state); + completedIn.add(arriving->getId()); + } + demux.handle(arriving); + arriving.reset(); + } +} + +//control handler methods (called by network thread when controls are +//received from peer): + +void SessionImpl::attach(const std::string& /*name*/, bool /*force*/) +{ + throw NotImplementedException("Client does not support attach"); +} + +void SessionImpl::attached(const std::string& _name) +{ + Lock l(state); + if (name != _name) throw InternalErrorException("Incorrect session name"); + setState(ATTACHED); +} + +void SessionImpl::detach(const std::string& /*name*/) +{ + throw NotImplementedException("Client does not support detach"); +} + +void SessionImpl::detached(const std::string& _name, uint8_t _code) +{ + Lock l(state); + if (name != _name) throw InternalErrorException("Incorrect session name"); + setState(DETACHED); + if (_code) { + //TODO: make sure this works with execution.exception - don't + //want to overwrite the code from that + QPID_LOG(error, "Session detached by peer: " << name << " " << code); + code = _code; + text = "Session detached by peer"; + } + if (detachedLifetime == 0) { + handleClosed(); + } +} + +void SessionImpl::requestTimeout(uint32_t t) +{ + Lock l(state); + detachedLifetime = t; + proxy.timeout(t); +} + +void SessionImpl::timeout(uint32_t t) +{ + Lock l(state); + detachedLifetime = t; +} + +void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset) +{ + if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); + + Lock l(state); + nextIn = id; +} + +void SessionImpl::expected(const framing::SequenceSet& commands, const framing::Array& fragments) +{ + if (!commands.empty() || fragments.size()) { + throw NotImplementedException("Session resumption not yet supported"); + } +} + +void SessionImpl::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) +{ + //don't really care too much about this yet +} + +void SessionImpl::completed(const framing::SequenceSet& commands, bool timelyReply) +{ + Lock l(state); + incompleteOut.remove(commands); + state.notify();//notify any waiters of completion + completedOut.add(commands); + //notify any waiting results of completion + results.completed(commands); + + if (timelyReply) { + proxy.knownCompleted(completedOut); + completedOut.clear(); + } +} + +void SessionImpl::knownCompleted(const framing::SequenceSet& commands) +{ + Lock l(state); + completedIn.remove(commands); +} + +void SessionImpl::flush(bool expected, bool confirmed, bool completed) +{ + Lock l(state); + if (expected) { + proxy.expected(SequenceSet(nextIn), Array()); + } + if (confirmed) { + proxy.confirmed(completedIn, Array()); + } + if (completed) { + proxy.completed(completedIn, true); + } +} + +void SessionImpl::sendCompletion() +{ + proxy.completed(completedIn, true); +} + +void SessionImpl::gap(const framing::SequenceSet& /*commands*/) +{ + throw NotImplementedException("gap not yet supported"); +} + + + +void SessionImpl::sync() {} + +void SessionImpl::result(uint32_t commandId, const std::string& value) +{ + Lock l(state); + results.received(commandId, value); +} + +void SessionImpl::exception(uint16_t errorCode, + uint32_t commandId, + uint8_t classCode, + uint8_t commandCode, + uint8_t /*fieldIndex*/, + const std::string& description, + const framing::FieldTable& /*errorInfo*/) +{ + QPID_LOG(warning, "Exception received from peer: " << errorCode << ":" << description + << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); + + Lock l(state); + code = errorCode; + text = description; + if (detachedLifetime) { + proxy.requestTimeout(0); + //should we wait for the timeout response? + detachedLifetime = 0; + } + detach(); +} + + +//private utility methods: + +inline void SessionImpl::setState(State s) //call with lock held +{ + state = s; +} + +inline void SessionImpl::waitFor(State s) //call with lock held +{ + // We can be DETACHED at any time + state.waitFor(States(s, DETACHED)); + check(); +} + +void SessionImpl::check() const //call with lock held. +{ + if (code != REPLY_SUCCESS) { + throwReplyException(code, text); + } +} + +void SessionImpl::checkOpen() const //call with lock held. +{ + check(); + if (state != ATTACHED) { + throwReplyException(0, "Session isn't attached"); + } +} + +void SessionImpl::assertOpen() const +{ + Lock l(state); + checkOpen(); +} + +void SessionImpl::handleClosed() +{ + QPID_LOG(info, "SessionImpl::handleClosed(): entering"); + demux.close(); + results.close(); + QPID_LOG(info, "SessionImpl::handleClosed(): returning"); +} + +}} diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h new file mode 100644 index 0000000000..1284670389 --- /dev/null +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -0,0 +1,188 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _SessionImpl_ +#define _SessionImpl_ + +#include "Demux.h" +#include "Execution.h" +#include "Results.h" + +#include "qpid/shared_ptr.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/ChannelHandler.h" +#include "qpid/framing/SessionState.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/AMQP_ClientOperations.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/sys/StateMonitor.h" + +#include <boost/optional.hpp> + +namespace qpid { + +namespace framing { + +class FrameSet; +class MethodContent; +class SequenceSet; + +} + +namespace client { + +class Future; +class ConnectionImpl; + +class SessionImpl : public framing::FrameHandler::InOutHandler, + public Execution, + private framing::AMQP_ClientOperations::Session010Handler, + private framing::AMQP_ClientOperations::Execution010Handler +{ +public: + SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); + ~SessionImpl(); + + + //NOTE: Public functions called in user thread. + framing::FrameSet::shared_ptr get(); + + const framing::Uuid getId() const; + + uint16_t getChannel() const; + void setChannel(uint16_t channel); + + void open(uint32_t detachedLifetime); + void close(); + void resume(shared_ptr<ConnectionImpl>); + void suspend(); + + void setSync(bool s); + bool isSync(); + void assertOpen() const; + + Future send(const framing::AMQBody& command); + Future send(const framing::AMQBody& command, const framing::MethodContent& content); + + Demux& getDemux(); + void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); + bool isComplete(const framing::SequenceNumber& id); + bool isCompleteUpTo(const framing::SequenceNumber& id); + void waitForCompletion(const framing::SequenceNumber& id); + + //NOTE: these are called by the network thread when the connection is closed or dies + void connectionClosed(uint16_t code, const std::string& text); + void connectionBroke(uint16_t code, const std::string& text); + +private: + enum State { + INACTIVE, + ATTACHING, + ATTACHED, + DETACHING, + DETACHED + }; + typedef framing::AMQP_ClientOperations::Session010Handler SessionHandler; + typedef framing::AMQP_ClientOperations::Execution010Handler ExecutionHandler; + typedef sys::StateMonitor<State, DETACHED> StateMonitor; + typedef StateMonitor::Set States; + + inline void setState(State s); + inline void waitFor(State); + + void detach(); + + void check() const; + void checkOpen() const; + void handleClosed(); + + void handleIn(framing::AMQFrame& frame); + void handleOut(framing::AMQFrame& frame); + void deliver(framing::AMQFrame& frame); + + Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); + void sendContent(const framing::MethodContent&); + void waitForCompletionImpl(const framing::SequenceNumber& id); + + void sendCompletion(); + + // Note: Following methods are called by network thread in + // response to session controls from the broker + void attach(const std::string& name, bool force); + void attached(const std::string& name); + void detach(const std::string& name); + void detached(const std::string& name, uint8_t detachCode); + void requestTimeout(uint32_t timeout); + void timeout(uint32_t timeout); + void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset); + void expected(const framing::SequenceSet& commands, const framing::Array& fragments); + void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments); + void completed(const framing::SequenceSet& commands, bool timelyReply); + void knownCompleted(const framing::SequenceSet& commands); + void flush(bool expected, bool confirmed, bool completed); + void gap(const framing::SequenceSet& commands); + + // Note: Following methods are called by network thread in + // response to execution commands from the broker + void sync(); + void result(uint32_t commandId, const std::string& value); + void exception(uint16_t errorCode, + uint32_t commandId, + uint8_t classCode, + uint8_t commandCode, + uint8_t fieldIndex, + const std::string& description, + const framing::FieldTable& errorInfo); + + + //hack for old generator: + void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); } + + int code; // Error code + std::string text; // Error text + mutable StateMonitor state; + volatile bool syncMode; + uint32_t detachedLifetime; + const uint64_t maxFrameSize; + const framing::Uuid id; + const std::string name; + + + shared_ptr<ConnectionImpl> connection; + framing::ChannelHandler channel; + framing::AMQP_ServerProxy::Session010 proxy; + + Results results; + Demux demux; + framing::FrameSet::shared_ptr arriving; + + framing::SequenceSet incompleteIn;//incoming commands that are as yet incomplete + framing::SequenceSet completedIn;//incoming commands that are have completed + framing::SequenceSet incompleteOut;//outgoing commands not yet known to be complete + framing::SequenceSet completedOut;//outgoing commands that we know to be completed + framing::SequenceNumber nextIn; + framing::SequenceNumber nextOut; + +}; + +}} // namespace qpid::client + +#endif diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp index f14344225c..b353be481b 100644 --- a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp @@ -35,7 +35,7 @@ namespace client { SubscriptionManager::SubscriptionManager(Session& s) : dispatcher(s), session(s), messages(UNLIMITED), bytes(UNLIMITED), window(true), - confirmMode(true), acquireMode(false), + acceptMode(0), acquireMode(0), autoStop(true) {} @@ -43,7 +43,7 @@ Completion SubscriptionManager::subscribeInternal( const std::string& q, const std::string& dest) { Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest, - arg::confirmMode=confirmMode, arg::acquireMode=acquireMode); + arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); setFlowControl(dest, messages, bytes, window); return c; } @@ -68,7 +68,7 @@ Completion SubscriptionManager::subscribe( void SubscriptionManager::setFlowControl( const std::string& dest, uint32_t messages, uint32_t bytes, bool window) { - session.messageFlowMode(dest, window); + session.messageSetFlowMode(dest, window); session.messageFlow(dest, 0, messages); session.messageFlow(dest, 1, bytes); } @@ -81,7 +81,7 @@ void SubscriptionManager::setFlowControl( window=window_; } -void SubscriptionManager::setConfirmMode(bool c) { confirmMode=c; } +void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; } void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.h b/qpid/cpp/src/qpid/client/SubscriptionManager.h index 1741796f4f..48cb725fb8 100644 --- a/qpid/cpp/src/qpid/client/SubscriptionManager.h +++ b/qpid/cpp/src/qpid/client/SubscriptionManager.h @@ -53,7 +53,7 @@ class SubscriptionManager : public sys::Runnable uint32_t bytes; bool window; AckPolicy autoAck; - bool confirmMode; + bool acceptMode; bool acquireMode; bool autoStop; @@ -116,16 +116,16 @@ class SubscriptionManager : public sys::Runnable */ void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true); - /** Set the confirm-mode for new subscriptions. Defaults to true. - *@param confirm: if true messages must be confirmed by calling + /** Set the accept-mode for new subscriptions. Defaults to true. + *@param required: if true messages must be confirmed by calling *Message::acknowledge() or automatically, see setAckPolicy() */ - void setConfirmMode(bool confirm); + void setAcceptMode(bool required); /** Set the acquire-mode for new subscriptions. Defaults to false. *@param acquire: if false messages pre-acquired, if true * messages are dequed on acknowledgement or on transfer - * depending on confirmMode. + * depending on acceptMode. */ void setAcquireMode(bool acquire); diff --git a/qpid/cpp/src/qpid/client/TypedResult.h b/qpid/cpp/src/qpid/client/TypedResult.h index edcf728c54..0b36be9716 100644 --- a/qpid/cpp/src/qpid/client/TypedResult.h +++ b/qpid/cpp/src/qpid/client/TypedResult.h @@ -33,7 +33,7 @@ template <class T> class TypedResult : public Completion bool decoded; public: - TypedResult(Future f, shared_ptr<SessionCore> s) : Completion(f, s), decoded(false) {} + TypedResult(Future f, shared_ptr<SessionImpl> s) : Completion(f, s), decoded(false) {} T& get() { diff --git a/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp b/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp index a4ea257f0c..b241ee8e36 100644 --- a/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp @@ -32,10 +32,10 @@ struct ClassifierHandler::Visitor : public FrameDefaultVisitor { void visit(const ExchangeDeclareBody&) { chosen=&classifier.wiring; } void visit(const ExchangeDeleteBody&) { chosen=&classifier.wiring; } - void visit(const QueueBindBody&) { chosen=&classifier.wiring; } + void visit(const ExchangeBindBody&) { chosen=&classifier.wiring; } + void visit(const ExchangeUnbindBody&) { chosen=&classifier.wiring; } void visit(const QueueDeclareBody&) { chosen=&classifier.wiring; } void visit(const QueueDeleteBody&) { chosen=&classifier.wiring; } - void visit(const QueueUnbindBody&) { chosen=&classifier.wiring; } void defaultVisit(const AMQBody&) { chosen=&classifier.other; } using framing::FrameDefaultVisitor::visit; diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h index 2318492e2d..5a8e55f9d2 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.h +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -105,7 +105,6 @@ class AMQFrame : public AMQDataBlock void setEos(bool isEos) { eos = isEos; } static uint32_t frameOverhead(); - private: void init() { bof = eof = bos = eos = true; subchannel=0; channel=0; } diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h index 8a3a92936e..2064468785 100644 --- a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h +++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h @@ -26,8 +26,8 @@ #include "Buffer.h" #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/MessageProperties.h" -#include "qpid/framing/DeliveryProperties010.h" -#include "qpid/framing/MessageProperties010.h" +#include "qpid/framing/PreviewDeliveryProperties.h" +#include "qpid/framing/PreviewMessageProperties.h" #include <iostream> #include <boost/optional.hpp> @@ -79,8 +79,8 @@ class AMQHeaderBody : public AMQBody // Could use boost::mpl::fold to construct a larger set. typedef PropSet< PropSet< PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties>, - DeliveryProperties010>, - MessageProperties010> Properties; + PreviewDeliveryProperties>, + PreviewMessageProperties> Properties; Properties properties; diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h index b15e14d6f6..0aa1bf7e66 100644 --- a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h +++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h @@ -32,8 +32,8 @@ namespace qpid { namespace framing { -static ProtocolVersion highestProtocolVersion(99, 0); -//static ProtocolVersion highestProtocolVersion(0, 10); +//static ProtocolVersion highestProtocolVersion(99, 0); +static ProtocolVersion highestProtocolVersion(0, 10); } /* namespace framing */ } /* namespace qpid */ diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.h b/qpid/cpp/src/qpid/framing/SequenceSet.h index eabc729411..8b3d6b3f28 100644 --- a/qpid/cpp/src/qpid/framing/SequenceSet.h +++ b/qpid/cpp/src/qpid/framing/SequenceSet.h @@ -69,11 +69,12 @@ class SequenceSet { bool empty() const; template <class T> - void for_each(T& t) const + T for_each(T& t) const { for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) { t(i->start, i->end); } + return t; } template <class S> void serialize(S& s) { s.split(*this); s(ranges.begin(), ranges.end()); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index cace04bef5..769593c8d2 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -24,6 +24,7 @@ #include "qpid/log/Statement.h" #include <qpid/broker/Message.h> #include <qpid/broker/MessageDelivery.h> +#include "qpid/framing/MessageXTransferBody.h" #include <list> #include <iostream> #include <fstream> @@ -217,7 +218,7 @@ void ManagementAgent::SendBuffer (Buffer& buf, return; intrusive_ptr<Message> msg (new Message ()); - AMQFrame method (in_place<MessageTransferBody>( + AMQFrame method (in_place<MessageXTransferBody>( ProtocolVersion(), 0, exchange->getName (), 0, 0)); AMQFrame header (in_place<AMQHeaderBody>()); AMQFrame content(in_place<AMQContentBody>()); @@ -232,8 +233,8 @@ void ManagementAgent::SendBuffer (Buffer& buf, msg->getFrames().append(method); msg->getFrames().append(header); - MessageProperties* props = - msg->getFrames().getHeaders()->get<MessageProperties>(true); + PreviewMessageProperties* props = + msg->getFrames().getHeaders()->get<PreviewMessageProperties>(true); props->setContentLength(length); msg->getFrames().append(content); @@ -393,8 +394,8 @@ void ManagementAgent::dispatchMethod (Message& msg, uint64_t objId = inBuffer.getLongLong (); string replyToKey; - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + const framing::PreviewMessageProperties* p = + msg.getFrames().getHeaders()->get<framing::PreviewMessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo (); @@ -600,8 +601,8 @@ void ManagementAgent::dispatchAgentCommand (Message& msg) uint32_t sequence; string replyToKey; - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + const framing::PreviewMessageProperties* p = + msg.getFrames().getHeaders()->get<framing::PreviewMessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo (); diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 44d5ed4650..9b6e0dce21 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -118,13 +118,13 @@ QPID_AUTO_TEST_CASE(testTransfer) ClientSessionFixture fix; fix.session=fix.connection.newSession(ASYNC); fix.declareSubscribe(); - fix.session.messageTransfer(content=TransferContent("my-message", "my-queue")); + fix.session.messageTransfer(acceptMode=1, content=TransferContent("my-message", "my-queue")); //get & test the message: FrameSet::shared_ptr msg = fix.session.get(); BOOST_CHECK(msg->isA<MessageTransferBody>()); BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); //confirm receipt: - fix.session.getExecution().completed(msg->getId(), true, true); + fix.session.getExecution().markCompleted(msg->getId(), true, true); } QPID_AUTO_TEST_CASE(testDispatcher) @@ -161,6 +161,8 @@ BOOST_FIXTURE_TEST_CASE(testDispatcherThread, ClientSessionFixture) } */ +/* + * GS (18-APR-2008): disabled resume tests until resumption for 0-10 final spec is implemented QPID_AUTO_TEST_CASE(_FIXTURE) { ClientSessionFixture fix; @@ -195,7 +197,7 @@ QPID_AUTO_TEST_CASE(testSuspendResume) FrameSet::shared_ptr msg = fix.session.get(); BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); } - +*/ /** * Currently broken due to a deadlock in SessionCore * diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp index 2904424d5c..94e2c025d6 100644 --- a/qpid/cpp/src/tests/ExchangeTest.cpp +++ b/qpid/cpp/src/tests/ExchangeTest.cpp @@ -30,7 +30,6 @@ #include "qpid/broker/TopicExchange.h" #include "qpid_test_plugin.h" #include <iostream> -#include "qpid/framing/BasicGetBody.h" #include "MessageUtils.h" using boost::intrusive_ptr; diff --git a/qpid/cpp/src/tests/FramingTest.cpp b/qpid/cpp/src/tests/FramingTest.cpp index 0c7adb2af8..275d32acfe 100644 --- a/qpid/cpp/src/tests/FramingTest.cpp +++ b/qpid/cpp/src/tests/FramingTest.cpp @@ -23,8 +23,6 @@ #include "qpid/client/Connection.h" #include "qpid/client/Connector.h" #include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/BasicGetOkBody.h" -#include "qpid/framing/ConnectionRedirectBody.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/amqp_framing.h" @@ -54,16 +52,12 @@ std::string tostring(const T& x) class FramingTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(FramingTest); - CPPUNIT_TEST(testBasicQosBody); + CPPUNIT_TEST(testMessageTransferBody); CPPUNIT_TEST(testConnectionSecureBody); CPPUNIT_TEST(testConnectionRedirectBody); - CPPUNIT_TEST(testAccessRequestBody); - CPPUNIT_TEST(testBasicConsumeBody); + CPPUNIT_TEST(testQueueDeclareBody); CPPUNIT_TEST(testConnectionRedirectBodyFrame); - CPPUNIT_TEST(testBasicConsumeOkBodyFrame); - CPPUNIT_TEST(testInlineContent); - CPPUNIT_TEST(testContentReference); - CPPUNIT_TEST(testContentValidation); + CPPUNIT_TEST(testMessageCancelBodyFrame); CPPUNIT_TEST_SUITE_END(); private: @@ -74,14 +68,14 @@ class FramingTest : public CppUnit::TestCase FramingTest() : version(highestProtocolVersion) {} - void testBasicQosBody() + void testMessageTransferBody() { Buffer wbuff(buffer, sizeof(buffer)); - BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true); + MessageTransferBody in(version, "my-exchange", 1, 1); in.encode(wbuff); Buffer rbuff(buffer, sizeof(buffer)); - BasicQosBody out(version); + MessageTransferBody out(version); out.decode(rbuff); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -104,7 +98,11 @@ class FramingTest : public CppUnit::TestCase Buffer wbuff(buffer, sizeof(buffer)); std::string a = "hostA"; std::string b = "hostB"; - ConnectionRedirectBody in(version, a, b); + Array hosts(0x95); + hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(a))); + hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(b))); + + ConnectionRedirectBody in(version, a, hosts); in.encode(wbuff); Buffer rbuff(buffer, sizeof(buffer)); @@ -113,41 +111,28 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } - void testAccessRequestBody() - { - Buffer wbuff(buffer, sizeof(buffer)); - std::string s = "text"; - AccessRequestBody in(version, s, true, false, true, false, true); - in.encode(wbuff); - - Buffer rbuff(buffer, sizeof(buffer)); - AccessRequestBody out(version); - out.decode(rbuff); - CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); - } - - void testBasicConsumeBody() + void testQueueDeclareBody() { Buffer wbuff(buffer, sizeof(buffer)); - std::string q = "queue"; - std::string t = "tag"; - BasicConsumeBody in(version, 0, q, t, false, true, false, false, - FieldTable()); + QueueDeclareBody in(version, "name", "dlq", true, false, true, false, FieldTable()); in.encode(wbuff); Buffer rbuff(buffer, sizeof(buffer)); - BasicConsumeBody out(version); + QueueDeclareBody out(version); out.decode(rbuff); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } - void testConnectionRedirectBodyFrame() { Buffer wbuff(buffer, sizeof(buffer)); std::string a = "hostA"; std::string b = "hostB"; - AMQFrame in(in_place<ConnectionRedirectBody>(version, a, b)); + Array hosts(0x95); + hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(a))); + hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(b))); + + AMQFrame in(in_place<ConnectionRedirectBody>(version, a, hosts)); in.setChannel(999); in.encode(wbuff); @@ -157,11 +142,10 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } - void testBasicConsumeOkBodyFrame() + void testMessageCancelBodyFrame() { Buffer wbuff(buffer, sizeof(buffer)); - std::string s = "hostA"; - AMQFrame in(in_place<BasicConsumeOkBody>(version, s)); + AMQFrame in(in_place<MessageCancelBody>(version, "tag")); in.setChannel(999); in.encode(wbuff); @@ -171,56 +155,6 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } - void testInlineContent() { - Buffer wbuff(buffer, sizeof(buffer)); - Content content(INLINE, "MyData"); - CPPUNIT_ASSERT(content.isInline()); - content.encode(wbuff); - - Buffer rbuff(buffer, sizeof(buffer)); - Content recovered; - recovered.decode(rbuff); - CPPUNIT_ASSERT(recovered.isInline()); - CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue()); - } - - void testContentReference() { - Buffer wbuff(buffer, sizeof(buffer)); - Content content(REFERENCE, "MyRef"); - CPPUNIT_ASSERT(content.isReference()); - content.encode(wbuff); - - Buffer rbuff(buffer, sizeof(buffer)); - Content recovered; - recovered.decode(rbuff); - CPPUNIT_ASSERT(recovered.isReference()); - CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue()); - } - - void testContentValidation() { - try { - Content content(REFERENCE, ""); - CPPUNIT_ASSERT(false);//fail, expected exception - } catch (const InvalidArgumentException& e) {} - - try { - Content content(2, "Blah"); - CPPUNIT_ASSERT(false);//fail, expected exception - } catch (const SyntaxErrorException& e) {} - - try { - Buffer wbuff(buffer, sizeof(buffer)); - wbuff.putOctet(2); - wbuff.putLongString("blah, blah"); - - Buffer rbuff(buffer, sizeof(buffer)); - Content content; - content.decode(rbuff); - CPPUNIT_FAIL("Expected exception"); - } catch (Exception& e) {} - - } - }; diff --git a/qpid/cpp/src/tests/HeaderTest.cpp b/qpid/cpp/src/tests/HeaderTest.cpp index 9e2bddb4de..56be38a302 100644 --- a/qpid/cpp/src/tests/HeaderTest.cpp +++ b/qpid/cpp/src/tests/HeaderTest.cpp @@ -61,16 +61,13 @@ class HeaderTest : public CppUnit::TestCase out.castBody<AMQHeaderBody>()->get<MessageProperties>(true); props1->setContentLength(42); - props1->setMessageId("messageId"); + props1->setMessageId(Uuid(true)); props1->setCorrelationId("correlationId"); props1->setReplyTo(ReplyTo("ex","key")); props1->setContentType("contentType"); props1->setContentEncoding("contentEncoding"); - props1->setType("type"); props1->setUserId("userId"); props1->setAppId("appId"); - props1->setTransactionId("transactionId"); - props1->setSecurityToken("securityToken"); char buff[10000]; Buffer wbuffer(buff, 10000); @@ -87,11 +84,8 @@ class HeaderTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(props1->getCorrelationId(), props2->getCorrelationId()); CPPUNIT_ASSERT_EQUAL(props1->getContentType(), props2->getContentType()); CPPUNIT_ASSERT_EQUAL(props1->getContentEncoding(), props2->getContentEncoding()); - CPPUNIT_ASSERT_EQUAL(props1->getType(), props2->getType()); CPPUNIT_ASSERT_EQUAL(props1->getUserId(), props2->getUserId()); CPPUNIT_ASSERT_EQUAL(props1->getAppId(), props2->getAppId()); - CPPUNIT_ASSERT_EQUAL(props1->getTransactionId(), props2->getTransactionId()); - CPPUNIT_ASSERT_EQUAL(props1->getSecurityToken(), props2->getSecurityToken()); } diff --git a/qpid/cpp/src/tests/MessageBuilderTest.cpp b/qpid/cpp/src/tests/MessageBuilderTest.cpp index 092e02cc2f..7149ec50c7 100644 --- a/qpid/cpp/src/tests/MessageBuilderTest.cpp +++ b/qpid/cpp/src/tests/MessageBuilderTest.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/MessageBuilder.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/framing/frame_functors.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/TypeFilter.h" #include "qpid_test_plugin.h" #include <list> @@ -101,7 +102,7 @@ class MessageBuilderTest : public CppUnit::TestCase std::string key("builder-exchange"); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0); @@ -124,7 +125,7 @@ class MessageBuilderTest : public CppUnit::TestCase std::string exchange("builder-exchange"); std::string key("builder-exchange"); - AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content(in_place<AMQContentBody>(data)); method.setEof(false); @@ -158,7 +159,7 @@ class MessageBuilderTest : public CppUnit::TestCase std::string key("builder-exchange"); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content1(in_place<AMQContentBody>(data1)); AMQFrame content2(in_place<AMQContentBody>(data2)); @@ -194,7 +195,7 @@ class MessageBuilderTest : public CppUnit::TestCase std::string key("builder-exchange"); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content1(in_place<AMQContentBody>(data1)); AMQFrame content2(in_place<AMQContentBody>(data2)); diff --git a/qpid/cpp/src/tests/MessageTest.cpp b/qpid/cpp/src/tests/MessageTest.cpp index a19080e1ce..d7688c74a9 100644 --- a/qpid/cpp/src/tests/MessageTest.cpp +++ b/qpid/cpp/src/tests/MessageTest.cpp @@ -21,7 +21,9 @@ #include "qpid/broker/Message.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/FieldValue.h" +#include "qpid/framing/Uuid.h" #include "qpid_test_plugin.h" @@ -44,14 +46,14 @@ class MessageTest : public CppUnit::TestCase { string exchange = "MyExchange"; string routingKey = "MyRoutingKey"; - string messageId = "MyMessage"; + Uuid messageId(true); string data1("abcdefg"); string data2("hijklmn"); intrusive_ptr<Message> msg(new Message()); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content1(in_place<AMQContentBody>(data1)); AMQFrame content2(in_place<AMQContentBody>(data2)); diff --git a/qpid/cpp/src/tests/MessageUtils.h b/qpid/cpp/src/tests/MessageUtils.h index 3def8cd41b..21ee834ba7 100644 --- a/qpid/cpp/src/tests/MessageUtils.h +++ b/qpid/cpp/src/tests/MessageUtils.h @@ -22,6 +22,8 @@ #include "qpid/broker/Message.h" #include "qpid/broker/MessageDelivery.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/Uuid.h" using namespace qpid; using namespace broker; @@ -29,12 +31,12 @@ using namespace framing; struct MessageUtils { - static boost::intrusive_ptr<Message> createMessage(const string& exchange, const string& routingKey, - const string& messageId, uint64_t contentSize = 0) + static boost::intrusive_ptr<Message> createMessage(const string& exchange="", const string& routingKey="", + const Uuid& messageId=Uuid(true), uint64_t contentSize = 0) { boost::intrusive_ptr<Message> msg(new Message()); - AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); msg->getFrames().append(method); diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 70132bce76..1d454d9f4a 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid_test_plugin.h" #include <iostream> #include "boost/format.hpp" @@ -75,7 +76,7 @@ class QueueTest : public CppUnit::TestCase intrusive_ptr<Message> message(std::string exchange, std::string routingKey) { intrusive_ptr<Message> msg(new Message()); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); msg->getFrames().append(method); msg->getFrames().append(header); diff --git a/qpid/cpp/src/tests/TxAckTest.cpp b/qpid/cpp/src/tests/TxAckTest.cpp index 89b98cb93c..b86f3d75e0 100644 --- a/qpid/cpp/src/tests/TxAckTest.cpp +++ b/qpid/cpp/src/tests/TxAckTest.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "MessageUtils.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/RecoveryManager.h" #include "qpid/broker/TxAck.h" @@ -69,14 +70,8 @@ public: TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) { for(int i = 0; i < 10; i++){ - intrusive_ptr<Message> msg(new Message()); - AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, "exchange", 0, 0)); - AMQFrame header(in_place<AMQHeaderBody>()); - msg->getFrames().append(method); - msg->getFrames().append(header); + intrusive_ptr<Message> msg(MessageUtils::createMessage("exchange", "routing_key")); msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); - msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key"); messages.push_back(msg); QueuedMessage qm(queue.get()); qm.payload = msg; diff --git a/qpid/cpp/src/tests/client_test.cpp b/qpid/cpp/src/tests/client_test.cpp index bd2a541c92..011dcd4678 100644 --- a/qpid/cpp/src/tests/client_test.cpp +++ b/qpid/cpp/src/tests/client_test.cpp @@ -33,12 +33,11 @@ #include "qpid/client/Message.h" #include "qpid/client/Session.h" #include "qpid/framing/FrameSet.h" -#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid; using namespace qpid::client; -using qpid::framing::FrameSet; -using qpid::framing::MessageTransferBody; +using namespace qpid::framing; using std::string; struct Args : public qpid::TestOptions { @@ -104,14 +103,14 @@ int main(int argc, char** argv) if (opts.trace) std::cout << "Declared queue." << std::endl; //now bind the queue to the exchange - session.queueBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::routingKey="MyKey"); + session.exchangeBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::bindingKey="MyKey"); if (opts.trace) std::cout << "Bound queue to exchange." << std::endl; //create and send a message to the exchange using the routing //key we bound our queue with: Message msgOut(generateData(opts.msgSize)); msgOut.getDeliveryProperties().setRoutingKey("MyKey"); - session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut); + session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1); if (opts.trace) print("Published message: ", msgOut); //subscribe to the queue, add sufficient credit and then get @@ -132,6 +131,8 @@ int main(int argc, char** argv) } else { print("Received an unexepected message: ", msgIn); } + } else { + throw Exception("Unexpected command received"); } //close the session & connection diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp index f7a91f662f..7c68973d4d 100644 --- a/qpid/cpp/src/tests/exception_test.cpp +++ b/qpid/cpp/src/tests/exception_test.cpp @@ -96,7 +96,8 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) { QPID_AUTO_TEST_CASE(NoSuchQueueTest) { ProxySessionFixture fix; - BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue").sync(), NotFoundException); + fix.session.setSynchronous(true); + BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException); } QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp index 2b44a5477a..a61a4b2e42 100644 --- a/qpid/cpp/src/tests/latencytest.cpp +++ b/qpid/cpp/src/tests/latencytest.cpp @@ -192,7 +192,7 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0 mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2))); mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true); } else { - mgr.setConfirmMode(false); + mgr.setAcceptMode(1/*not-required*/); mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); } mgr.subscribe(*this, queue); @@ -257,14 +257,13 @@ void Sender::sendByCount() msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); } - Completion c; for (uint i = 0; i < opts.count; i++) { uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables - c = session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } - c.sync(); + session.sync(); } void Sender::sendByRate() @@ -283,7 +282,7 @@ void Sender::sendByRate() uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } uint64_t timeTaken = (current_time() - start) / TIME_USEC; if (timeTaken < 1000) { diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index 3dd4e876fc..966d708ff6 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -210,7 +210,8 @@ struct Setup : public Client { void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) { session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings); - session.queuePurge(arg::queue=name).sync(); + session.queuePurge(arg::queue=name); + session.sync(); } void run() { @@ -334,7 +335,7 @@ struct Controller : public Client { << endl; Message msg(data, queue); for (size_t i = 0; i < n; ++i) - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } void run() { // Controller @@ -421,7 +422,6 @@ struct PublishThread : public Client { } void run() { // Publisher - Completion completion; try { string data; size_t offset(0); @@ -459,19 +459,19 @@ struct PublishThread : public Client { // any heap allocation. const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t), reinterpret_cast<const char*>(&i), sizeof(uint32_t)); - completion = session.messageTransfer( + session.messageTransfer( arg::destination=destination, arg::content=msg, - arg::confirmMode=opts.confirm); - if (opts.intervalPub) ::usleep(opts.intervalPub*1000); + arg::acceptMode=1); + if (opts.intervalPub) ::usleep(opts.intervalPub*1000); } - if (opts.confirm) completion.sync(); + if (opts.confirm) session.sync(); AbsTime end=now(); double time=secs(start,end); // Send result to controller. Message report(lexical_cast<string>(opts.count/time), "pub_done"); - session.messageTransfer(arg::content=report); + session.messageTransfer(arg::content=report, arg::acceptMode=1); } session.close(); } @@ -496,9 +496,9 @@ struct SubscribeThread : public Client { arg::exclusive=true, arg::autoDelete=true, arg::durable=opts.durable); - session.queueBind(arg::queue=queue, - arg::exchange=ex, - arg::routingKey=key); + session.exchangeBind(arg::queue=queue, + arg::exchange=ex, + arg::bindingKey=key); } void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) { @@ -506,7 +506,7 @@ struct SubscribeThread : public Client { Message error( QPID_MSG("Sequence error: expected n" << test << expect << " but got " << actual), "sub_done"); - session.messageTransfer(arg::content=error); + session.messageTransfer(arg::content=error, arg::acceptMode=1); throw Exception(error.getData()); } } @@ -515,12 +515,12 @@ struct SubscribeThread : public Client { try { SubscriptionManager subs(session); LocalQueue lq(AckPolicy(opts.ack)); - subs.setConfirmMode(opts.ack > 0); + subs.setAcceptMode(opts.ack > 0 ? 0 : 1); subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, false); subs.subscribe(lq, queue); // Notify controller we are ready. - session.messageTransfer(arg::content=Message("ready", "sub_ready")); + session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); for (size_t j = 0; j < opts.iterations; ++j) { @@ -556,7 +556,7 @@ struct SubscribeThread : public Client { // Report to publisher. Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), "sub_done"); - session.messageTransfer(arg::content=result); + session.messageTransfer(arg::content=result, arg::acceptMode=1); } session.close(); } diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp index e5e7d24112..5208b67445 100644 --- a/qpid/cpp/src/tests/topic_listener.cpp +++ b/qpid/cpp/src/tests/topic_listener.cpp @@ -114,7 +114,7 @@ int main(int argc, char** argv){ } else { session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true); } - session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control"); + session.exchangeBind(arg::exchange="amq.topic", arg::queue=control, arg::bindingKey="topic_control"); //set up listener SubscriptionManager mgr(session); @@ -123,7 +123,7 @@ int main(int argc, char** argv){ mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2))); mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true); } else { - mgr.setConfirmMode(false); + mgr.setAcceptMode(1/*-not-required*/); mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); } mgr.subscribe(listener, control); @@ -181,7 +181,7 @@ void Listener::report(){ << time/TIME_MSEC << " ms."; Message msg(reportstr.str(), responseQueue); msg.getHeaders().setString("TYPE", "REPORT"); - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); if(transactional){ session.txCommit(); } diff --git a/qpid/cpp/src/tests/topic_publisher.cpp b/qpid/cpp/src/tests/topic_publisher.cpp index 2271849c35..8242530db1 100644 --- a/qpid/cpp/src/tests/topic_publisher.cpp +++ b/qpid/cpp/src/tests/topic_publisher.cpp @@ -164,12 +164,12 @@ int64_t Publisher::publish(int msgs, int listeners, int size){ AbsTime start = now(); for(int i = 0; i < msgs; i++){ - session.messageTransfer(arg::content=msg, arg::destination="amq.topic"); + session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1); } //send report request Message reportRequest("", controlTopic); reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic"); + session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1); if(transactional){ session.txCommit(); } @@ -198,7 +198,7 @@ void Publisher::terminate(){ //send termination request Message terminationRequest("", controlTopic); terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); - session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic"); + session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1); if(transactional){ session.txCommit(); } diff --git a/qpid/cpp/src/tests/txtest.cpp b/qpid/cpp/src/tests/txtest.cpp index 4c5814986c..5030b24070 100644 --- a/qpid/cpp/src/tests/txtest.cpp +++ b/qpid/cpp/src/tests/txtest.cpp @@ -142,7 +142,7 @@ struct Transfer : public Client, public Runnable out.setData(in.getData()); out.getMessageProperties().setCorrelationId(in.getMessageProperties().getCorrelationId()); out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode()); - session.messageTransfer(arg::content=out); + session.messageTransfer(arg::content=out, arg::acceptMode=1); } in.acknowledge(); session.txCommit(); @@ -168,7 +168,8 @@ struct Controller : public Client { //declare queues for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { - session.queueDeclare(arg::queue=*i, arg::durable=opts.durable).sync(); + session.queueDeclare(arg::queue=*i, arg::durable=opts.durable); + session.sync(); } Message msg(generateData(opts.size), *queues.begin()); @@ -179,7 +180,7 @@ struct Controller : public Client //publish messages for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) { msg.getMessageProperties().setCorrelationId(*i); - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } } @@ -205,7 +206,7 @@ struct Controller : public Client { SubscriptionManager subs(session); subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); - subs.setConfirmMode(false); + subs.setAcceptMode(1/*not-required*/); StringSet drained; //drain each queue and verify the correct set of messages are available @@ -213,7 +214,8 @@ struct Controller : public Client //subscribe, allocate credit and flush LocalQueue lq(AckPolicy(0));//manual acking subs.subscribe(lq, *i, *i); - session.messageFlush(arg::destination=*i).sync(); + session.messageFlush(arg::destination=*i); + session.sync(); uint count(0); while (!lq.empty()) { diff --git a/qpid/cpp/xml/extra.xml b/qpid/cpp/xml/extra.xml index 36a76765d5..5cb37ae65f 100644 --- a/qpid/cpp/xml/extra.xml +++ b/qpid/cpp/xml/extra.xml @@ -623,7 +623,7 @@ <class name="message010" index="4"> <doc>blah, blah</doc> - <method name = "transfer" index="1"> + <method name = "transfer" content="1" index="1"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> <chassis name="client" implement="MUST" /> @@ -818,7 +818,7 @@ <method name = "declare" index="1"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="exchange" domain="shortstr"/> <field name="type" domain="shortstr"/> <field name="alternate-exchange" domain="shortstr"/> <field name="passive" domain="bit"/> @@ -829,7 +829,7 @@ <method name = "delete" index="2"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="exchange" domain="shortstr"/> <field name="if-unused" domain="bit"/> </method> <method name = "query" index="3"> @@ -863,8 +863,8 @@ <method name = "bound" index="6"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="queue" domain="shortstr"/> <field name="exchange" domain="shortstr"/> + <field name="queue" domain="shortstr"/> <field name="binding-key" domain="shortstr"/> <field name="arguments" domain="table"/> <result> @@ -884,7 +884,7 @@ <method name = "declare" index="1"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="queue" domain="shortstr"/> <field name="alternate-exchange" domain="shortstr"/> <field name="passive" domain="bit"/> <field name="durable" domain="bit"/> @@ -895,25 +895,25 @@ <method name = "delete" index="2"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="queue" domain="shortstr"/> <field name="if-unused" domain="bit"/> <field name="if-empty" domain="bit"/> </method> <method name = "purge" index="3"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="queue" domain="shortstr"/> </method> <method name = "query" index="4"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="queue" domain="shortstr"/> <result> <struct size="long" type="1"> <field name="name" domain="shortstr"/> <field name="alternate-exchange" domain="shortstr"/> - <field name="passive" domain="bit"/> <field name="durable" domain="bit"/> + <field name="exclusive" domain="bit"/> <field name="auto-delete" domain="bit"/> <field name="arguments" domain="table"/> <field name="message-count" domain="long"/> |