summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-20 12:10:37 +0000
committerGordon Sim <gsim@apache.org>2008-04-20 12:10:37 +0000
commit4780580874e8d6a3e3590fa5fdf8a088310b20ae (patch)
treea73e247b9821c2429a8e015ddff9cbb5d17a88e8
parente29bb6406ad8d0c0d9d58b7d1d09798829687602 (diff)
downloadqpid-python-4780580874e8d6a3e3590fa5fdf8a088310b20ae.tar.gz
QPID-920: converted c++ client to use final 0-10 protocol
* connection handler converted to using invoker & proxy and updated to final method defs * SessionCore & ExecutionHandler replace by SessionImpl * simplified handling of completion & results, removed handling of responses git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@649915 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/examples/examples/direct/declare_queues.cpp4
-rw-r--r--qpid/cpp/examples/examples/fanout/listener.cpp12
-rw-r--r--qpid/cpp/examples/examples/pub-sub/topic_listener.cpp4
-rw-r--r--qpid/cpp/examples/examples/request-response/client.cpp2
-rw-r--r--qpid/cpp/examples/examples/request-response/server.cpp2
-rw-r--r--qpid/cpp/rubygen/99-0/Session.rb11
-rw-r--r--qpid/cpp/rubygen/99-0/structs.rb2
-rwxr-xr-xqpid/cpp/rubygen/amqpgen.rb2
-rwxr-xr-xqpid/cpp/rubygen/cppgen.rb35
-rw-r--r--qpid/cpp/src/Makefile.am20
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerAdapter.cpp20
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerAdapter.h6
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp40
-rw-r--r--qpid/cpp/src/qpid/broker/DtxHandlerImpl.h14
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/MessageAdapter.cpp23
-rw-r--r--qpid/cpp/src/qpid/broker/MessageAdapter.h5
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDelivery.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp77
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.h32
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp32
-rw-r--r--qpid/cpp/src/qpid/client/Channel.cpp45
-rw-r--r--qpid/cpp/src/qpid/client/Channel.h2
-rw-r--r--qpid/cpp/src/qpid/client/Completion.h15
-rw-r--r--qpid/cpp/src/qpid/client/CompletionTracker.cpp121
-rw-r--r--qpid/cpp/src/qpid/client/CompletionTracker.h77
-rw-r--r--qpid/cpp/src/qpid/client/Connection.cpp6
-rw-r--r--qpid/cpp/src/qpid/client/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp181
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.h51
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp15
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.h8
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp9
-rw-r--r--qpid/cpp/src/qpid/client/Connector.h3
-rw-r--r--qpid/cpp/src/qpid/client/Correlator.cpp45
-rw-r--r--qpid/cpp/src/qpid/client/Correlator.h52
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/Execution.h24
-rw-r--r--qpid/cpp/src/qpid/client/ExecutionHandler.cpp267
-rw-r--r--qpid/cpp/src/qpid/client/ExecutionHandler.h104
-rw-r--r--qpid/cpp/src/qpid/client/Future.cpp (renamed from qpid/cpp/src/qpid/client/FutureResponse.h)33
-rw-r--r--qpid/cpp/src/qpid/client/Future.h55
-rw-r--r--qpid/cpp/src/qpid/client/FutureFactory.cpp51
-rw-r--r--qpid/cpp/src/qpid/client/FutureFactory.h48
-rw-r--r--qpid/cpp/src/qpid/client/FutureResponse.cpp45
-rw-r--r--qpid/cpp/src/qpid/client/FutureResult.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/FutureResult.h4
-rw-r--r--qpid/cpp/src/qpid/client/Message.cpp76
-rw-r--r--qpid/cpp/src/qpid/client/Message.h63
-rw-r--r--qpid/cpp/src/qpid/client/Results.cpp71
-rw-r--r--qpid/cpp/src/qpid/client/Results.h (renamed from qpid/cpp/src/qpid/client/Response.h)43
-rw-r--r--qpid/cpp/src/qpid/client/SessionBase.cpp29
-rw-r--r--qpid/cpp/src/qpid/client/SessionBase.h38
-rw-r--r--qpid/cpp/src/qpid/client/SessionCore.cpp440
-rw-r--r--qpid/cpp/src/qpid/client/SessionCore.h141
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.cpp605
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.h188
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionManager.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionManager.h10
-rw-r--r--qpid/cpp/src/qpid/client/TypedResult.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp4
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.h1
-rw-r--r--qpid/cpp/src/qpid/framing/AMQHeaderBody.h8
-rw-r--r--qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h4
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceSet.h3
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp15
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp8
-rw-r--r--qpid/cpp/src/tests/ExchangeTest.cpp1
-rw-r--r--qpid/cpp/src/tests/FramingTest.cpp108
-rw-r--r--qpid/cpp/src/tests/HeaderTest.cpp8
-rw-r--r--qpid/cpp/src/tests/MessageBuilderTest.cpp9
-rw-r--r--qpid/cpp/src/tests/MessageTest.cpp6
-rw-r--r--qpid/cpp/src/tests/MessageUtils.h8
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp3
-rw-r--r--qpid/cpp/src/tests/TxAckTest.cpp9
-rw-r--r--qpid/cpp/src/tests/client_test.cpp11
-rw-r--r--qpid/cpp/src/tests/exception_test.cpp3
-rw-r--r--qpid/cpp/src/tests/latencytest.cpp9
-rw-r--r--qpid/cpp/src/tests/perftest.cpp30
-rw-r--r--qpid/cpp/src/tests/topic_listener.cpp6
-rw-r--r--qpid/cpp/src/tests/topic_publisher.cpp6
-rw-r--r--qpid/cpp/src/tests/txtest.cpp12
-rw-r--r--qpid/cpp/xml/extra.xml18
89 files changed, 1587 insertions, 2083 deletions
diff --git a/qpid/cpp/examples/examples/direct/declare_queues.cpp b/qpid/cpp/examples/examples/direct/declare_queues.cpp
index de7eff0490..71ed28dac4 100644
--- a/qpid/cpp/examples/examples/direct/declare_queues.cpp
+++ b/qpid/cpp/examples/examples/direct/declare_queues.cpp
@@ -66,10 +66,10 @@ int main(int argc, char** argv) {
//--------- Main body of program --------------------------------------------
// Create a queue named "message_queue", and route all messages whose
- // routing key is "routing_key to this newly created queue.
+ // routing key is "routing_key" to this newly created queue.
session.queueDeclare(arg::queue="message_queue");
- session.queueBind(arg::exchange="amq.direct", arg::queue="message_queue", arg::routingKey="routing_key");
+ session.exchangeBind(arg::exchange="amq.direct", arg::queue="message_queue", arg::bindingKey="routing_key");
//-----------------------------------------------------------------------------
diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp
index 2860528b1f..79809d679e 100644
--- a/qpid/cpp/examples/examples/fanout/listener.cpp
+++ b/qpid/cpp/examples/examples/fanout/listener.cpp
@@ -71,13 +71,15 @@ int main(int argc, char** argv) {
// Unique name for private queue:
std::string myQueue=session.getId().str();
- // Declear my queue.
+ // Declare my queue.
session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
arg::autoDelete=true);
- // Bind my queue to the fanout exchange.
- // Note no routingKey required, the fanout exchange delivers
- // all messages to all bound queues unconditionally.
- session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue);
+ // Bind my queue to the fanout exchange.
+ //Note no the binding key will not affect routing (its just
+ //used to identify the binding e.g. when unbinding), the
+ //fanout exchange delivers all messages to all bound queues
+ //unconditionally.
+ session.exchangeBind(arg::exchange="amq.fanout", arg::queue=myQueue, arg::bindingKey="my-key");
// Create a listener and subscribe it to my queue.
SubscriptionManager subscriptions(session);
diff --git a/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp b/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp
index e5292db703..c7e9d3877d 100644
--- a/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp
+++ b/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp
@@ -107,8 +107,8 @@ void Listener::prepareQueue(std::string queue, std::string routing_key) {
* "control" routing key, when it is finished.
*/
- session.queueBind(arg::exchange="amq.topic", arg::queue=queue, arg::routingKey=routing_key);
- session.queueBind(arg::exchange="amq.topic", arg::queue=queue, arg::routingKey="control");
+ session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey=routing_key);
+ session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey="control");
/*
* subscribe to the queue using the subscription manager.
diff --git a/qpid/cpp/examples/examples/request-response/client.cpp b/qpid/cpp/examples/examples/request-response/client.cpp
index 9f82bd9d9e..8cec16a461 100644
--- a/qpid/cpp/examples/examples/request-response/client.cpp
+++ b/qpid/cpp/examples/examples/request-response/client.cpp
@@ -128,7 +128,7 @@ int main(int argc, char** argv) {
// Use the name of the response queue as the routing key
session.queueDeclare(arg::queue=response_queue.str());
- session.queueBind(arg::exchange="amq.direct", arg::queue=response_queue.str(), arg::routingKey=response_queue.str());
+ session.exchangeBind(arg::exchange="amq.direct", arg::queue=response_queue.str(), arg::bindingKey=response_queue.str());
// Create a listener for the response queue and start listening.
diff --git a/qpid/cpp/examples/examples/request-response/server.cpp b/qpid/cpp/examples/examples/request-response/server.cpp
index 0de2ce5234..6c9bc7ffa6 100644
--- a/qpid/cpp/examples/examples/request-response/server.cpp
+++ b/qpid/cpp/examples/examples/request-response/server.cpp
@@ -143,7 +143,7 @@ int main(int argc, char** argv) {
// Use the name of the request queue as the routing key
session.queueDeclare(arg::queue=request_queue);
- session.queueBind(arg::exchange="amq.direct", arg::queue=request_queue, arg::routingKey=request_queue);
+ session.exchangeBind(arg::exchange="amq.direct", arg::queue=request_queue, arg::bindingKey=request_queue);
// Create a listener for the request queue and start listening.
diff --git a/qpid/cpp/rubygen/99-0/Session.rb b/qpid/cpp/rubygen/99-0/Session.rb
index 1ec78f6167..8e14d8daf9 100644
--- a/qpid/cpp/rubygen/99-0/Session.rb
+++ b/qpid/cpp/rubygen/99-0/Session.rb
@@ -8,7 +8,7 @@ class CppGen
def session_methods
excludes = ["channel", "connection", "session", "execution"]
gen_methods=@amqp.methods_on(@chassis).reject { |m|
- excludes.include? m.parent.name or m.body_name.include?("010")
+ excludes.include? m.classname or !m.parent.name.include?("010")
}
end
@@ -43,7 +43,7 @@ class AmqpMethod
def param_names_c() fields_c.map { |f| f.cppname} end
def signature_c() fields_c.map { |f| f.signature }; end
def sig_c_default() fields_c.map { |f| f.sig_default }; end
- def argpack_name() "#{parent.cppname}#{name.caps}Parameters"; end
+ def argpack_name() "#{classname.lcaps.cppsafe}#{name.caps}Parameters"; end
def argpack_type()
"boost::parameter::parameters<" +
fields_c.map { |f| "arg::keyword_tags::"+f.cppname }.join(',') +
@@ -54,7 +54,7 @@ class AmqpMethod
return "Response" if (not responses().empty?)
return "Completion"
end
- def session_function() "#{parent.name.lcaps}#{name.caps}"; end
+ def session_function() "#{classname.lcaps}#{name.caps}"; end
end
class SessionNoKeywordGen < CppGen
@@ -75,6 +75,7 @@ class SessionNoKeywordGen < CppGen
genl "using framing::Content;"
genl "using framing::FieldTable;"
genl "using framing::MethodContent;"
+ genl "using framing::SequenceSet;"
genl "using framing::SequenceNumberSet;"
genl "using framing::Uuid;"
genl
@@ -86,7 +87,7 @@ class SessionNoKeywordGen < CppGen
cpp_class(@classname, "public SessionBase") {
public
genl "Session_#{@amqp.version.bars}() {}"
- genl "Session_#{@amqp.version.bars}(shared_ptr<SessionCore> core) : SessionBase(core) {}"
+ genl "Session_#{@amqp.version.bars}(shared_ptr<SessionImpl> core) : SessionBase(core) {}"
session_methods.each { |m|
genl
doxygen(m)
@@ -180,7 +181,7 @@ EOS
# Session class.
cpp_class(@classname,"public #{@base}") {
private
- genl "#{@classname}(shared_ptr<SessionCore> core) : #{ @base}(core) {}"
+ genl "#{@classname}(shared_ptr<SessionImpl> core) : #{ @base}(core) {}"
keyword_methods.each { |m| typedef m.argpack_type, m.argpack_name }
genl "friend class Connection;"
public
diff --git a/qpid/cpp/rubygen/99-0/structs.rb b/qpid/cpp/rubygen/99-0/structs.rb
index 58e175af0f..b7d230d528 100644
--- a/qpid/cpp/rubygen/99-0/structs.rb
+++ b/qpid/cpp/rubygen/99-0/structs.rb
@@ -35,7 +35,7 @@ class StructGen < CppGen
def is_packed(s)
#return true
- s.kind_of?(AmqpStruct) or s.body_name.include?("010")
+ s.kind_of?(AmqpStruct) or s.parent.name.include?("010")
end
def execution_header?(s)
diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb
index bf473506d4..76685aa45b 100755
--- a/qpid/cpp/rubygen/amqpgen.rb
+++ b/qpid/cpp/rubygen/amqpgen.rb
@@ -341,7 +341,7 @@ class AmqpClass < AmqpElement
end
def l4?() # preview
- !["connection", "session", "execution"].include?(name)
+ !["connection", "session", "execution"].include?(name) && !control?
end
def control?()
diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb
index 757894163d..c1121e9bfe 100755
--- a/qpid/cpp/rubygen/cppgen.rb
+++ b/qpid/cpp/rubygen/cppgen.rb
@@ -88,6 +88,12 @@ class CppType
def passcref() @param="const #{name}&"; self; end
def code(str) @code=str; self; end
def defval(str) @defval=str; self; end
+ def fq(namespace)
+ @param="const #{namespace}::#{name}&"
+ @ret="const #{namespace}::#{name}&"
+ @defval="#{namespace}::#{name}()"
+ self
+ end
def encode(value, buffer)
@code ? "#{buffer}.put#{@code}(#{value});" : "#{value}.encode(#{buffer});"
@@ -143,7 +149,19 @@ class AmqpMethod
def cppname() name.lcaps.cppsafe; end
def param_names() fields.map { |f| f.cppname }; end
def signature() fields.map { |f| f.signature }; end
- def body_name() parent.name.caps+name.caps+"Body"; end
+ def classname()
+ #TODO: remove name mangling after preview path is dropped
+ if (parent.name.include?("010"))
+ return parent.name.delete("010")
+ elsif (parent.name == "cluster")
+ return parent.name
+ else
+ return parent.name + "X"
+ end
+ end
+ def body_name()
+ classname().caps+name.caps+"Body"
+ end
def cpp_pack_type() # preview
CppType.new("uint16_t").code("Short").defval("0");
@@ -211,7 +229,16 @@ class AmqpDomain
"uuid"=>CppType.new("Uuid").passcref.retcref
}
- def cppname() name.caps; end
+ def cppname()
+ #TODO: remove name mangling after preview path is dropped
+ if (name.include?("010"))
+ return name.caps.delete("010")
+ elsif (name.include?("properties"))
+ return "Preview" + name.caps
+ else
+ return name.caps
+ end
+ end
def fqtypename()
return containing_class.nsname+"::"+name.typename if containing_class
@@ -221,7 +248,7 @@ class AmqpDomain
def cpptype()
d=unalias
@cpptype ||= @@typemap[d.type_] or
- CppType.new(d.cppname).passcref.retcref or
+ CppType.new(d.cppname).fq("qpid::framing") or
raise "Invalid type #{self}"
end
@@ -232,7 +259,7 @@ end
class AmqpResult
def cpptype()
- @cpptype=CppType.new(parent.parent.name.caps+parent.name.caps+"Result").passcref
+ @cpptype=CppType.new(parent.classname.caps+parent.name.caps+"Result").passcref
end
end
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 82e6f4e6a4..cf4630cdf0 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -265,26 +265,24 @@ libqpidbroker_la_SOURCES = \
libqpidclient_la_LIBADD = libqpidcommon.la
libqpidclient_la_SOURCES = \
$(rgen_client_srcs) \
- qpid/client/SessionBase.cpp \
qpid/client/Connection.cpp \
qpid/client/Channel.cpp \
qpid/client/Exchange.cpp \
- qpid/broker/PersistableMessage.cpp \
qpid/client/Queue.cpp \
qpid/client/ConnectionImpl.cpp \
qpid/client/Connector.cpp \
qpid/client/Demux.cpp \
qpid/client/Dispatcher.cpp \
qpid/client/LocalQueue.cpp \
+ qpid/client/Message.cpp \
qpid/client/MessageListener.cpp \
- qpid/client/Correlator.cpp \
- qpid/client/CompletionTracker.cpp \
qpid/client/ConnectionHandler.cpp \
- qpid/client/ExecutionHandler.cpp \
+ qpid/client/Future.cpp \
qpid/client/FutureCompletion.cpp \
- qpid/client/FutureResponse.cpp \
qpid/client/FutureResult.cpp \
- qpid/client/SessionCore.cpp \
+ qpid/client/Results.cpp \
+ qpid/client/SessionBase.cpp \
+ qpid/client/SessionImpl.cpp \
qpid/client/StateManager.cpp \
qpid/client/SubscriptionManager.cpp
@@ -392,27 +390,23 @@ nobase_include_HEADERS = \
qpid/client/Queue.h \
qpid/client/AckPolicy.h \
qpid/client/Completion.h \
- qpid/client/CompletionTracker.h \
qpid/client/Connection.h \
qpid/client/ConnectionHandler.h \
qpid/client/ConnectionImpl.h \
qpid/client/Connector.h \
- qpid/client/Correlator.h \
qpid/client/Demux.h \
qpid/client/Dispatcher.h \
qpid/client/LocalQueue.h \
qpid/client/Execution.h \
- qpid/client/ExecutionHandler.h \
qpid/client/Future.h \
qpid/client/FutureCompletion.h \
- qpid/client/FutureResponse.h \
qpid/client/FutureResult.h \
qpid/client/MessageListener.h \
qpid/client/MessageQueue.h \
- qpid/client/Response.h \
+ qpid/client/Results.h \
qpid/client/SessionBase.h \
qpid/client/Session.h \
- qpid/client/SessionCore.h \
+ qpid/client/SessionImpl.h \
qpid/client/StateManager.h \
qpid/client/SubscriptionManager.h \
qpid/client/TypedResult.h \
diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
index ea964ef3a3..b83a275959 100644
--- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -106,17 +106,17 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const stri
getBroker().getExchanges().destroy(name);
}
-ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
+ExchangeXQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
+ return ExchangeXQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
- return ExchangeQueryResult("", false, true, FieldTable());
+ return ExchangeXQueryResult("", false, true, FieldTable());
}
}
-BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
+BindingXQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
const std::string& exchangeName,
const std::string& queueName,
const std::string& key,
@@ -133,27 +133,27 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/
}
if (!exchange) {
- return BindingQueryResult(true, false, false, false, false);
+ return BindingXQueryResult(true, false, false, false, false);
} else if (!queueName.empty() && !queue) {
- return BindingQueryResult(false, true, false, false, false);
+ return BindingXQueryResult(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- return BindingQueryResult(false, false, false, false, false);
+ return BindingXQueryResult(false, false, false, false, false);
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- return BindingQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched);
+ return BindingXQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched);
}
}
-QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
+QueueXQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
{
Queue::shared_ptr queue = state.getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
- return QueueQueryResult(queue->getName(),
+ return QueueXQueryResult(queue->getName(),
alternateExchange ? alternateExchange->getName() : "",
queue->isDurable(),
queue->hasExclusiveOwner(),
diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h
index b28c4ebdcc..26dfe802e1 100644
--- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h
@@ -111,7 +111,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
const qpid::framing::FieldTable& arguments);
void delete_(uint16_t ticket,
const std::string& exchange, bool ifUnused);
- framing::ExchangeQueryResult query(u_int16_t ticket,
+ framing::ExchangeXQueryResult query(u_int16_t ticket,
const std::string& name);
private:
void checkType(shared_ptr<Exchange> exchange, const std::string& type);
@@ -127,7 +127,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
public:
BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
- framing::BindingQueryResult query(u_int16_t ticket,
+ framing::BindingXQueryResult query(u_int16_t ticket,
const std::string& exchange,
const std::string& queue,
const std::string& routingKey,
@@ -154,7 +154,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
const std::string& exchange,
const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
- framing::QueueQueryResult query(const std::string& queue);
+ framing::QueueXQueryResult query(const std::string& queue);
void purge(uint16_t ticket, const std::string& queue);
void delete_(uint16_t ticket, const std::string& queue,
bool ifUnused, bool ifEmpty);
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 0e91c081c0..79f9064b9d 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -22,8 +22,6 @@
#include "ConnectionHandler.h"
#include "Connection.h"
-#include "qpid/framing/ConnectionStartBody.h"
-#include "qpid/framing/Connection010StartBody.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
diff --git a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 533872e849..61ab856fa9 100644
--- a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -36,7 +36,7 @@ void DtxHandlerImpl::select()
state.selectDtx();
}
-DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
+DtxDemarcationXEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
const string& xid,
bool fail,
bool suspend)
@@ -47,7 +47,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
if (suspend) {
throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
- return DtxDemarcationEndResult(XA_RBROLLBACK);
+ return DtxDemarcationXEndResult(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -55,14 +55,14 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
} else {
state.endDtx(xid, false);
}
- return DtxDemarcationEndResult(XA_OK);
+ return DtxDemarcationXEndResult(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- return DtxDemarcationEndResult(XA_RBTIMEOUT);
+ return DtxDemarcationXEndResult(XA_RBTIMEOUT);
}
}
-DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
+DtxDemarcationXStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
const string& xid,
bool join,
bool resume)
@@ -76,50 +76,50 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
} else {
state.startDtx(xid, getBroker().getDtxManager(), join);
}
- return DtxDemarcationStartResult(XA_OK);
+ return DtxDemarcationXStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return DtxDemarcationStartResult(XA_RBTIMEOUT);
+ return DtxDemarcationXStartResult(XA_RBTIMEOUT);
}
}
// DtxCoordinationHandler:
-DtxCoordinationPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
+DtxCoordinationXPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
const string& xid)
{
try {
bool ok = getBroker().getDtxManager().prepare(xid);
- return DtxCoordinationPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCoordinationXPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return DtxCoordinationPrepareResult(XA_RBTIMEOUT);
+ return DtxCoordinationXPrepareResult(XA_RBTIMEOUT);
}
}
-DtxCoordinationCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/,
+DtxCoordinationXCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/,
const string& xid,
bool onePhase)
{
try {
bool ok = getBroker().getDtxManager().commit(xid, onePhase);
- return DtxCoordinationCommitResult(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCoordinationXCommitResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return DtxCoordinationCommitResult(XA_RBTIMEOUT);
+ return DtxCoordinationXCommitResult(XA_RBTIMEOUT);
}
}
-DtxCoordinationRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
+DtxCoordinationXRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
const string& xid )
{
try {
getBroker().getDtxManager().rollback(xid);
- return DtxCoordinationRollbackResult(XA_OK);
+ return DtxCoordinationXRollbackResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return DtxCoordinationRollbackResult(XA_RBTIMEOUT);
+ return DtxCoordinationXRollbackResult(XA_RBTIMEOUT);
}
}
-DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
+DtxCoordinationXRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
bool /*startscan*/,
bool /*endscan*/ )
{
@@ -144,7 +144,7 @@ DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
data.push_back(*i);
}
Array indoubt(data);
- return DtxCoordinationRecoverResult(indoubt);
+ return DtxCoordinationXRecoverResult(indoubt);
}
void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
@@ -154,10 +154,10 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
}
-DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
+DtxCoordinationXGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
{
uint32_t timeout = getBroker().getDtxManager().getTimeout(xid);
- return DtxCoordinationGetTimeoutResult(timeout);
+ return DtxCoordinationXGetTimeoutResult(timeout);
}
diff --git a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
index 5bc9d5142a..efb56dba95 100644
--- a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
@@ -36,27 +36,27 @@ public:
// DtxCoordinationHandler:
- framing::DtxCoordinationCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase);
+ framing::DtxCoordinationXCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase);
void forget(u_int16_t ticket, const std::string& xid);
- framing::DtxCoordinationGetTimeoutResult getTimeout(const std::string& xid);
+ framing::DtxCoordinationXGetTimeoutResult getTimeout(const std::string& xid);
- framing::DtxCoordinationPrepareResult prepare(u_int16_t ticket, const std::string& xid);
+ framing::DtxCoordinationXPrepareResult prepare(u_int16_t ticket, const std::string& xid);
- framing::DtxCoordinationRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan);
+ framing::DtxCoordinationXRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan);
- framing::DtxCoordinationRollbackResult rollback(u_int16_t ticket, const std::string& xid);
+ framing::DtxCoordinationXRollbackResult rollback(u_int16_t ticket, const std::string& xid);
void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout);
// DtxDemarcationHandler:
- framing::DtxDemarcationEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
+ framing::DtxDemarcationXEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
void select();
- framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
+ framing::DtxDemarcationXStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
};
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index b60a95228d..297e610418 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -22,9 +22,9 @@
#include "Message.h"
#include "ExchangeRegistry.h"
#include "qpid/framing/frame_functors.h"
-#include "qpid/framing/BasicPublishBody.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/MessageXTransferBody.h"
#include "qpid/framing/SendContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
@@ -225,9 +225,9 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
MessageAdapter& Message::getAdapter() const
{
if (!adapter) {
- if(frames.isA<MessageTransferBody>()) {
+ if(frames.isA<MessageXTransferBody>()) {
adapter = &TRANSFER_99_0;
- } else if(frames.isA<Message010TransferBody>()) {
+ } else if(frames.isA<MessageTransferBody>()) {
adapter = &TRANSFER;
} else {
const AMQMethodBody* method = frames.getMethod();
diff --git a/qpid/cpp/src/qpid/broker/MessageAdapter.cpp b/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
index 0e99d923d4..013e2c91ac 100644
--- a/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -21,6 +21,11 @@
#include "MessageAdapter.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/MessageXTransferBody.h"
+
namespace {
const std::string empty;
}
@@ -30,13 +35,13 @@ namespace broker{
std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f)
{
- const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>();
+ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
return p ? p->getRoutingKey() : empty;
}
std::string TransferAdapter::getExchange(const framing::FrameSet& f)
{
- return f.as<framing::Message010TransferBody>()->getDestination();
+ return f.as<framing::MessageTransferBody>()->getDestination();
}
bool TransferAdapter::isImmediate(const framing::FrameSet&)
@@ -47,42 +52,42 @@ namespace broker{
const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f)
{
- const framing::MessageProperties010* p = f.getHeaders()->get<framing::MessageProperties010>();
+ const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();
return p ? &(p->getApplicationHeaders()) : 0;
}
bool TransferAdapter::isPersistent(const framing::FrameSet& f)
{
- const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>();
+ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
return p && p->getDeliveryMode() == 2;
}
bool TransferAdapter::requiresAccept(const framing::FrameSet& f)
{
- const framing::Message010TransferBody* b = f.as<framing::Message010TransferBody>();
+ const framing::MessageTransferBody* b = f.as<framing::MessageTransferBody>();
return b && b->getAcceptMode() == 0/*EXPLICIT == 0*/;
}
std::string PreviewAdapter::getExchange(const framing::FrameSet& f)
{
- return f.as<framing::MessageTransferBody>()->getDestination();
+ return f.as<framing::MessageXTransferBody>()->getDestination();
}
std::string PreviewAdapter::getRoutingKey(const framing::FrameSet& f)
{
- const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>();
return p ? p->getRoutingKey() : empty;
}
const framing::FieldTable* PreviewAdapter::getApplicationHeaders(const framing::FrameSet& f)
{
- const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();
+ const framing::PreviewMessageProperties* p = f.getHeaders()->get<framing::PreviewMessageProperties>();
return p ? &(p->getApplicationHeaders()) : 0;
}
bool PreviewAdapter::isPersistent(const framing::FrameSet& f)
{
- const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>();
return p && p->getDeliveryMode() == 2;
}
diff --git a/qpid/cpp/src/qpid/broker/MessageAdapter.h b/qpid/cpp/src/qpid/broker/MessageAdapter.h
index 9759f320ac..4c13e756e9 100644
--- a/qpid/cpp/src/qpid/broker/MessageAdapter.h
+++ b/qpid/cpp/src/qpid/broker/MessageAdapter.h
@@ -23,13 +23,8 @@
*/
#include <string>
-#include "qpid/framing/BasicPublishBody.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/DeliveryProperties.h"
-#include "qpid/framing/MessageProperties.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/Message010TransferBody.h"
namespace qpid {
namespace broker {
diff --git a/qpid/cpp/src/qpid/broker/MessageDelivery.cpp b/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
index 9ef7090cd9..36862edf37 100644
--- a/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -24,9 +24,10 @@
#include "Message.h"
#include "Queue.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/BasicDeliverBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
+#include "qpid/framing/BasicXDeliverBody.h"
+#include "qpid/framing/BasicXGetOkBody.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/MessageXTransferBody.h"
using namespace boost;
@@ -52,7 +53,7 @@ struct BasicGetToken : BaseToken
AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
{
- return AMQFrame(in_place<BasicGetOkBody>(
+ return AMQFrame(in_place<BasicXGetOkBody>(
ProtocolVersion(), id.getValue(),
msg->getRedelivered(), msg->getExchangeName(),
msg->getRoutingKey(), queue->getMessageCount()));
@@ -69,7 +70,7 @@ struct BasicConsumeToken : BaseToken
AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
{
- return AMQFrame(in_place<BasicDeliverBody>(
+ return AMQFrame(in_place<BasicXDeliverBody>(
ProtocolVersion(), consumer, id.getValue(),
msg->getRedelivered(), msg->getExchangeName(),
msg->getRoutingKey()));
@@ -92,16 +93,16 @@ struct MessageDeliveryToken : BaseToken
//may need to set the redelivered flag:
if (isPreview) {
if (msg->getRedelivered()){
- msg->getProperties<DeliveryProperties>()->setRedelivered(true);
+ msg->getProperties<PreviewDeliveryProperties>()->setRedelivered(true);
}
- return AMQFrame(in_place<MessageTransferBody>(
+ return AMQFrame(in_place<MessageXTransferBody>(
ProtocolVersion(), 0, destination,
confirmMode, acquireMode));
} else {
if (msg->getRedelivered()){
- msg->getProperties<DeliveryProperties010>()->setRedelivered(true);
+ msg->getProperties<DeliveryProperties>()->setRedelivered(true);
}
- return AMQFrame(in_place<Message010TransferBody>(
+ return AMQFrame(in_place<MessageTransferBody>(
ProtocolVersion(), destination, confirmMode, acquireMode));
}
}
diff --git a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 64c0282963..5e0e759dfb 100644
--- a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -23,8 +23,6 @@
#include "Connection.h"
#include "Broker.h"
#include "MessageDelivery.h"
-#include "qpid/framing/MessageAppendBody.h"
-#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "BrokerAdapter.h"
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
index eb45ff1492..411e0ce3c0 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -24,8 +24,7 @@
#include "SessionContext.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
-#include "qpid/framing/ExecutionCompleteBody.h"
-#include "qpid/framing/ExecutionResultBody.h"
+#include "qpid/framing/ExecutionXCompleteBody.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
@@ -182,7 +181,7 @@ SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
classId = frame.castBody<AMQMethodBody>()->amqpClassId();
switch (classId) {
- case ExecutionCompleteBody::CLASS_ID:
+ case ExecutionXCompleteBody::CLASS_ID:
return EXECUTION_CONTROL_TRACK;
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 2251901340..c2f6e3c307 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -34,7 +34,7 @@
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/Message010TransferBody.h"
+#include "qpid/framing/MessageXTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -351,10 +351,11 @@ void SemanticState::handle(intrusive_ptr<Message> msg) {
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
- if (msg->isA<MessageTransferBody>()) {
+ //TODO: the following should be hidden behind message (using MessageAdapter or similar)
+ if (msg->isA<MessageXTransferBody>()) {
+ msg->getProperties<PreviewDeliveryProperties>()->setExchange(exchangeName);
+ } else if (msg->isA<MessageTransferBody>()) {
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
- } else if (msg->isA<Message010TransferBody>()) {
- msg->getProperties<DeliveryProperties010>()->setExchange(exchangeName);
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index b7985e9ed8..3ad29e6271 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -107,13 +107,13 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU
getBroker().getExchanges().destroy(name);
}
-Exchange010QueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
+ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
{
try {
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- return Exchange010QueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
+ return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
- return Exchange010QueryResult("", false, true, FieldTable());
+ return ExchangeQueryResult("", false, true, FieldTable());
}
}
void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
@@ -154,7 +154,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
}
-Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
+ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
const std::string& queueName,
const std::string& key,
const framing::FieldTable& args)
@@ -170,18 +170,18 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str
}
if (!exchange) {
- return Exchange010BoundResult(true, false, false, false, false);
+ return ExchangeBoundResult(true, false, false, false, false);
} else if (!queueName.empty() && !queue) {
- return Exchange010BoundResult(false, true, false, false, false);
+ return ExchangeBoundResult(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- return Exchange010BoundResult(false, false, false, false, false);
+ return ExchangeBoundResult(false, false, false, false, false);
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- return Exchange010BoundResult(false, false, !queueMatched, !keyMatched, !argsMatched);
+ return ExchangeBoundResult(false, false, !queueMatched, !keyMatched, !argsMatched);
}
}
@@ -191,6 +191,11 @@ SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : Han
SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
{
+ destroyExclusiveQueues();
+}
+
+void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
+{
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -200,6 +205,7 @@ SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
+
bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
{
@@ -207,12 +213,12 @@ bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
}
-Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
+QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
Queue::shared_ptr queue = getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
- return Queue010QueryResult(queue->getName(),
+ return QueueQueryResult(queue->getName(),
alternateExchange ? alternateExchange->getName() : "",
queue->isDurable(),
queue->hasExclusiveOwner(),
@@ -313,6 +319,7 @@ void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/,
uint8_t /*acquireMode*/)
{
//not yet used (content containing assemblies treated differently at present
+ std::cout << "SessionAdapter::MessageHandlerImpl::transfer() called" << std::endl;
}
void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool setRedelivered)
@@ -396,7 +403,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm
commands.for_each(acceptOp);
}
-framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
+framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
{
//TODO: change this when SequenceNumberSet is deleted along with preview code
SequenceNumberSet results;
@@ -408,7 +415,7 @@ framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(con
RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2);
results.processRanges(g);
- return Message010AcquireResult(acquisitions);
+ return MessageAcquireResult(acquisitions);
}
@@ -450,7 +457,7 @@ void SessionAdapter::TxHandlerImpl::rollback()
state.rollback();
}
-std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid)
+std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid)
{
std::string encoded;
encode(xid, encoded);
@@ -462,7 +469,7 @@ void SessionAdapter::DtxHandlerImpl::select()
state.selectDtx();
}
-Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
+DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
bool fail,
bool suspend)
{
@@ -472,7 +479,7 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
if (suspend) {
throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
- return Dtx010EndResult(XA_RBROLLBACK);
+ return DtxEndResult(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -480,14 +487,14 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
} else {
state.endDtx(convert(xid), false);
}
- return Dtx010EndResult(XA_OK);
+ return DtxEndResult(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- return Dtx010EndResult(XA_RBTIMEOUT);
+ return DtxEndResult(XA_RBTIMEOUT);
}
}
-Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid,
+DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
bool join,
bool resume)
{
@@ -500,45 +507,45 @@ Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid,
} else {
state.startDtx(convert(xid), getBroker().getDtxManager(), join);
}
- return Dtx010StartResult(XA_OK);
+ return DtxStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return Dtx010StartResult(XA_RBTIMEOUT);
+ return DtxStartResult(XA_RBTIMEOUT);
}
}
-Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid)
+DtxPrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
{
try {
bool ok = getBroker().getDtxManager().prepare(convert(xid));
- return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return Dtx010PrepareResult(XA_RBTIMEOUT);
+ return DtxPrepareResult(XA_RBTIMEOUT);
}
}
-Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid,
+DtxCommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
bool onePhase)
{
try {
bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
- return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCommitResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return Dtx010CommitResult(XA_RBTIMEOUT);
+ return DtxCommitResult(XA_RBTIMEOUT);
}
}
-Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid)
+DtxRollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
{
try {
getBroker().getDtxManager().rollback(convert(xid));
- return Dtx010RollbackResult(XA_OK);
+ return DtxRollbackResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return Dtx010RollbackResult(XA_RBTIMEOUT);
+ return DtxRollbackResult(XA_RBTIMEOUT);
}
}
-Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover()
+DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
{
std::set<std::string> xids;
getBroker().getStore().collectPreparedXids(xids);
@@ -550,23 +557,23 @@ Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover()
boost::shared_ptr<FieldValue> xid(new Struct32Value(*i));
indoubt.add(xid);
}
- return Dtx010RecoverResult(indoubt);
+ return DtxRecoverResult(indoubt);
}
-void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid)
+void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid)
{
//Currently no heuristic completion is supported, so this should never be used.
throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
}
-Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid)
+DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
{
uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
- return Dtx010GetTimeoutResult(timeout);
+ return DtxGetTimeoutResult(timeout);
}
-void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid,
+void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid,
u_int32_t timeout)
{
getBroker().getDtxManager().setTimeout(convert(xid), timeout);
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.h b/qpid/cpp/src/qpid/broker/SessionAdapter.h
index 0cbbd13777..a80e2b0776 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.h
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.h
@@ -83,6 +83,8 @@ class Queue;
Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ void destroyExclusiveQueues() { queueImpl.destroyExclusiveQueues(); }
+
private:
//common base for utility methods etc that are specific to this adapter
struct HandlerHelper : public HandlerImpl
@@ -105,14 +107,14 @@ class Queue;
bool passive, bool durable, bool autoDelete,
const qpid::framing::FieldTable& arguments);
void delete_(const std::string& exchange, bool ifUnused);
- framing::Exchange010QueryResult query(const std::string& name);
+ framing::ExchangeQueryResult query(const std::string& name);
void bind(const std::string& queue,
const std::string& exchange, const std::string& routingKey,
const qpid::framing::FieldTable& arguments);
void unbind(const std::string& queue,
const std::string& exchange,
const std::string& routingKey);
- framing::Exchange010BoundResult bound(const std::string& exchange,
+ framing::ExchangeBoundResult bound(const std::string& exchange,
const std::string& queue,
const std::string& routingKey,
const framing::FieldTable& arguments);
@@ -141,8 +143,10 @@ class Queue;
void delete_(const std::string& queue,
bool ifUnused, bool ifEmpty);
void purge(const std::string& queue);
- framing::Queue010QueryResult query(const std::string& queue);
+ framing::QueueQueryResult query(const std::string& queue);
bool isLocal(const ConnectionToken* t) const;
+
+ void destroyExclusiveQueues();
};
class MessageHandlerImpl :
@@ -170,7 +174,7 @@ class Queue;
void release(const framing::SequenceSet& commands,
bool setRedelivered);
- framing::Message010AcquireResult acquire(const framing::SequenceSet&);
+ framing::MessageAcquireResult acquire(const framing::SequenceSet&);
void subscribe(const string& queue,
const string& destination,
@@ -225,35 +229,35 @@ class Queue;
class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper, private framing::StructHelper
{
- std::string convert(const framing::Xid010& xid);
+ std::string convert(const framing::Xid& xid);
public:
DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void select();
- framing::Dtx010StartResult start(const framing::Xid010& xid,
+ framing::DtxStartResult start(const framing::Xid& xid,
bool join,
bool resume);
- framing::Dtx010EndResult end(const framing::Xid010& xid,
+ framing::DtxEndResult end(const framing::Xid& xid,
bool fail,
bool suspend);
- framing::Dtx010CommitResult commit(const framing::Xid010& xid,
+ framing::DtxCommitResult commit(const framing::Xid& xid,
bool onePhase);
- void forget(const framing::Xid010& xid);
+ void forget(const framing::Xid& xid);
- framing::Dtx010GetTimeoutResult getTimeout(const framing::Xid010& xid);
+ framing::DtxGetTimeoutResult getTimeout(const framing::Xid& xid);
- framing::Dtx010PrepareResult prepare(const framing::Xid010& xid);
+ framing::DtxPrepareResult prepare(const framing::Xid& xid);
- framing::Dtx010RecoverResult recover();
+ framing::DtxRecoverResult recover();
- framing::Dtx010RollbackResult rollback(const framing::Xid010& xid);
+ framing::DtxRollbackResult rollback(const framing::Xid& xid);
- void setTimeout(const framing::Xid010& xid, uint32_t timeout);
+ void setTimeout(const framing::Xid& xid, uint32_t timeout);
};
ExchangeHandlerImpl exchangeImpl;
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 3baa3a89a7..0b1e744e25 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -77,12 +77,6 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
}
-void SessionHandler::destroy() {
- ignoring=true; // Ignore trailing frames sent by client.
- session->detach();
- session.reset();
-}
-
void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f); // Send it.
if (session->sent(f))
@@ -160,12 +154,12 @@ void SessionHandler::detached(const std::string& name, uint8_t code)
void SessionHandler::requestTimeout(uint32_t t)
{
session->setTimeout(t);
- //proxy.timeout(t);
+ peerSession.timeout(t);
}
-void SessionHandler::timeout(uint32_t)
+void SessionHandler::timeout(uint32_t t)
{
- //not sure what we need to do on the server for this...
+ session->setTimeout(t);
}
void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index fa013a1c15..4b031f2951 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -70,7 +70,6 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
void localSuspend();
void detach() { localSuspend(); }
void sendCompletion();
- void destroy();
protected:
void handleIn(framing::AMQFrame&);
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 64d62934b9..3c6bed4344 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -25,6 +25,9 @@
#include "SemanticHandler.h"
#include "SessionManager.h"
#include "SessionHandler.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
@@ -182,7 +185,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
completed.add(id);
if (!invocation.wasHandled()) {
- throw NotImplementedException("Not implemented");
+ throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
nextOut++;//execution result is now a command, so the counter must be incremented
getProxy().getExecution010().result(id, invocation.getResult());
@@ -206,6 +209,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
}
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
+ if (frame.getBof()) {
+ //i.e this is a just a command frame, add a dummy header
+ AMQFrame header;
+ header.setBody(AMQHeaderBody());
+ header.setBof(false);
+ header.setEof(false);
+ msg->getFrames().append(header);
+ }
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
@@ -242,13 +253,14 @@ void SessionState::handle(AMQFrame& frame)
SequenceNumber commandId;
try {
//TODO: make command handling more uniform, regardless of whether
- //commands carry content. (For now, assume all single frame
- //assemblies are non-content bearing and all content-bearing
- //assemblies will have more than one frame):
- if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod(), commandId);
- } else {
+ //commands carry content.
+ AMQMethodBody* m = frame.getMethod();
+ if (m == 0 || m->isContentBearing()) {
handleContent(frame, commandId);
+ } else if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod(), commandId);
+ } else {
+ throw InternalErrorException("Cannot handle multi-frame command segments yet");
}
} catch(const SessionException& e) {
//TODO: better implementation of new exception handling mechanism
@@ -263,7 +275,11 @@ void SessionState::handle(AMQFrame& frame)
} else {
getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
}
- handler->destroy();
+ timeout = 0;
+ //The python client doesn't currently detach on receiving an exception
+ //so the session state isn't destroyed. This is a temporary workaround
+ //until that is addressed
+ adapter.destroyExclusiveQueues();
}
}
diff --git a/qpid/cpp/src/qpid/client/Channel.cpp b/qpid/cpp/src/qpid/client/Channel.cpp
index 4af69c8552..ae9f78483d 100644
--- a/qpid/cpp/src/qpid/client/Channel.cpp
+++ b/qpid/cpp/src/qpid/client/Channel.cpp
@@ -26,7 +26,6 @@
#include "Message.h"
#include "Connection.h"
#include "Demux.h"
-#include "FutureResponse.h"
#include "MessageListener.h"
#include "MessageQueue.h"
#include <boost/format.hpp>
@@ -47,9 +46,17 @@ const std::string empty;
class ScopedSync
{
Session& session;
+ const bool change;
+ const bool value;
public:
- ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
- ~ScopedSync() { session.setSynchronous(false); }
+ ScopedSync(Session& s, bool desired = true) : session(s), change(s.isSynchronous() != desired), value(desired)
+ {
+ if (change) session.setSynchronous(value);
+ }
+ ~ScopedSync()
+ {
+ if (change) session.setSynchronous(!value);
+ }
};
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
@@ -116,7 +123,7 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri
string e = exchange.getName();
string q = queue.getName();
ScopedSync s(session, synch);
- session.queueBind(0, q, e, key, args);
+ session.exchangeBind(q, e, key, args);
}
void Channel::commit(){
@@ -129,7 +136,7 @@ void Channel::rollback(){
void Channel::consume(
Queue& _queue, const std::string& tag, MessageListener* listener,
- AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
+ AckMode ackMode, bool noLocal, bool synch, FieldTable* fields) {
if (tag.empty()) {
throw Exception("A tag must be specified for a consumer.");
@@ -144,13 +151,18 @@ void Channel::consume(
c.ackMode = ackMode;
c.count = 0;
}
- uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
+ uint8_t confirmMode = ackMode == NO_ACK ? 1 : 0;
ScopedSync s(session, synch);
- session.messageSubscribe(0, _queue.getName(), tag, noLocal,
+ FieldTable ft;
+ FieldTable* ftptr = fields ? fields : &ft;
+ if (noLocal) {
+ ftptr->setString("qpid.no-local","yes");
+ }
+ session.messageSubscribe(_queue.getName(), tag,
confirmMode, 0/*pre-acquire*/,
- false, fields ? *fields : FieldTable());
+ false, "", 0, *ftptr);
if (!prefetch) {
- session.messageFlowMode(tag, 0/*credit based*/);
+ session.messageSetFlowMode(tag, 0/*credit based*/);
}
//allocate some credit:
@@ -177,17 +189,22 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
ScopedDivert handler(tag, session.getExecution().getDemux());
Demux::QueuePtr incoming = handler.getQueue();
- session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
+ session.messageSubscribe(destination=tag, queue=_queue.getName(), acceptMode=(ackMode == NO_ACK ? 1 : 0));
session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
session.messageFlow(tag, 0/*MESSAGES*/, 1);
- Completion status = session.messageFlush(tag);
- status.sync();
+ {
+ ScopedSync s(session);
+ session.messageFlush(tag);
+ }
session.messageCancel(tag);
FrameSet::shared_ptr p;
if (incoming->tryPop(p)) {
msg.populate(*p);
- if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true);
+ if (ackMode == AUTO_ACK) {
+ msg.setSession(session);
+ msg.acknowledge(false, true);
+ }
return true;
}
else
@@ -243,7 +260,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination)
bool send = i->second.ackMode == AUTO_ACK
|| (prefetch && ++(i->second.count) > (prefetch / 2));
if (send) i->second.count = 0;
- session.getExecution().completed(content.getId(), true, send);
+ session.getExecution().markCompleted(content.getId(), true, send);
}
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
diff --git a/qpid/cpp/src/qpid/client/Channel.h b/qpid/cpp/src/qpid/client/Channel.h
index 2cda97dc63..1c3c2c9ae8 100644
--- a/qpid/cpp/src/qpid/client/Channel.h
+++ b/qpid/cpp/src/qpid/client/Channel.h
@@ -256,7 +256,7 @@ class Channel : private sys::Runnable
void consume(
Queue& queue, const std::string& tag, MessageListener* listener,
AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
+ framing::FieldTable* fields = 0);
/**
* Cancels a subscription previously set up through a call to consume().
diff --git a/qpid/cpp/src/qpid/client/Completion.h b/qpid/cpp/src/qpid/client/Completion.h
index 4d324aaf28..19d5b31777 100644
--- a/qpid/cpp/src/qpid/client/Completion.h
+++ b/qpid/cpp/src/qpid/client/Completion.h
@@ -24,7 +24,7 @@
#include <boost/shared_ptr.hpp>
#include "Future.h"
-#include "SessionCore.h"
+#include "SessionImpl.h"
namespace qpid {
namespace client {
@@ -33,17 +33,12 @@ class Completion
{
protected:
Future future;
- shared_ptr<SessionCore> session;
+ shared_ptr<SessionImpl> session;
public:
Completion() {}
- Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {}
-
- void sync()
- {
- future.sync(*session);
- }
+ Completion(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {}
void wait()
{
@@ -53,10 +48,6 @@ public:
bool isComplete() {
return future.isComplete(*session);
}
-
- bool isCompleteUpTo() {
- return future.isCompleteUpTo(*session);
- }
};
}}
diff --git a/qpid/cpp/src/qpid/client/CompletionTracker.cpp b/qpid/cpp/src/qpid/client/CompletionTracker.cpp
deleted file mode 100644
index 76ea9dec51..0000000000
--- a/qpid/cpp/src/qpid/client/CompletionTracker.cpp
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "CompletionTracker.h"
-#include <algorithm>
-
-using qpid::client::CompletionTracker;
-using namespace qpid::framing;
-using namespace boost;
-
-namespace
-{
-const std::string empty;
-}
-
-CompletionTracker::CompletionTracker() : closed(false) {}
-CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
-
-void CompletionTracker::close()
-{
- sys::Mutex::ScopedLock l(lock);
- closed=true;
- while (!listeners.empty()) {
- Record r(listeners.front());
- {
- sys::Mutex::ScopedUnlock u(lock);
- r.completed();
- }
- listeners.pop_front();
- }
-}
-
-
-void CompletionTracker::completed(const SequenceNumber& _mark)
-{
- sys::Mutex::ScopedLock l(lock);
- mark = _mark;
- while (!listeners.empty() && !(listeners.front().id > mark)) {
- Record r(listeners.front());
- listeners.pop_front();
- {
- sys::Mutex::ScopedUnlock u(lock);
- r.completed();
- }
- }
-}
-
-void CompletionTracker::received(const SequenceNumber& id, const std::string& result)
-{
- sys::Mutex::ScopedLock l(lock);
- Listeners::iterator i = seek(id);
- if (i != listeners.end() && i->id == id) {
- i->received(result);
- listeners.erase(i);
- }
-}
-
-void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener)
-{
- if (!add(Record(point, listener))) {
- listener();
- }
-}
-
-void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener)
-{
- if (!add(Record(point, listener))) {
- listener(empty);
- }
-}
-
-bool CompletionTracker::add(const Record& record)
-{
- sys::Mutex::ScopedLock l(lock);
- if (record.id <= mark || closed) {
- return false;
- } else {
- //insert at the correct position
- Listeners::iterator i = seek(record.id);
- if (i == listeners.end()) i = listeners.begin();
- listeners.insert(i, record);
- return true;
- }
-}
-
-CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point)
-{
- Listeners::iterator i = listeners.begin();
- while (i != listeners.end() && i->id < point) i++;
- return i;
-}
-
-
-void CompletionTracker::Record::completed()
-{
- if (f) f();
- else if(g) g(empty);//won't get a result if command is now complete
-}
-
-void CompletionTracker::Record::received(const std::string& result)
-{
- if (g) g(result);
-}
diff --git a/qpid/cpp/src/qpid/client/CompletionTracker.h b/qpid/cpp/src/qpid/client/CompletionTracker.h
deleted file mode 100644
index 55f7ff7531..0000000000
--- a/qpid/cpp/src/qpid/client/CompletionTracker.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <list>
-#include <boost/function.hpp>
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/sys/Mutex.h"
-
-#ifndef _CompletionTracker_
-#define _CompletionTracker_
-
-namespace qpid {
-namespace client {
-
-class CompletionTracker
-{
-public:
- typedef boost::function<void()> CompletionListener;
- typedef boost::function<void(const std::string&)> ResultListener;
-
- CompletionTracker();
- CompletionTracker(const framing::SequenceNumber& mark);
- void completed(const framing::SequenceNumber& mark);
- void received(const framing::SequenceNumber& id, const std::string& result);
- void listenForCompletion(const framing::SequenceNumber& point, CompletionListener l);
- void listenForResult(const framing::SequenceNumber& point, ResultListener l);
- void close();
-
-private:
- struct Record
- {
- framing::SequenceNumber id;
- CompletionListener f;
- ResultListener g;
-
- Record(const framing::SequenceNumber& _id, CompletionListener l) : id(_id), f(l) {}
- Record(const framing::SequenceNumber& _id, ResultListener l) : id(_id), g(l) {}
- void completed();
- void received(const std::string& result);
-
- };
-
- typedef std::list<Record> Listeners;
- bool closed;
-
- sys::Mutex lock;
- framing::SequenceNumber mark;
- Listeners listeners;
-
- bool add(const Record& r);
- Listeners::iterator seek(const framing::SequenceNumber&);
-};
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp
index 872e04b3b5..25d1c510c8 100644
--- a/qpid/cpp/src/qpid/client/Connection.cpp
+++ b/qpid/cpp/src/qpid/client/Connection.cpp
@@ -25,7 +25,7 @@
#include "Connection.h"
#include "Channel.h"
#include "Message.h"
-#include "SessionCore.h"
+#include "SessionImpl.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
@@ -76,8 +76,8 @@ void Connection::openChannel(Channel& channel) {
Session Connection::newSession(SynchronousMode sync,
uint32_t detachedLifetime)
{
- shared_ptr<SessionCore> core(
- new SessionCore(impl, ++channelIdCounter, max_frame_size));
+ shared_ptr<SessionImpl> core(
+ new SessionImpl(impl, ++channelIdCounter, max_frame_size));
core->setSync(sync);
impl->addSession(core);
core->open(detachedLifetime);
diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h
index 81d9b972b6..d24809b31e 100644
--- a/qpid/cpp/src/qpid/client/Connection.h
+++ b/qpid/cpp/src/qpid/client/Connection.h
@@ -137,7 +137,7 @@ class Connection
Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0);
/**
- * Resume a suspendded session. A session may be resumed
+ * Resume a suspended session. A session may be resumed
* on a different connection to the one that created it.
*/
void resume(Session& session);
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index e1c50c14fc..13de271e3b 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -24,6 +24,7 @@
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/all_method_bodies.h"
+#include "qpid/framing/ClientInvoker.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -31,14 +32,21 @@ using namespace boost;
namespace {
const std::string OK("OK");
+const std::string PLAIN("PLAIN");
+const std::string en_US("en_US");
+
+const std::string INVALID_STATE_START("start received in invalid state");
+const std::string INVALID_STATE_TUNE("tune received in invalid state");
+const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state");
+const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state");
}
ConnectionHandler::ConnectionHandler()
- : StateManager(NOT_STARTED)
+ : StateManager(NOT_STARTED), outHandler(*this), proxy(outHandler)
{
- mechanism = "PLAIN";
- locale = "en_US";
+ mechanism = PLAIN;
+ locale = en_US;
heartbeat = 0;
maxChannels = 32767;
maxFrameSize = 65535;
@@ -52,34 +60,29 @@ ConnectionHandler::ConnectionHandler()
void ConnectionHandler::incoming(AMQFrame& frame)
{
if (getState() == CLOSED) {
- throw Exception("Connection is closed.");
+ throw Exception("Received frame on closed connection");
}
+
AMQBody* body = frame.getBody();
- if (frame.getChannel() == 0) {
- if (body->getMethod()) {
- handle(body->getMethod());
- } else {
- error(503, "Cannot send content on channel zero.");
- }
- } else {
- switch(getState()) {
- case OPEN:
- try {
+ try {
+ if (frame.getChannel() != 0 || !invoke(static_cast<ConnectionOperations&>(*this), *body)) {
+ switch(getState()) {
+ case OPEN:
in(frame);
- }catch(ConnectionException& e){
- error(e.code, e.what(), body);
- }catch(std::exception& e){
- error(541/*internal error*/, e.what(), body);
+ break;
+ case CLOSING:
+ QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
+ break;
+ default:
+ throw Exception("Cannot receive frames on non-zero channel until connection is established.");
}
- break;
- case CLOSING:
- QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored.");
- break;
- default:
- //must be in connection initialisation:
- fail("Cannot receive frames on non-zero channel until connection is established.");
}
+ }catch(std::exception& e){
+ QPID_LOG(warning, "Closing connection due to " << e.what());
+ setState(CLOSING);
+ proxy.close(501, e.what());
+ if (onError) onError(501, e.what());
}
}
@@ -109,101 +112,77 @@ void ConnectionHandler::close()
break;
case OPEN:
setState(CLOSING);
- send(ConnectionCloseBody(version, 200, OK, 0, 0));
+ proxy.close(200, OK);
waitFor(CLOSED);
break;
// Nothing to do for CLOSING, CLOSED, FAILED or NOT_STARTED
}
}
-void ConnectionHandler::send(const framing::AMQBody& body)
+void ConnectionHandler::checkState(STATES s, const std::string& msg)
{
- AMQFrame f(body);
- out(f);
+ if (getState() != s) {
+ throw CommandInvalidException(msg);
+ }
}
-void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+void ConnectionHandler::fail(const std::string& message)
{
- setState(CLOSING);
- send(ConnectionCloseBody(version, code, message, classId, methodId));
+ QPID_LOG(warning, message);
+ setState(FAILED);
}
-void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body)
+void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /*mechanisms*/, const Array& /*locales*/)
{
- if (onError)
- onError(code, message);
- AMQMethodBody* method = body->getMethod();
- if (method)
- error(code, message, method->amqpClassId(), method->amqpMethodId());
- else
- error(code, message);
+ checkState(NOT_STARTED, INVALID_STATE_START);
+ setState(NEGOTIATING);
+ //TODO: verify that desired mechanism and locale are supported
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ proxy.startOk(properties, mechanism, response, locale);
}
+void ConnectionHandler::secure(const std::string& /*challenge*/)
+{
+ throw NotImplementedException("Challenge-response cycle not yet implemented in client");
+}
-void ConnectionHandler::fail(const std::string& message)
+void ConnectionHandler::tune(uint16_t channelMax, uint16_t /*frameMax*/, uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/)
{
- QPID_LOG(warning, message);
- setState(FAILED);
+ checkState(NEGOTIATING, INVALID_STATE_TUNE);
+ //TODO: verify that desired heartbeat and max frame size are valid
+ maxChannels = channelMax;
+ proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
+ setState(OPENING);
+ proxy.open(vhost, capabilities, insist);
}
-void ConnectionHandler::handle(AMQMethodBody* method)
+void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/)
{
- switch (getState()) {
- case NOT_STARTED:
- if (method->isA<ConnectionStartBody>()) {
- setState(NEGOTIATING);
- string response = ((char)0) + uid + ((char)0) + pwd;
- send(ConnectionStartOkBody(version, properties, mechanism, response, locale));
- } else {
- fail("Bad method sequence, expected connection-start.");
- }
- break;
- case NEGOTIATING:
- if (method->isA<ConnectionTuneBody>()) {
- ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method);
- heartbeat = proposal->getHeartbeat();
- maxChannels = proposal->getChannelMax();
- send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat));
- setState(OPENING);
- send(ConnectionOpenBody(version, vhost, capabilities, insist));
- //TODO: support for further security challenges
- //} else if (method->isA<ConnectionSecureBody>()) {
- } else {
- fail("Unexpected method sequence, expected connection-tune.");
- }
- break;
- case OPENING:
- if (method->isA<ConnectionOpenOkBody>()) {
- setState(OPEN);
- //TODO: support for redirection
- //} else if (method->isA<ConnectionRedirectBody>()) {
- } else {
- fail("Unexpected method sequence, expected connection-open-ok.");
- }
- break;
- case OPEN:
- if (method->isA<ConnectionCloseBody>()) {
- send(ConnectionCloseOkBody(version));
- setState(CLOSED);
- ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method);
- QPID_LOG(warning, "Broker closed connection: " << c->getReplyCode()
- << ", " << c->getReplyText());
- if (onError) {
- onError(c->getReplyCode(), c->getReplyText());
- }
- } else {
- error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
- }
- break;
- case CLOSING:
- if (method->isA<ConnectionCloseOkBody>()) {
- if (onClose) {
- onClose();
- }
- setState(CLOSED);
- } else {
- QPID_LOG(warning, "Received frame on channel zero while closing connection; frame ignored.");
- }
- break;
+ checkState(OPENING, INVALID_STATE_OPEN_OK);
+ //TODO: store knownHosts for reconnection etc
+ setState(OPEN);
+}
+
+void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/)
+{
+ throw NotImplementedException("Redirection received from broker; not yet implemented in client");
+}
+
+void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText)
+{
+ proxy.closeOk();
+ setState(CLOSED);
+ QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText);
+ if (onError) {
+ onError(replyCode, replyText);
+ }
+}
+
+void ConnectionHandler::closeOk()
+{
+ checkState(CLOSING, INVALID_STATE_CLOSE_OK);
+ if (onClose) {
+ onClose();
}
+ setState(CLOSED);
}
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h
index bb50495c06..b298b02701 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h
@@ -21,12 +21,16 @@
#ifndef _ConnectionHandler_
#define _ConnectionHandler_
+#include "ChainableFrameHandler.h"
#include "Connector.h"
#include "StateManager.h"
-#include "ChainableFrameHandler.h"
-#include "qpid/framing/InputHandler.h"
-#include "qpid/framing/FieldTable.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/Array.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/InputHandler.h"
namespace qpid {
namespace client {
@@ -39,7 +43,7 @@ struct ConnectionProperties
framing::FieldTable properties;
std::string mechanism;
std::string locale;
- std::string capabilities;
+ framing::Array capabilities;
uint16_t heartbeat;
uint16_t maxChannels;
uint64_t maxFrameSize;
@@ -48,17 +52,42 @@ struct ConnectionProperties
};
class ConnectionHandler : private StateManager,
- public ConnectionProperties,
- public ChainableFrameHandler,
- public framing::InputHandler
+ public ConnectionProperties,
+ public ChainableFrameHandler,
+ public framing::InputHandler,
+ private framing::AMQP_ClientOperations::Connection010Handler
{
+ typedef framing::AMQP_ClientOperations::Connection010Handler ConnectionOperations;
enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
std::set<int> ESTABLISHED;
- void handle(framing::AMQMethodBody* method);
- void send(const framing::AMQBody& body);
- void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0);
- void error(uint16_t code, const std::string& message, framing::AMQBody* body);
+ class Adapter : public framing::FrameHandler
+ {
+ ConnectionHandler& handler;
+ public:
+ Adapter(ConnectionHandler& h) : handler(h) {}
+ void handle(framing::AMQFrame& f) { handler.out(f); }
+ };
+
+ Adapter outHandler;
+ framing::AMQP_ServerProxy::Connection010 proxy;
+
+ void checkState(STATES s, const std::string& msg);
+
+ //methods corresponding to connection controls:
+ void start(const framing::FieldTable& serverProperties,
+ const framing::Array& mechanisms,
+ const framing::Array& locales);
+ void secure(const std::string& challenge);
+ void tune(uint16_t channelMax,
+ uint16_t frameMax,
+ uint16_t heartbeatMin,
+ uint16_t heartbeatMax);
+ void openOk(const framing::Array& knownHosts);
+ void redirect(const std::string& host,
+ const framing::Array& knownHosts);
+ void close(uint16_t replyCode, const std::string& replyText);
+ void closeOk();
public:
using InputHandler::handle;
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index b248de8744..d1fd66ff26 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -23,7 +23,7 @@
#include "qpid/framing/reply_exceptions.h"
#include "ConnectionImpl.h"
-#include "SessionCore.h"
+#include "SessionImpl.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -32,6 +32,7 @@ using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
+
ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
: connector(c), isClosed(false), isClosing(false)
{
@@ -52,10 +53,10 @@ ConnectionImpl::~ConnectionImpl() {
connector->close();
}
-void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
+void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
{
Mutex::ScopedLock l(lock);
- boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()];
+ boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
if (s.lock()) throw ChannelBusyException();
s = session;
}
@@ -67,7 +68,7 @@ void ConnectionImpl::handle(framing::AMQFrame& frame)
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
- boost::shared_ptr<SessionCore> s;
+ boost::shared_ptr<SessionImpl> s;
{
Mutex::ScopedLock l(lock);
s = sessions[frame.getChannel()].lock();
@@ -122,7 +123,7 @@ ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedL
connector->close();
SessionVector save;
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
- boost::shared_ptr<SessionCore> s = i->second.lock();
+ boost::shared_ptr<SessionImpl> s = i->second.lock();
if (s) save.push_back(s);
}
sessions.clear();
@@ -135,7 +136,7 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text)
if (isClosed) return;
SessionVector save(closeInternal(l));
Mutex::ScopedUnlock u(lock);
- std::for_each(save.begin(), save.end(), boost::bind(&SessionCore::connectionClosed, _1, code, text));
+ std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text));
}
static const std::string CONN_CLOSED("Connection closed by broker");
@@ -148,7 +149,7 @@ void ConnectionImpl::shutdown()
handler.fail(CONN_CLOSED);
Mutex::ScopedUnlock u(lock);
std::for_each(save.begin(), save.end(),
- boost::bind(&SessionCore::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED));
+ boost::bind(&SessionImpl::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED));
}
void ConnectionImpl::erase(uint16_t ch) {
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.h b/qpid/cpp/src/qpid/client/ConnectionImpl.h
index bf8226a776..d0df9238f2 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.h
@@ -35,15 +35,15 @@
namespace qpid {
namespace client {
-class SessionCore;
+class SessionImpl;
class ConnectionImpl : public framing::FrameHandler,
public sys::TimeoutHandler,
public sys::ShutdownHandler
{
- typedef std::map<uint16_t, boost::weak_ptr<SessionCore> > SessionMap;
- typedef std::vector<boost::shared_ptr<SessionCore> > SessionVector;
+ typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap;
+ typedef std::vector<boost::shared_ptr<SessionImpl> > SessionVector;
SessionMap sessions;
ConnectionHandler handler;
@@ -69,7 +69,7 @@ class ConnectionImpl : public framing::FrameHandler,
ConnectionImpl(boost::shared_ptr<Connector> c);
~ConnectionImpl();
- void addSession(const boost::shared_ptr<SessionCore>&);
+ void addSession(const boost::shared_ptr<SessionImpl>&);
void open(const std::string& host, int port = 5672,
const std::string& uid = "guest",
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index 11aff6184b..7fb4997f5a 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -46,6 +46,7 @@ Connector::Connector(
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
version(ver),
+ initiated(false),
closed(true),
joined(true),
timeout(0),
@@ -240,6 +241,14 @@ void Connector::Writer::write(sys::AsynchIO&) {
void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+ if (!initiated) {
+ framing::ProtocolInitiation protocolInit;
+ if (protocolInit.decode(in)) {
+ //TODO: check the version is correct
+ QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
+ }
+ initiated = true;
+ }
AMQFrame frame;
while(frame.decode(in)){
QPID_LOG(trace, "RECV " << identifier << ": " << frame);
diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h
index 78aad0b60a..366f82acbd 100644
--- a/qpid/cpp/src/qpid/client/Connector.h
+++ b/qpid/cpp/src/qpid/client/Connector.h
@@ -77,8 +77,9 @@ class Connector : public framing::OutputHandler,
const int receive_buffer_size;
const int send_buffer_size;
framing::ProtocolVersion version;
+ bool initiated;
- sys::Mutex closedLock;
+ sys::Mutex closedLock;
bool closed;
bool joined;
diff --git a/qpid/cpp/src/qpid/client/Correlator.cpp b/qpid/cpp/src/qpid/client/Correlator.cpp
deleted file mode 100644
index f30c92b992..0000000000
--- a/qpid/cpp/src/qpid/client/Correlator.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Correlator.h"
-
-using qpid::client::Correlator;
-using namespace qpid::framing;
-using namespace boost;
-
-bool Correlator::receive(const AMQMethodBody* response)
-{
- if (listeners.empty()) {
- return false;
- } else {
- Listener l = listeners.front();
- if (l) l(response);
- listeners.pop();
- return true;
- }
-}
-
-void Correlator::listen(Listener l)
-{
- listeners.push(l);
-}
-
-
diff --git a/qpid/cpp/src/qpid/client/Correlator.h b/qpid/cpp/src/qpid/client/Correlator.h
deleted file mode 100644
index 45b22fb2fe..0000000000
--- a/qpid/cpp/src/qpid/client/Correlator.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <memory>
-#include <queue>
-#include <set>
-#include <boost/function.hpp>
-#include "qpid/framing/AMQMethodBody.h"
-#include "qpid/sys/Monitor.h"
-
-#ifndef _Correlator_
-#define _Correlator_
-
-namespace qpid {
-namespace client {
-
-
-class Correlator
-{
-public:
- typedef boost::function<void(const framing::AMQMethodBody*)> Listener;
-
- bool receive(const framing::AMQMethodBody*);
- void listen(Listener l);
-
-private:
- std::queue<Listener> listeners;
-};
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp
index 2484dabf1f..cc67701748 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp
@@ -84,7 +84,7 @@ void Dispatcher::run()
if (handler.get()) {
handler->handle(*content);
} else {
- QPID_LOG(error, "No handler found for " << *(content->getMethod()));
+ QPID_LOG(warning, "No handler found for " << *(content->getMethod()));
}
}
}
diff --git a/qpid/cpp/src/qpid/client/Execution.h b/qpid/cpp/src/qpid/client/Execution.h
index 5f717de586..e4b2db23e1 100644
--- a/qpid/cpp/src/qpid/client/Execution.h
+++ b/qpid/cpp/src/qpid/client/Execution.h
@@ -28,21 +28,27 @@ namespace qpid {
namespace client {
/**
- * Provides more detailed access to the amqp 'execution layer'.
+ * Provides access to more detailed aspects of the session
+ * implementation.
*/
class Execution
{
public:
virtual ~Execution() {}
- virtual void sendSyncRequest() = 0;
- virtual void sendFlushRequest() = 0;
- virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0;
+ /**
+ * Mark the incoming command with the specified id as completed
+ */
+ virtual void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) = 0;
+ /**
+ * Provides access to the demultiplexing function within the
+ * session implementation
+ */
virtual Demux& getDemux() = 0;
- virtual bool isComplete(const framing::SequenceNumber& id) = 0;
- virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;
- virtual void setCompletionListener(boost::function<void()>) = 0;
- virtual void syncWait(const framing::SequenceNumber& id) = 0;
- virtual framing::SequenceNumber lastSent() const = 0;
+ /**
+ * Wait until notification has been received of completion of the
+ * outgoing command with the specified id.
+ */
+ void waitForCompletion(const framing::SequenceNumber& id);
};
}}
diff --git a/qpid/cpp/src/qpid/client/ExecutionHandler.cpp b/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
deleted file mode 100644
index afdd13c9e9..0000000000
--- a/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ExecutionHandler.h"
-#include "qpid/Exception.h"
-#include "qpid/framing/BasicDeliverBody.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/all_method_bodies.h"
-#include "qpid/framing/ServerInvoker.h"
-#include "qpid/client/FutureCompletion.h"
-#include <boost/bind.hpp>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace boost;
-using qpid::sys::Mutex;
-
-bool isMessageMethod(AMQMethodBody* method)
-{
- return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>();
-}
-
-bool isMessageMethod(AMQBody* body)
-{
- AMQMethodBody* method=body->getMethod();
- return method && isMessageMethod(method);
-}
-
-bool isContentFrame(AMQFrame& frame)
-{
- AMQBody* body = frame.getBody();
- uint8_t type = body->type();
- return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
-}
-
-ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) :
- version(framing::highestProtocolVersion), maxFrameSize(_maxFrameSize) {}
-
-//incoming:
-void ExecutionHandler::handle(AMQFrame& frame)
-{
- if (!invoke(*this, *frame.getBody())) {
- if (!arriving) {
- arriving = FrameSet::shared_ptr(new FrameSet(++incomingCounter));
- }
- arriving->append(frame);
- if (arriving->isComplete()) {
- if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) {
- demux.handle(arriving);
- }
- arriving.reset();
- }
- }
-}
-
-void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
-{
- if (range.size() % 2) { //must be even number
- throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark"));
- } else {
- SequenceNumber mark(cumulative);
- {
- Mutex::ScopedLock l(lock);
- outgoingCompletionStatus.update(mark, range);
- }
- if (completionListener) completionListener();
- completion.completed(outgoingCompletionStatus.mark);
- //TODO: signal listeners of early notification?
- }
-}
-
-void ExecutionHandler::flush()
-{
- sendCompletion();
-}
-
-void ExecutionHandler::noop()
-{
- //do nothing
-}
-
-void ExecutionHandler::result(uint32_t command, const std::string& data)
-{
- completion.received(command, data);
-}
-
-void ExecutionHandler::sync()
-{
- //TODO: implement - need to note the mark requested and then
- //remember to send a response when that point is reached
-}
-
-void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
-{
- Mutex::ScopedLock l(lock);
- if (point > outgoingCompletionStatus.mark) {
- sendFlushRequest();
- }
-}
-
-void ExecutionHandler::sendFlushRequest()
-{
- Mutex::ScopedLock l(lock);
- AMQFrame frame(in_place<ExecutionFlushBody>());
- out(frame);
-}
-
-void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
-{
- Mutex::ScopedLock l(lock);
- if (point > outgoingCompletionStatus.mark) {
- sendSyncRequest();
- }
-}
-
-
-void ExecutionHandler::sendSyncRequest()
-{
- Mutex::ScopedLock l(lock);
- AMQFrame frame(in_place<ExecutionSyncBody>());
- out(frame);
-}
-
-void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
-{
- {
- Mutex::ScopedLock l(lock);
- if (id > incomingCompletionStatus.mark) {
- if (cumulative) {
- incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
- } else {
- incomingCompletionStatus.update(id, id);
- }
- }
- }
- if (send) {
- sendCompletion();
- }
-}
-
-
-void ExecutionHandler::sendCompletion()
-{
- Mutex::ScopedLock l(lock);
- SequenceNumberSet range;
- incomingCompletionStatus.collectRanges(range);
- AMQFrame frame(
- in_place<ExecutionCompleteBody>(
- version, incomingCompletionStatus.mark.getValue(), range));
- out(frame);
-}
-
-SequenceNumber ExecutionHandler::lastSent() const { return outgoingCounter; }
-
-SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener)
-{
- Mutex::ScopedLock l(lock);
- return send(command, listener, false);
-}
-
-SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)
-{
- SequenceNumber id = ++outgoingCounter;
- if(l) {
- completion.listenForResult(id, l);
- }
- AMQFrame frame(command);
- if (hasContent) {
- frame.setEof(false);
- }
- out(frame);
- return id;
-}
-
-SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content,
- CompletionTracker::ResultListener listener)
-{
- Mutex::ScopedLock l(lock);
- SequenceNumber id = send(command, listener, true);
- sendContent(content);
- return id;
-}
-
-void ExecutionHandler::sendContent(const MethodContent& content)
-{
- AMQFrame header(content.getHeader());
- header.setBof(false);
- u_int64_t data_length = content.getData().length();
- if(data_length > 0){
- header.setEof(false);
- out(header);
- const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1 /*end of frame marker included in overhead but not in size*/);
- if(data_length < frag_size){
- AMQFrame frame(in_place<AMQContentBody>(content.getData()));
- frame.setBof(false);
- out(frame);
- }else{
- u_int32_t offset = 0;
- u_int32_t remaining = data_length - offset;
- while (remaining > 0) {
- u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(content.getData().substr(offset, length));
- AMQFrame frame(in_place<AMQContentBody>(frag));
- frame.setBof(false);
- if (offset > 0) {
- frame.setBos(false);
- }
- offset += length;
- remaining = data_length - offset;
- if (remaining) {
- frame.setEos(false);
- frame.setEof(false);
- }
- out(frame);
- }
- }
- } else {
- out(header);
- }
-}
-
-bool ExecutionHandler::isComplete(const SequenceNumber& id)
-{
- Mutex::ScopedLock l(lock);
- return outgoingCompletionStatus.covers(id);
-}
-
-bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id)
-{
- Mutex::ScopedLock l(lock);
- return outgoingCompletionStatus.mark >= id;
-}
-
-void ExecutionHandler::setCompletionListener(boost::function<void()> l)
-{
- completionListener = l;
-}
-
-
-void ExecutionHandler::syncWait(const SequenceNumber& id) {
- syncTo(id);
- FutureCompletion fc;
- completion.listenForCompletion(
- id, boost::bind(&FutureCompletion::completed, &fc)
- );
- fc.waitForCompletion();
- assert(isCompleteUpTo(id));
-}
diff --git a/qpid/cpp/src/qpid/client/ExecutionHandler.h b/qpid/cpp/src/qpid/client/ExecutionHandler.h
deleted file mode 100644
index d9113b683b..0000000000
--- a/qpid/cpp/src/qpid/client/ExecutionHandler.h
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _ExecutionHandler_
-#define _ExecutionHandler_
-
-#include <queue>
-#include "qpid/framing/AccumulatedAck.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/MethodContent.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/sys/Mutex.h"
-#include "ChainableFrameHandler.h"
-#include "CompletionTracker.h"
-#include "Correlator.h"
-#include "Demux.h"
-#include "Execution.h"
-
-namespace qpid {
-namespace client {
-
-class ExecutionHandler :
- public framing::AMQP_ServerOperations::ExecutionHandler,
- public framing::FrameHandler,
- public Execution
-{
- framing::SequenceNumber incomingCounter;
- framing::AccumulatedAck incomingCompletionStatus;
- framing::SequenceNumber outgoingCounter;
- framing::AccumulatedAck outgoingCompletionStatus;
- framing::FrameSet::shared_ptr arriving;
- Correlator correlation;
- CompletionTracker completion;
- Demux demux;
- sys::Mutex lock;
- framing::ProtocolVersion version;
- uint64_t maxFrameSize;
- boost::function<void()> completionListener;
-
- void complete(uint32_t mark, const framing::SequenceNumberSet& range);
- void flush();
- void noop();
- void result(uint32_t command, const std::string& data);
- void sync();
-
- void sendCompletion();
-
- framing::SequenceNumber send(const framing::AMQBody&, CompletionTracker::ResultListener, bool hasContent);
- void sendContent(const framing::MethodContent&);
-
-public:
- typedef CompletionTracker::ResultListener ResultListener;
-
- // Allow other classes to set the out handler.
- framing::FrameHandler::Chain out;
-
- ExecutionHandler(uint64_t maxFrameSize = 65535);
-
- // Incoming handler.
- void handle(framing::AMQFrame& frame);
-
- framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
- framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content,
- ResultListener=ResultListener());
- framing::SequenceNumber lastSent() const;
- void sendSyncRequest();
- void sendFlushRequest();
- void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
- void syncTo(const framing::SequenceNumber& point);
- void flushTo(const framing::SequenceNumber& point);
- void syncWait(const framing::SequenceNumber& id);
-
- bool isComplete(const framing::SequenceNumber& id);
- bool isCompleteUpTo(const framing::SequenceNumber& id);
-
- void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
- Correlator& getCorrelator() { return correlation; }
- CompletionTracker& getCompletionTracker() { return completion; }
- Demux& getDemux() { return demux; }
-
- void setCompletionListener(boost::function<void()>);
-};
-
-}}
-
-#endif
diff --git a/qpid/cpp/src/qpid/client/FutureResponse.h b/qpid/cpp/src/qpid/client/Future.cpp
index 534ca01bb7..6a0c78ae4b 100644
--- a/qpid/cpp/src/qpid/client/FutureResponse.h
+++ b/qpid/cpp/src/qpid/client/Future.cpp
@@ -19,28 +19,27 @@
*
*/
-#ifndef _FutureResponse_
-#define _FutureResponse_
-
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/BodyHolder.h"
-#include "FutureCompletion.h"
+#include "Future.h"
namespace qpid {
namespace client {
-class SessionCore;
-
-class FutureResponse : public FutureCompletion
+void Future::wait(SessionImpl& session)
{
- framing::BodyHolder response;
-public:
- framing::AMQMethodBody* getResponse(SessionCore& session);
- void received(const framing::AMQMethodBody* response);
-};
-
-}}
+ if (!complete) {
+ session.waitForCompletion(command);
+ }
+ complete = true;
+}
+bool Future::isComplete(SessionImpl& session)
+{
+ return complete || session.isComplete(command);
+}
+void Future::setFutureResult(boost::shared_ptr<FutureResult> r)
+{
+ result = r;
+}
-#endif
+}}
diff --git a/qpid/cpp/src/qpid/client/Future.h b/qpid/cpp/src/qpid/client/Future.h
index d07f9f149c..faf68c9104 100644
--- a/qpid/cpp/src/qpid/client/Future.h
+++ b/qpid/cpp/src/qpid/client/Future.h
@@ -28,9 +28,8 @@
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/StructHelper.h"
#include "FutureCompletion.h"
-#include "FutureResponse.h"
#include "FutureResult.h"
-#include "SessionCore.h"
+#include "SessionImpl.h"
namespace qpid {
namespace client {
@@ -38,7 +37,6 @@ namespace client {
class Future : private framing::StructHelper
{
framing::SequenceNumber command;
- boost::shared_ptr<FutureResponse> response;
boost::shared_ptr<FutureResult> result;
bool complete;
@@ -46,42 +44,7 @@ public:
Future() : complete(false) {}
Future(const framing::SequenceNumber& id) : command(id), complete(false) {}
- void sync(SessionCore& session)
- {
- if (!isComplete(session)) {
- session.getExecution().syncTo(command);
- wait(session);
- }
- }
-
- void wait(SessionCore& session)
- {
- if (!isComplete(session)) {
- FutureCompletion callback;
- session.getExecution().getCompletionTracker().listenForCompletion(
- command,
- boost::bind(&FutureCompletion::completed, &callback)
- );
- callback.waitForCompletion();
- session.assertOpen();
- complete = true;
- }
- }
-
- framing::AMQMethodBody* getResponse(SessionCore& session)
- {
- if (response) {
- session.getExecution().getCompletionTracker().listenForCompletion(
- command,
- boost::bind(&FutureResponse::completed, response)
- );
- return response->getResponse(session);
- } else {
- throw Exception("Response not expected");
- }
- }
-
- template <class T> void decodeResult(T& value, SessionCore& session)
+ template <class T> void decodeResult(T& value, SessionImpl& session)
{
if (result) {
decode(value, result->getResult(session));
@@ -90,17 +53,9 @@ public:
}
}
- bool isComplete(SessionCore& session) {
- return complete || session.getExecution().isComplete(command);
- }
-
- bool isCompleteUpTo(SessionCore& session) {
- return complete || session.getExecution().isCompleteUpTo(command);
- }
-
- void setCommandId(const framing::SequenceNumber& id) { command = id; }
- void setFutureResponse(boost::shared_ptr<FutureResponse> r) { response = r; }
- void setFutureResult(boost::shared_ptr<FutureResult> r) { result = r; }
+ void wait(SessionImpl& session);
+ bool isComplete(SessionImpl& session);
+ void setFutureResult(boost::shared_ptr<FutureResult> r);
};
}}
diff --git a/qpid/cpp/src/qpid/client/FutureFactory.cpp b/qpid/cpp/src/qpid/client/FutureFactory.cpp
deleted file mode 100644
index 7f9d51e77f..0000000000
--- a/qpid/cpp/src/qpid/client/FutureFactory.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "FutureFactory.h"
-
-using namespace qpid::client;
-using namespace boost;
-
-shared_ptr<FutureCompletion> FutureFactory::createCompletion()
-{
- shared_ptr<FutureCompletion> f(new FutureCompletion());
- weak_ptr<FutureCompletion> w(f);
- set.push_back(w);
- return f;
-}
-
-shared_ptr<FutureResponse> FutureFactory::createResponse()
-{
- shared_ptr<FutureResponse> f(new FutureResponse());
- weak_ptr<FutureCompletion> w(static_pointer_cast<FutureCompletion>(f));
- set.push_back(w);
- return f;
-}
-
-void FutureFactory::close(uint16_t code, const std::string& text)
-{
- for (WeakPtrSet::iterator i = set.begin(); i != set.end(); i++) {
- shared_ptr<FutureCompletion> p = i->lock();
- if (p) {
- p->close(code, text);
- }
- }
-}
diff --git a/qpid/cpp/src/qpid/client/FutureFactory.h b/qpid/cpp/src/qpid/client/FutureFactory.h
deleted file mode 100644
index b126e296fd..0000000000
--- a/qpid/cpp/src/qpid/client/FutureFactory.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifndef _FutureFactory_
-#define _FutureFactory_
-
-#include <vector>
-#include <boost/shared_ptr.hpp>
-#include <boost/weak_ptr.hpp>
-#include "FutureCompletion.h"
-#include "FutureResponse.h"
-
-namespace qpid {
-namespace client {
-
-class FutureFactory
-{
- typedef std::vector< boost::weak_ptr<FutureCompletion> > WeakPtrSet;
- WeakPtrSet set;
-
-public:
- boost::shared_ptr<FutureCompletion> createCompletion();
- boost::shared_ptr<FutureResponse> createResponse();
- void close(uint16_t code, const std::string& text);
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/src/qpid/client/FutureResponse.cpp b/qpid/cpp/src/qpid/client/FutureResponse.cpp
deleted file mode 100644
index 32d99531fa..0000000000
--- a/qpid/cpp/src/qpid/client/FutureResponse.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "FutureResponse.h"
-
-#include "SessionCore.h"
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-
-AMQMethodBody* FutureResponse::getResponse(SessionCore& session)
-{
- waitForCompletion();
- session.assertOpen();
- return response.getMethod();
-}
-
-void FutureResponse::received(const AMQMethodBody* r)
-{
- Monitor::ScopedLock l(lock);
- response.setBody(*r);
- complete = true;
- lock.notifyAll();
-}
-
diff --git a/qpid/cpp/src/qpid/client/FutureResult.cpp b/qpid/cpp/src/qpid/client/FutureResult.cpp
index 681202edea..007f278051 100644
--- a/qpid/cpp/src/qpid/client/FutureResult.cpp
+++ b/qpid/cpp/src/qpid/client/FutureResult.cpp
@@ -21,13 +21,13 @@
#include "FutureResult.h"
-#include "SessionCore.h"
+#include "SessionImpl.h"
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
-const std::string& FutureResult::getResult(SessionCore& session) const
+const std::string& FutureResult::getResult(SessionImpl& session) const
{
waitForCompletion();
session.assertOpen();
diff --git a/qpid/cpp/src/qpid/client/FutureResult.h b/qpid/cpp/src/qpid/client/FutureResult.h
index 3117b63802..f889706493 100644
--- a/qpid/cpp/src/qpid/client/FutureResult.h
+++ b/qpid/cpp/src/qpid/client/FutureResult.h
@@ -29,13 +29,13 @@
namespace qpid {
namespace client {
-class SessionCore;
+class SessionImpl;
class FutureResult : public FutureCompletion
{
std::string result;
public:
- const std::string& getResult(SessionCore& session) const;
+ const std::string& getResult(SessionImpl& session) const;
void received(const std::string& result);
};
diff --git a/qpid/cpp/src/qpid/client/Message.cpp b/qpid/cpp/src/qpid/client/Message.cpp
new file mode 100644
index 0000000000..3d4b9da9fa
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/Message.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Message.h"
+
+namespace qpid {
+namespace client {
+
+ Message::Message(const std::string& data_,
+ const std::string& routingKey,
+ const std::string& exchange) : TransferContent(data_, routingKey, exchange) {}
+
+ std::string Message::getDestination() const
+ {
+ return method.getDestination();
+ }
+
+ bool Message::isRedelivered() const
+ {
+ return hasDeliveryProperties() && getDeliveryProperties().getRedelivered();
+ }
+
+ void Message::setRedelivered(bool redelivered)
+ {
+ getDeliveryProperties().setRedelivered(redelivered);
+ }
+
+ framing::FieldTable& Message::getHeaders()
+ {
+ return getMessageProperties().getApplicationHeaders();
+ }
+
+ void Message::acknowledge(bool cumulative, bool notifyPeer) const
+ {
+ const_cast<Session&>(session).getExecution().markCompleted(id, cumulative, notifyPeer);
+ }
+
+ const framing::MessageTransferBody& Message::getMethod() const
+ {
+ return method;
+ }
+
+ const framing::SequenceNumber& Message::getId() const
+ {
+ return id;
+ }
+
+ /**@internal for incoming messages */
+ Message::Message(const framing::FrameSet& frameset, Session s) :
+ method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s)
+ {
+ populate(frameset);
+ }
+
+ /**@internal use for incoming messages. */
+ void Message::setSession(Session s) { session=s; }
+
+}}
diff --git a/qpid/cpp/src/qpid/client/Message.h b/qpid/cpp/src/qpid/client/Message.h
index daac30ba36..977cc89146 100644
--- a/qpid/cpp/src/qpid/client/Message.h
+++ b/qpid/cpp/src/qpid/client/Message.h
@@ -30,7 +30,7 @@ namespace qpid {
namespace client {
/**
- * A representation of messages for sent or recived through the
+ * A representation of messages for sent or received through the
* client api.
*
* \ingroup clientapi
@@ -38,60 +38,21 @@ namespace client {
class Message : public framing::TransferContent
{
public:
- Message(const std::string& data_=std::string(),
+ Message(const std::string& data=std::string(),
const std::string& routingKey=std::string(),
- const std::string& exchange=std::string()
- ) : TransferContent(data_, routingKey, exchange) {}
-
- std::string getDestination() const
- {
- return method.getDestination();
- }
-
- bool isRedelivered() const
- {
- return hasDeliveryProperties() && getDeliveryProperties().getRedelivered();
- }
-
- void setRedelivered(bool redelivered)
- {
- getDeliveryProperties().setRedelivered(redelivered);
- }
-
- framing::FieldTable& getHeaders()
- {
- return getMessageProperties().getApplicationHeaders();
- }
-
- void acknowledge(Session& session, bool cumulative = true, bool send = true) const
- {
- session.getExecution().completed(id, cumulative, send);
- }
-
- void acknowledge(bool cumulative = true, bool send = true) const
- {
- const_cast<Session&>(session).getExecution().completed(id, cumulative, send);
- }
+ const std::string& exchange=std::string());
+ std::string getDestination() const;
+ bool isRedelivered() const;
+ void setRedelivered(bool redelivered);
+ framing::FieldTable& getHeaders();
+ void acknowledge(bool cumulative = true, bool notifyPeer = true) const;
+ const framing::MessageTransferBody& getMethod() const;
+ const framing::SequenceNumber& getId() const;
/**@internal for incoming messages */
- Message(const framing::FrameSet& frameset, Session s) :
- method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s)
- {
- populate(frameset);
- }
-
- const framing::MessageTransferBody& getMethod() const
- {
- return method;
- }
-
- const framing::SequenceNumber& getId() const
- {
- return id;
- }
-
+ Message(const framing::FrameSet& frameset, Session s);
/**@internal use for incoming messages. */
- void setSession(Session s) { session=s; }
+ void setSession(Session s);
private:
//method and id are only set for received messages:
framing::MessageTransferBody method;
diff --git a/qpid/cpp/src/qpid/client/Results.cpp b/qpid/cpp/src/qpid/client/Results.cpp
new file mode 100644
index 0000000000..1fb3a6fec9
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/Results.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Results.h"
+#include "FutureResult.h"
+
+using namespace qpid::framing;
+using namespace boost;
+
+namespace qpid {
+namespace client {
+
+Results::Results() {}
+
+void Results::close()
+{
+ for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) {
+ i->second->completed();
+ }
+ listeners.clear();
+}
+
+void Results::completed(const SequenceSet& set)
+{
+ //call complete on those listeners whose ids fall within the set
+ Listeners::iterator i = listeners.begin();
+ while (i != listeners.end()) {
+ if (set.contains(i->first)) {
+ i->second->completed();
+ listeners.erase(i++);
+ } else {
+ i++;
+ }
+ }
+}
+
+void Results::received(const SequenceNumber& id, const std::string& result)
+{
+ Listeners::iterator i = listeners.find(id);
+ if (i != listeners.end()) {
+ i->second->received(result);
+ listeners.erase(i);
+ }
+}
+
+Results::FutureResultPtr Results::listenForResult(const SequenceNumber& id)
+{
+ FutureResultPtr l(new FutureResult());
+ listeners[id] = l;
+ return l;
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/client/Response.h b/qpid/cpp/src/qpid/client/Results.h
index 2b7d55ec1f..e17021b327 100644
--- a/qpid/cpp/src/qpid/client/Response.h
+++ b/qpid/cpp/src/qpid/client/Results.h
@@ -19,34 +19,37 @@
*
*/
-#ifndef _Response_
-#define _Response_
-
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
+#include <map>
#include <boost/shared_ptr.hpp>
-#include "qpid/framing/amqp_framing.h"
-#include "Completion.h"
+
+#ifndef _Results_
+#define _Results_
namespace qpid {
namespace client {
-class Response : public Completion
+class FutureResult;
+
+class Results
{
public:
- Response(Future f, shared_ptr<SessionCore> s) : Completion(f, s) {}
-
- template <class T> T& as()
- {
- framing::AMQMethodBody* response(future.getResponse(*session));
- return *boost::polymorphic_downcast<T*>(response);
- }
-
- template <class T> bool isA()
- {
- framing::AMQMethodBody* response(future.getResponse(*session));
- return response && response->isA<T>();
- }
+ typedef boost::shared_ptr<FutureResult> FutureResultPtr;
+
+ Results();
+ void completed(const framing::SequenceSet& set);
+ void received(const framing::SequenceNumber& id, const std::string& result);
+ FutureResultPtr listenForResult(const framing::SequenceNumber& point);
+ void close();
+
+private:
+ typedef std::map<framing::SequenceNumber, FutureResultPtr> Listeners;
+ Listeners listeners;
};
-}}
+}
+}
+
#endif
diff --git a/qpid/cpp/src/qpid/client/SessionBase.cpp b/qpid/cpp/src/qpid/client/SessionBase.cpp
index 0e1fa67bda..d6a7571e9f 100644
--- a/qpid/cpp/src/qpid/client/SessionBase.cpp
+++ b/qpid/cpp/src/qpid/client/SessionBase.cpp
@@ -19,6 +19,7 @@
*
*/
#include "SessionBase.h"
+#include "qpid/framing/all_method_bodies.h"
namespace qpid {
namespace client {
@@ -26,7 +27,7 @@ using namespace framing;
SessionBase::SessionBase() {}
SessionBase::~SessionBase() {}
-SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {}
+SessionBase::SessionBase(shared_ptr<SessionImpl> core) : impl(core) {}
void SessionBase::suspend() { impl->suspend(); }
void SessionBase::close() { impl->close(); }
@@ -37,14 +38,30 @@ SynchronousMode SessionBase::getSynchronous() const {
return SynchronousMode(impl->isSync());
}
-Execution& SessionBase::getExecution() { return impl->getExecution(); }
+Execution& SessionBase::getExecution()
+{
+ return *impl;
+}
+
+void SessionBase::sync()
+{
+ ExecutionSyncBody b;
+ b.setSync(true);
+ impl->send(b).wait(*impl);
+}
+
Uuid SessionBase::getId() const { return impl->getId(); }
framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
-void SessionBase::sync() {
- Execution& ex = getExecution();
- ex.syncWait(ex.lastSent());
- impl->assertOpen();
+SessionBase::ScopedSync::ScopedSync(SessionBase& s) : session(s), change(!s.isSynchronous())
+{
+ if (change) session.setSynchronous(true);
}
+SessionBase::ScopedSync::~ScopedSync()
+{
+ if (change) session.setSynchronous(false);
+}
+
+
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/SessionBase.h b/qpid/cpp/src/qpid/client/SessionBase.h
index 3565145bb9..54484113b1 100644
--- a/qpid/cpp/src/qpid/client/SessionBase.h
+++ b/qpid/cpp/src/qpid/client/SessionBase.h
@@ -29,8 +29,8 @@
#include "qpid/framing/TransferContent.h"
#include "qpid/client/Completion.h"
#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/Response.h"
-#include "qpid/client/SessionCore.h"
+#include "qpid/client/Execution.h"
+#include "qpid/client/SessionImpl.h"
#include "qpid/client/TypedResult.h"
#include "qpid/shared_ptr.h"
#include <string>
@@ -61,8 +61,11 @@ using framing::Uuid;
* <em>later</em> function call.
*
* If you need to notify some extenal agent that some actions have
- * been taken (e.g. binding queues to exchanages), you must call
- * Session::sync() first, to ensure that all the commands are complete.
+ * been taken (e.g. binding queues to exchanges), you must ensure that
+ * the broker has completed the command. In synchronous mode this is
+ * when the session method for the command returns. In asynchronous
+ * mode you can call Session::sync(), to ensure that all the commands
+ * are complete.
*
* You can freely switch between modes by calling Session::setSynchronous()
*
@@ -77,6 +80,21 @@ enum SynchronousMode { SYNC=true, ASYNC=false };
class SessionBase
{
public:
+ /**
+ * Instances of this class turn synchronous mode on for the
+ * duration of their scope (and revert back to async if required
+ * afterwards).
+ */
+ class ScopedSync
+ {
+ SessionBase& session;
+ const bool change;
+ public:
+ ScopedSync(SessionBase& s);
+ ~ScopedSync();
+ };
+
+
SessionBase();
~SessionBase();
@@ -110,23 +128,17 @@ class SessionBase
/** Close the session */
void close();
-
- /**
- * Synchronize with the broker. Wait for all commands issued so far in
- * the session to complete.
- * @see SynchronousMode
- */
- void sync();
Execution& getExecution();
+ void sync();
typedef framing::TransferContent DefaultContent;
protected:
- shared_ptr<SessionCore> impl;
+ shared_ptr<SessionImpl> impl;
framing::ProtocolVersion version;
friend class Connection;
- SessionBase(shared_ptr<SessionCore>);
+ SessionBase(shared_ptr<SessionImpl>);
};
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/SessionCore.cpp b/qpid/cpp/src/qpid/client/SessionCore.cpp
deleted file mode 100644
index 5079c47b5e..0000000000
--- a/qpid/cpp/src/qpid/client/SessionCore.cpp
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "SessionCore.h"
-#include "Future.h"
-#include "FutureResponse.h"
-#include "FutureResult.h"
-#include "ConnectionImpl.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/constants.h"
-#include "qpid/framing/ClientInvoker.h"
-#include "qpid/log/Statement.h"
-
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace client {
-
-using namespace qpid::framing;
-
-namespace { const std::string OK="ok"; }
-
-typedef sys::Monitor::ScopedLock Lock;
-typedef sys::Monitor::ScopedUnlock UnLock;
-
-inline void SessionCore::invariant() const {
- switch (state.get()) {
- case OPENING:
- assert(!session);
- assert(code==REPLY_SUCCESS);
- assert(connection);
- assert(channel.get());
- assert(channel.next == connection.get());
- break;
- case RESUMING:
- assert(session);
- assert(session->getState() == SessionState::RESUMING);
- assert(code==REPLY_SUCCESS);
- assert(connection);
- assert(channel.get());
- assert(channel.next == connection.get());
- break;
- case OPEN:
- case CLOSING:
- case SUSPENDING:
- assert(session);
- assert(connection);
- assert(channel.get());
- assert(channel.next == connection.get());
- break;
- case SUSPENDED:
- assert(session);
- assert(!connection);
- break;
- case CLOSED:
- assert(!session);
- assert(!connection);
- break;
- }
-}
-
-inline void SessionCore::setState(State s) {
- state = s;
- invariant();
-}
-
-inline void SessionCore::waitFor(State s) {
- invariant();
- // We can be CLOSED or SUSPENDED by error at any time.
- state.waitFor(States(s, CLOSED, SUSPENDED));
- check();
- invariant();
-}
-
-SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn,
- uint16_t ch, uint64_t maxFrameSize)
- : l3(maxFrameSize),
- sync(false),
- channel(ch),
- proxy(channel),
- state(OPENING),
- detachedLifetime(0)
-{
- l3.out = &out;
- attaching(conn);
-}
-
-void SessionCore::attaching(shared_ptr<ConnectionImpl> c) {
- assert(c);
- assert(channel.get());
- connection = c;
- channel.next = connection.get();
- code = REPLY_SUCCESS;
- text = OK;
- state = session ? RESUMING : OPENING;
- invariant();
-}
-
-SessionCore::~SessionCore() {
- Lock l(state);
- detach(COMMAND_INVALID, "Session deleted");
- state.waitWaiters();
-}
-
-void SessionCore::detach(int c, const std::string& t) {
- connection.reset();
- channel.next = 0;
- code=c;
- text=t;
- l3.getDemux().close();
-}
-
-void SessionCore::doClose(int code, const std::string& text) {
- if (state != CLOSED) {
- session.reset();
- detach(code, text);
- setState(CLOSED);
- l3.getCompletionTracker().close();
- }
- invariant();
-}
-
-void SessionCore::doSuspend(int code, const std::string& text) {
- if (state != CLOSED && state != SUSPENDED) {
- detach(code, text);
- session->suspend();
- setState(SUSPENDED);
- }
- invariant();
-}
-
-ExecutionHandler& SessionCore::getExecution() { // user thread
- return l3;
-}
-
-void SessionCore::setSync(bool s) { // user thread
- sync = s;
-}
-
-bool SessionCore::isSync() { // user thread
- return sync;
-}
-
-FrameSet::shared_ptr SessionCore::get() { // user thread
- // No lock here: pop does a blocking wait.
- return l3.getDemux().getDefault()->pop();
-}
-
-static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session.";
-
-void SessionCore::open(uint32_t timeout) { // user thread
- Lock l(state);
- check(state==OPENING && !session,
- COMMAND_INVALID, CANNOT_REOPEN_SESSION);
- detachedLifetime=timeout;
- proxy.open(detachedLifetime);
- waitFor(OPEN);
-}
-
-void SessionCore::close() { // user thread
- Lock l(state);
- check();
- if (state==OPEN) {
- setState(CLOSING);
- proxy.close();
- waitFor(CLOSED);
- }
- else
- doClose(REPLY_SUCCESS, OK);
-}
-
-void SessionCore::suspend() { // user thread
- Lock l(state);
- checkOpen();
- setState(SUSPENDING);
- proxy.suspend();
- waitFor(SUSPENDED);
-}
-
-void SessionCore::setChannel(uint16_t ch) { channel=ch; }
-
-static const std::string CANNOT_RESUME_SESSION("Session cannot be resumed.");
-
-void SessionCore::resume(shared_ptr<ConnectionImpl> c) {
- // user thread
- {
- Lock l(state);
- if (state==SUSPENDED) { // Clear error that caused suspend
- code=REPLY_SUCCESS;
- text=OK;
- }
- check(state==SUSPENDED, COMMAND_INVALID, CANNOT_RESUME_SESSION);
- SequenceNumber sendAck=session->resuming();
- attaching(c);
- proxy.resume(getId());
- waitFor(OPEN);
- proxy.ack(sendAck, SequenceNumberSet());
- // TODO aconway 2007-10-23: Replay inside the lock might be a prolem
- // for large replay sets.
- SessionState::Replay replay=session->replay();
- for (SessionState::Replay::iterator i = replay.begin();
- i != replay.end(); ++i)
- {
- invariant();
- channel.handle(*i); // Direct to channel.
- check();
- }
- l3.getDemux().open();
- }
-}
-
-void SessionCore::assertOpen() const {
- Lock l(state);
- checkOpen();
-}
-
-static const std::string UNEXPECTED_SESSION_ATTACHED(
- "Received unexpected session.attached");
-
-static const std::string INVALID_SESSION_RESUME_ID(
- "session.resumed has invalid ID.");
-
-// network thread
-void SessionCore::attached(const Uuid& sessionId,
- uint32_t /*detachedLifetime*/)
-{
- Lock l(state);
- invariant();
- check(state == OPENING || state == RESUMING,
- COMMAND_INVALID, UNEXPECTED_SESSION_ATTACHED);
- if (state==OPENING) { // New session
- // TODO aconway 2007-10-17: 0 disables sesskon.ack for now.
- // If AMQP WG decides to keep it, we need to add configuration
- // for the ack rate.
- session=in_place<SessionState>(0, detachedLifetime > 0, sessionId);
- setState(OPEN);
- }
- else { // RESUMING
- check(sessionId == session->getId(),
- INVALID_ARGUMENT, INVALID_SESSION_RESUME_ID);
- // Don't setState yet, wait for first incoming ack.
- }
-}
-
-static const std::string UNEXPECTED_SESSION_DETACHED(
- "Received unexpected session.detached.");
-
-static const std::string UNEXPECTED_SESSION_ACK(
- "Received unexpected session.ack");
-
-void SessionCore::detached() { // network thread
- Lock l(state);
- check(state == SUSPENDING,
- COMMAND_INVALID, UNEXPECTED_SESSION_DETACHED);
- doSuspend(REPLY_SUCCESS, OK);
-}
-
-void SessionCore::ack(uint32_t ack, const SequenceNumberSet&) {
- Lock l(state);
- invariant();
- check(state==OPEN || state==RESUMING,
- COMMAND_INVALID, UNEXPECTED_SESSION_ACK);
- session->receivedAck(ack);
- if (state==RESUMING) {
- setState(OPEN);
- }
- invariant();
-}
-
-void SessionCore::closed(uint16_t code, const std::string& text)
-{ // network thread
- Lock l(state);
- invariant();
- doClose(code, text);
-}
-
-// closed by connection
-void SessionCore::connectionClosed(uint16_t code, const std::string& text) {
- Lock l(state);
- try {
- doClose(code, text);
- } catch(...) { assert (0); }
-}
-
-void SessionCore::connectionBroke(uint16_t code, const std::string& text) {
- Lock l(state);
- try {
- doSuspend(code, text);
- } catch (...) { assert(0); }
-}
-
-void SessionCore::check() const { // Called with lock held.
- invariant();
- if (code != REPLY_SUCCESS)
- throwReplyException(code, text);
-}
-
-void SessionCore::check(bool cond, int newCode, const std::string& msg) const {
- check();
- if (!cond) {
- const_cast<SessionCore*>(this)->doClose(newCode, msg);
- throwReplyException(code, text);
- }
-}
-
-static const std::string SESSION_NOT_OPEN("Session is not open");
-
-void SessionCore::checkOpen() const {
- if (state==SUSPENDED) {
- std::string cause;
- if (code != REPLY_SUCCESS)
- cause=" by :"+text;
- throw CommandInvalidException(QPID_MSG("Session is suspended" << cause));
- }
- check(state==OPEN, COMMAND_INVALID, SESSION_NOT_OPEN);
-}
-
-Future SessionCore::send(const AMQBody& command)
-{
- Lock l(state);
- checkOpen();
- command.getMethod()->setSync(sync);
- Future f;
- //any result/response listeners must be set before the command is sent
- if (command.getMethod()->resultExpected()) {
- boost::shared_ptr<FutureResult> r(new FutureResult());
- f.setFutureResult(r);
- //result listener is tied to command id, and is set when that
- //is allocated by the execution handler, so pass it to send
- f.setCommandId(l3.send(command, boost::bind(&FutureResult::received, r, _1)));
- } else {
- if (command.getMethod()->responseExpected()) {
- boost::shared_ptr<FutureResponse> r(new FutureResponse());
- f.setFutureResponse(r);
- l3.getCorrelator().listen(boost::bind(&FutureResponse::received, r, _1));
- }
-
- f.setCommandId(l3.send(command));
- }
- return f;
-}
-
-Future SessionCore::send(const AMQBody& command, const MethodContent& content)
-{
- Lock l(state);
- checkOpen();
- //content bearing methods don't currently have responses or
- //results, if that changes should follow procedure for the other
- //send method impl:
- return Future(l3.send(command, content));
-}
-
-namespace {
-bool isCloseResponse(const AMQFrame& frame) {
- return frame.getMethod() &&
- frame.getMethod()->amqpClassId() == SESSION_CLASS_ID &&
- frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID;
-}
-}
-
-// Network thread.
-void SessionCore::handleIn(AMQFrame& frame) {
- ConnectionImpl::shared_ptr save;
- {
- Lock l(state);
- save=connection;
- // Ignore frames received while closing other than closed response.
- if (state==CLOSING && !isCloseResponse(frame))
- return;
- }
- try {
- // Cast to expose private SessionHandler functions.
- if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
- // If we were detached by a session command, tell the connection.
- if (!connection) save->erase(channel);
- }
- else {
- session->received(frame);
- l3.handle(frame);
- }
- } catch (const ChannelException& e) {
- QPID_LOG(error, "Channel exception:" << e.what());
- doClose(e.code, e.what());
- }
-}
-
-void SessionCore::handleOut(AMQFrame& frame)
-{
- Lock l(state);
- if (state==OPEN) {
- if (detachedLifetime > 0 && session->sent(frame))
- proxy.solicitAck();
- channel.handle(frame);
- }
-}
-
-void SessionCore::solicitAck( ) {
- Lock l(state);
- checkOpen();
- proxy.ack(session->sendingAck(), SequenceNumberSet());
-}
-
-void SessionCore::flow(bool) {
- assert(0); throw NotImplementedException("session.flow");
-}
-
-void SessionCore::flowOk(bool /*active*/) {
- assert(0); throw NotImplementedException("session.flow");
-}
-
-void SessionCore::highWaterMark(uint32_t /*lastSentMark*/) {
- // TODO aconway 2007-10-02: may be removed from spec.
- assert(0); throw NotImplementedException("session.highWaterMark");
-}
-
-const Uuid SessionCore::getId() const {
- if (session)
- return session->getId();
- throw Exception(QPID_MSG("Closed session, no ID."));
-}
-
-}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/SessionCore.h b/qpid/cpp/src/qpid/client/SessionCore.h
deleted file mode 100644
index 2bb0f41fbf..0000000000
--- a/qpid/cpp/src/qpid/client/SessionCore.h
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifndef _SessionCore_
-#define _SessionCore_
-
-#include "qpid/shared_ptr.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/ChannelHandler.h"
-#include "qpid/framing/SessionState.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/AMQP_ClientOperations.h"
-#include "qpid/framing/AMQP_ServerProxy.h"
-#include "qpid/sys/StateMonitor.h"
-#include "ExecutionHandler.h"
-
-#include <boost/optional.hpp>
-
-namespace qpid {
-namespace framing {
-class FrameSet;
-class MethodContent;
-class SequenceNumberSet;
-}
-
-namespace client {
-
-class Future;
-class ConnectionImpl;
-
-/**
- * Session implementation, sets up handler chains.
- * Attaches to a SessionHandler when active, detaches
- * when closed.
- */
-class SessionCore : public framing::FrameHandler::InOutHandler,
- private framing::AMQP_ClientOperations::SessionHandler
-{
- public:
- SessionCore(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
- ~SessionCore();
-
- framing::FrameSet::shared_ptr get();
- const framing::Uuid getId() const;
- uint16_t getChannel() const { return channel; }
- void assertOpen() const;
-
- // NOTE: Public functions called in user thread.
- void open(uint32_t detachedLifetime);
- void close();
- void resume(shared_ptr<ConnectionImpl>);
- void suspend();
- void setChannel(uint16_t channel);
-
- void setSync(bool s);
- bool isSync();
- ExecutionHandler& getExecution();
-
- Future send(const framing::AMQBody& command);
-
- Future send(const framing::AMQBody& command, const framing::MethodContent& content);
-
- void connectionClosed(uint16_t code, const std::string& text);
- void connectionBroke(uint16_t code, const std::string& text);
-
- private:
- enum State {
- OPENING,
- RESUMING,
- OPEN,
- CLOSING,
- SUSPENDING,
- SUSPENDED,
- CLOSED
- };
- typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
- typedef sys::StateMonitor<State, CLOSED> StateMonitor;
- typedef StateMonitor::Set States;
-
- inline void invariant() const;
- inline void setState(State s);
- inline void waitFor(State);
- void doClose(int code, const std::string& text);
- void doSuspend(int code, const std::string& text);
-
- /** If there is an error, throw the exception */
- void check(bool condition, int code, const std::string& text) const;
- /** Throw if *error */
- void check() const;
-
- void handleIn(framing::AMQFrame& frame);
- void handleOut(framing::AMQFrame& frame);
-
- // Private functions are called by broker in network thread.
- void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
- void flow(bool active);
- void flowOk(bool active);
- void detached();
- void ack(uint32_t cumulativeSeenMark,
- const framing::SequenceNumberSet& seenFrameSet);
- void highWaterMark(uint32_t lastSentMark);
- void solicitAck();
- void closed(uint16_t code, const std::string& text);
-
- void attaching(shared_ptr<ConnectionImpl>);
- void detach(int code, const std::string& text);
- void checkOpen() const;
-
- int code; // Error code
- std::string text; // Error text
- boost::optional<framing::SessionState> session;
- shared_ptr<ConnectionImpl> connection;
- ExecutionHandler l3;
- volatile bool sync;
- framing::ChannelHandler channel;
- framing::AMQP_ServerProxy::Session proxy;
- mutable StateMonitor state;
- uint32_t detachedLifetime;
-};
-
-}} // namespace qpid::client
-
-#endif
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp
new file mode 100644
index 0000000000..57f12cf28e
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp
@@ -0,0 +1,605 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SessionImpl.h"
+
+#include "ConnectionImpl.h"
+#include "Future.h"
+
+#include "qpid/framing/all_method_bodies.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/framing/constants.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+
+namespace { const std::string OK="ok"; }
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::framing;
+
+typedef sys::Monitor::ScopedLock Lock;
+typedef sys::Monitor::ScopedUnlock UnLock;
+
+
+SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
+ uint16_t ch, uint64_t _maxFrameSize)
+ : code(REPLY_SUCCESS),
+ text(OK),
+ state(INACTIVE),
+ syncMode(false),
+ detachedLifetime(0),
+ maxFrameSize(_maxFrameSize),
+ id(true),//generate unique uuid for each session
+ name(id.str()),
+ //TODO: may want to allow application defined names instead
+ connection(conn),
+ channel(ch),
+ proxy(channel),
+ nextIn(0),
+ nextOut(0)
+{
+ channel.next = connection.get();
+}
+
+SessionImpl::~SessionImpl() {
+ Lock l(state);
+ if (state != DETACHED) {
+ QPID_LOG(warning, "Detaching deleted session");
+ setState(DETACHED);
+ handleClosed();
+ state.waitWaiters();
+ }
+ connection->erase(channel);
+}
+
+void SessionImpl::setSync(bool s) // user thread
+{
+ syncMode = s; //syncMode is volatile
+}
+
+bool SessionImpl::isSync() // user thread
+{
+ return syncMode; //syncMode is volatile
+}
+
+FrameSet::shared_ptr SessionImpl::get() // user thread
+{
+ // No lock here: pop does a blocking wait.
+ return demux.getDefault()->pop();
+}
+
+const Uuid SessionImpl::getId() const //user thread
+{
+ return id; //id is immutable
+}
+
+void SessionImpl::open(uint32_t timeout) // user thread
+{
+ Lock l(state);
+ if (state == INACTIVE) {
+ setState(ATTACHING);
+ proxy.attach(name, false);
+ waitFor(ATTACHED);
+ //TODO: timeout will not be set locally until get response to
+ //confirm, should we wait for that?
+ proxy.requestTimeout(timeout);
+ proxy.commandPoint(nextOut, 0);
+ } else {
+ throw Exception("Open already called for this session");
+ }
+}
+
+void SessionImpl::close() //user thread
+{
+ Lock l(state);
+ if (detachedLifetime) {
+ proxy.requestTimeout(0);
+ //should we wait for the timeout response?
+ detachedLifetime = 0;
+ }
+ detach();
+ waitFor(DETACHED);
+}
+
+void SessionImpl::resume(shared_ptr<ConnectionImpl>) // user thread
+{
+ throw NotImplementedException("Resume not yet implemented by client!");
+}
+
+void SessionImpl::suspend() //user thread
+{
+ Lock l(state);
+ detach();
+}
+
+void SessionImpl::detach() //call with lock held
+{
+ if (state == ATTACHED) {
+ proxy.detach(name);
+ setState(DETACHING);
+ }
+}
+
+
+uint16_t SessionImpl::getChannel() const // user thread
+{
+ return channel;
+}
+
+void SessionImpl::setChannel(uint16_t c) // user thread
+{
+ //channel will only ever be set when session is detached (and
+ //about to be resumed)
+ channel = c;
+}
+
+Demux& SessionImpl::getDemux()
+{
+ return demux;
+}
+
+void SessionImpl::waitForCompletion(const SequenceNumber& id)
+{
+ Lock l(state);
+ waitForCompletionImpl(id);
+}
+
+void SessionImpl::waitForCompletionImpl(const SequenceNumber& id) //call with lock held
+{
+ while (incompleteOut.contains(id)) {
+ checkOpen();
+ state.wait();
+ }
+}
+
+bool SessionImpl::isComplete(const SequenceNumber& id)
+{
+ Lock l(state);
+ return !incompleteOut.contains(id);
+}
+
+struct IsCompleteUpTo
+{
+ const SequenceNumber& id;
+ bool result;
+
+ IsCompleteUpTo(const SequenceNumber& _id) : id(_id), result(true) {}
+ void operator()(const SequenceNumber& start, const SequenceNumber&)
+ {
+ if (start <= id) result = false;
+ }
+
+};
+
+bool SessionImpl::isCompleteUpTo(const SequenceNumber& id)
+{
+ Lock l(state);
+ //return false if incompleteOut contains anything less than id,
+ //true otherwise
+ IsCompleteUpTo f(id);
+ incompleteIn.for_each(f);
+ return f.result;
+}
+
+struct MarkCompleted
+{
+ const SequenceNumber& id;
+ SequenceSet& completedIn;
+
+ MarkCompleted(const SequenceNumber& _id, SequenceSet& set) : id(_id), completedIn(set) {}
+
+ void operator()(const SequenceNumber& start, const SequenceNumber& end)
+ {
+ if (id >= end) {
+ completedIn.add(start, end);
+ } else if (id >= start) {
+ completedIn.add(start, id);
+ }
+ }
+
+};
+
+void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer)
+{
+ Lock l(state);
+ if (cumulative) {
+ //everything in incompleteIn less than or equal to id is now complete
+ MarkCompleted f(id, completedIn);
+ incompleteIn.for_each(f);
+ //make sure id itself is in
+ completedIn.add(id);
+ //then remove anything thats completed from the incomplete set
+ incompleteIn.remove(completedIn);
+ } else if (incompleteIn.contains(id)) {
+ incompleteIn.remove(id);
+ completedIn.add(id);
+ }
+ if (notifyPeer) {
+ sendCompletion();
+ }
+}
+
+/**
+ * Called by ConnectionImpl to notify active sessions when connection
+ * is explictly closed
+ */
+void SessionImpl::connectionClosed(uint16_t _code, const std::string& _text)
+{
+ Lock l(state);
+ code = _code;
+ text = _text;
+ setState(DETACHED);
+ handleClosed();
+}
+
+/**
+ * Called by ConnectionImpl to notify active sessions when connection
+ * is disconnected
+ */
+void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text)
+{
+ connectionClosed(_code, _text);
+}
+
+Future SessionImpl::send(const AMQBody& command)
+{
+ return sendCommand(command);
+}
+
+Future SessionImpl::send(const AMQBody& command, const MethodContent& content)
+{
+ return sendCommand(command, &content);
+}
+
+Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content)
+{
+ Lock l(state);
+ checkOpen();
+ SequenceNumber id = nextOut++;
+ incompleteOut.add(id);
+
+ if (syncMode) command.getMethod()->setSync(syncMode);
+ Future f(id);
+ if (command.getMethod()->resultExpected()) {
+ //result listener must be set before the command is sent
+ f.setFutureResult(results.listenForResult(id));
+ }
+ AMQFrame frame(command);
+ if (content) {
+ frame.setEof(false);
+ }
+ handleOut(frame);
+ if (content) {
+ sendContent(*content);
+ }
+ if (syncMode) {
+ waitForCompletionImpl(id);
+ }
+ return f;
+}
+
+void SessionImpl::sendContent(const MethodContent& content)
+{
+ AMQFrame header(content.getHeader());
+ header.setBof(false);
+ u_int64_t data_length = content.getData().length();
+ if(data_length > 0){
+ header.setEof(false);
+ handleOut(header);
+ /*Note: end of frame marker included in overhead but not in size*/
+ const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1);
+
+ if(data_length < frag_size){
+ AMQFrame frame(in_place<AMQContentBody>(content.getData()));
+ frame.setBof(false);
+ handleOut(frame);
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(content.getData().substr(offset, length));
+ AMQFrame frame(in_place<AMQContentBody>(frag));
+ frame.setBof(false);
+ if (offset > 0) {
+ frame.setBos(false);
+ }
+ offset += length;
+ remaining = data_length - offset;
+ if (remaining) {
+ frame.setEos(false);
+ frame.setEof(false);
+ }
+ handleOut(frame);
+ }
+ }
+ } else {
+ handleOut(header);
+ }
+}
+
+
+bool isMessageMethod(AMQMethodBody* method)
+{
+ return method->isA<MessageTransferBody>();
+}
+
+bool isMessageMethod(AMQBody* body)
+{
+ AMQMethodBody* method=body->getMethod();
+ return method && isMessageMethod(method);
+}
+
+bool isContentFrame(AMQFrame& frame)
+{
+ AMQBody* body = frame.getBody();
+ uint8_t type = body->type();
+ return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
+}
+
+void SessionImpl::handleIn(AMQFrame& frame) // network thread
+{
+ try {
+ if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) {
+ //make sure the command id sequence and completion
+ //tracking takes account of execution commands
+ Lock l(state);
+ completedIn.add(nextIn++);
+ } else {
+ //if not handled by this class, its for the application:
+ deliver(frame);
+ }
+ }
+ } catch (const SessionException& e) {
+ //TODO: proper 0-10 exception handling
+ QPID_LOG(error, "Session exception:" << e.what());
+ Lock l(state);
+ code = e.code;
+ text = e.what();
+ }
+}
+
+void SessionImpl::handleOut(AMQFrame& frame) // user thread
+{
+ channel.handle(frame);
+}
+
+void SessionImpl::deliver(AMQFrame& frame) // network thread
+{
+ if (!arriving) {
+ arriving = FrameSet::shared_ptr(new FrameSet(nextIn++));
+ }
+ arriving->append(frame);
+ if (arriving->isComplete()) {
+ //message.transfers will be marked completed only when 'acked'
+ //as completion affects flow control; other commands will be
+ //considered completed as soon as processed here
+ if (arriving->isA<MessageTransferBody>()) {
+ Lock l(state);
+ incompleteIn.add(arriving->getId());
+ } else {
+ Lock l(state);
+ completedIn.add(arriving->getId());
+ }
+ demux.handle(arriving);
+ arriving.reset();
+ }
+}
+
+//control handler methods (called by network thread when controls are
+//received from peer):
+
+void SessionImpl::attach(const std::string& /*name*/, bool /*force*/)
+{
+ throw NotImplementedException("Client does not support attach");
+}
+
+void SessionImpl::attached(const std::string& _name)
+{
+ Lock l(state);
+ if (name != _name) throw InternalErrorException("Incorrect session name");
+ setState(ATTACHED);
+}
+
+void SessionImpl::detach(const std::string& /*name*/)
+{
+ throw NotImplementedException("Client does not support detach");
+}
+
+void SessionImpl::detached(const std::string& _name, uint8_t _code)
+{
+ Lock l(state);
+ if (name != _name) throw InternalErrorException("Incorrect session name");
+ setState(DETACHED);
+ if (_code) {
+ //TODO: make sure this works with execution.exception - don't
+ //want to overwrite the code from that
+ QPID_LOG(error, "Session detached by peer: " << name << " " << code);
+ code = _code;
+ text = "Session detached by peer";
+ }
+ if (detachedLifetime == 0) {
+ handleClosed();
+ }
+}
+
+void SessionImpl::requestTimeout(uint32_t t)
+{
+ Lock l(state);
+ detachedLifetime = t;
+ proxy.timeout(t);
+}
+
+void SessionImpl::timeout(uint32_t t)
+{
+ Lock l(state);
+ detachedLifetime = t;
+}
+
+void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
+{
+ if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
+
+ Lock l(state);
+ nextIn = id;
+}
+
+void SessionImpl::expected(const framing::SequenceSet& commands, const framing::Array& fragments)
+{
+ if (!commands.empty() || fragments.size()) {
+ throw NotImplementedException("Session resumption not yet supported");
+ }
+}
+
+void SessionImpl::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/)
+{
+ //don't really care too much about this yet
+}
+
+void SessionImpl::completed(const framing::SequenceSet& commands, bool timelyReply)
+{
+ Lock l(state);
+ incompleteOut.remove(commands);
+ state.notify();//notify any waiters of completion
+ completedOut.add(commands);
+ //notify any waiting results of completion
+ results.completed(commands);
+
+ if (timelyReply) {
+ proxy.knownCompleted(completedOut);
+ completedOut.clear();
+ }
+}
+
+void SessionImpl::knownCompleted(const framing::SequenceSet& commands)
+{
+ Lock l(state);
+ completedIn.remove(commands);
+}
+
+void SessionImpl::flush(bool expected, bool confirmed, bool completed)
+{
+ Lock l(state);
+ if (expected) {
+ proxy.expected(SequenceSet(nextIn), Array());
+ }
+ if (confirmed) {
+ proxy.confirmed(completedIn, Array());
+ }
+ if (completed) {
+ proxy.completed(completedIn, true);
+ }
+}
+
+void SessionImpl::sendCompletion()
+{
+ proxy.completed(completedIn, true);
+}
+
+void SessionImpl::gap(const framing::SequenceSet& /*commands*/)
+{
+ throw NotImplementedException("gap not yet supported");
+}
+
+
+
+void SessionImpl::sync() {}
+
+void SessionImpl::result(uint32_t commandId, const std::string& value)
+{
+ Lock l(state);
+ results.received(commandId, value);
+}
+
+void SessionImpl::exception(uint16_t errorCode,
+ uint32_t commandId,
+ uint8_t classCode,
+ uint8_t commandCode,
+ uint8_t /*fieldIndex*/,
+ const std::string& description,
+ const framing::FieldTable& /*errorInfo*/)
+{
+ QPID_LOG(warning, "Exception received from peer: " << errorCode << ":" << description
+ << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
+
+ Lock l(state);
+ code = errorCode;
+ text = description;
+ if (detachedLifetime) {
+ proxy.requestTimeout(0);
+ //should we wait for the timeout response?
+ detachedLifetime = 0;
+ }
+ detach();
+}
+
+
+//private utility methods:
+
+inline void SessionImpl::setState(State s) //call with lock held
+{
+ state = s;
+}
+
+inline void SessionImpl::waitFor(State s) //call with lock held
+{
+ // We can be DETACHED at any time
+ state.waitFor(States(s, DETACHED));
+ check();
+}
+
+void SessionImpl::check() const //call with lock held.
+{
+ if (code != REPLY_SUCCESS) {
+ throwReplyException(code, text);
+ }
+}
+
+void SessionImpl::checkOpen() const //call with lock held.
+{
+ check();
+ if (state != ATTACHED) {
+ throwReplyException(0, "Session isn't attached");
+ }
+}
+
+void SessionImpl::assertOpen() const
+{
+ Lock l(state);
+ checkOpen();
+}
+
+void SessionImpl::handleClosed()
+{
+ QPID_LOG(info, "SessionImpl::handleClosed(): entering");
+ demux.close();
+ results.close();
+ QPID_LOG(info, "SessionImpl::handleClosed(): returning");
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h
new file mode 100644
index 0000000000..1284670389
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/SessionImpl.h
@@ -0,0 +1,188 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _SessionImpl_
+#define _SessionImpl_
+
+#include "Demux.h"
+#include "Execution.h"
+#include "Results.h"
+
+#include "qpid/shared_ptr.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/ChannelHandler.h"
+#include "qpid/framing/SessionState.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/sys/StateMonitor.h"
+
+#include <boost/optional.hpp>
+
+namespace qpid {
+
+namespace framing {
+
+class FrameSet;
+class MethodContent;
+class SequenceSet;
+
+}
+
+namespace client {
+
+class Future;
+class ConnectionImpl;
+
+class SessionImpl : public framing::FrameHandler::InOutHandler,
+ public Execution,
+ private framing::AMQP_ClientOperations::Session010Handler,
+ private framing::AMQP_ClientOperations::Execution010Handler
+{
+public:
+ SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
+ ~SessionImpl();
+
+
+ //NOTE: Public functions called in user thread.
+ framing::FrameSet::shared_ptr get();
+
+ const framing::Uuid getId() const;
+
+ uint16_t getChannel() const;
+ void setChannel(uint16_t channel);
+
+ void open(uint32_t detachedLifetime);
+ void close();
+ void resume(shared_ptr<ConnectionImpl>);
+ void suspend();
+
+ void setSync(bool s);
+ bool isSync();
+ void assertOpen() const;
+
+ Future send(const framing::AMQBody& command);
+ Future send(const framing::AMQBody& command, const framing::MethodContent& content);
+
+ Demux& getDemux();
+ void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+ bool isComplete(const framing::SequenceNumber& id);
+ bool isCompleteUpTo(const framing::SequenceNumber& id);
+ void waitForCompletion(const framing::SequenceNumber& id);
+
+ //NOTE: these are called by the network thread when the connection is closed or dies
+ void connectionClosed(uint16_t code, const std::string& text);
+ void connectionBroke(uint16_t code, const std::string& text);
+
+private:
+ enum State {
+ INACTIVE,
+ ATTACHING,
+ ATTACHED,
+ DETACHING,
+ DETACHED
+ };
+ typedef framing::AMQP_ClientOperations::Session010Handler SessionHandler;
+ typedef framing::AMQP_ClientOperations::Execution010Handler ExecutionHandler;
+ typedef sys::StateMonitor<State, DETACHED> StateMonitor;
+ typedef StateMonitor::Set States;
+
+ inline void setState(State s);
+ inline void waitFor(State);
+
+ void detach();
+
+ void check() const;
+ void checkOpen() const;
+ void handleClosed();
+
+ void handleIn(framing::AMQFrame& frame);
+ void handleOut(framing::AMQFrame& frame);
+ void deliver(framing::AMQFrame& frame);
+
+ Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
+ void sendContent(const framing::MethodContent&);
+ void waitForCompletionImpl(const framing::SequenceNumber& id);
+
+ void sendCompletion();
+
+ // Note: Following methods are called by network thread in
+ // response to session controls from the broker
+ void attach(const std::string& name, bool force);
+ void attached(const std::string& name);
+ void detach(const std::string& name);
+ void detached(const std::string& name, uint8_t detachCode);
+ void requestTimeout(uint32_t timeout);
+ void timeout(uint32_t timeout);
+ void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
+ void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
+ void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments);
+ void completed(const framing::SequenceSet& commands, bool timelyReply);
+ void knownCompleted(const framing::SequenceSet& commands);
+ void flush(bool expected, bool confirmed, bool completed);
+ void gap(const framing::SequenceSet& commands);
+
+ // Note: Following methods are called by network thread in
+ // response to execution commands from the broker
+ void sync();
+ void result(uint32_t commandId, const std::string& value);
+ void exception(uint16_t errorCode,
+ uint32_t commandId,
+ uint8_t classCode,
+ uint8_t commandCode,
+ uint8_t fieldIndex,
+ const std::string& description,
+ const framing::FieldTable& errorInfo);
+
+
+ //hack for old generator:
+ void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); }
+
+ int code; // Error code
+ std::string text; // Error text
+ mutable StateMonitor state;
+ volatile bool syncMode;
+ uint32_t detachedLifetime;
+ const uint64_t maxFrameSize;
+ const framing::Uuid id;
+ const std::string name;
+
+
+ shared_ptr<ConnectionImpl> connection;
+ framing::ChannelHandler channel;
+ framing::AMQP_ServerProxy::Session010 proxy;
+
+ Results results;
+ Demux demux;
+ framing::FrameSet::shared_ptr arriving;
+
+ framing::SequenceSet incompleteIn;//incoming commands that are as yet incomplete
+ framing::SequenceSet completedIn;//incoming commands that are have completed
+ framing::SequenceSet incompleteOut;//outgoing commands not yet known to be complete
+ framing::SequenceSet completedOut;//outgoing commands that we know to be completed
+ framing::SequenceNumber nextIn;
+ framing::SequenceNumber nextOut;
+
+};
+
+}} // namespace qpid::client
+
+#endif
diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
index f14344225c..b353be481b 100644
--- a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -35,7 +35,7 @@ namespace client {
SubscriptionManager::SubscriptionManager(Session& s)
: dispatcher(s), session(s),
messages(UNLIMITED), bytes(UNLIMITED), window(true),
- confirmMode(true), acquireMode(false),
+ acceptMode(0), acquireMode(0),
autoStop(true)
{}
@@ -43,7 +43,7 @@ Completion SubscriptionManager::subscribeInternal(
const std::string& q, const std::string& dest)
{
Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest,
- arg::confirmMode=confirmMode, arg::acquireMode=acquireMode);
+ arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
setFlowControl(dest, messages, bytes, window);
return c;
}
@@ -68,7 +68,7 @@ Completion SubscriptionManager::subscribe(
void SubscriptionManager::setFlowControl(
const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
{
- session.messageFlowMode(dest, window);
+ session.messageSetFlowMode(dest, window);
session.messageFlow(dest, 0, messages);
session.messageFlow(dest, 1, bytes);
}
@@ -81,7 +81,7 @@ void SubscriptionManager::setFlowControl(
window=window_;
}
-void SubscriptionManager::setConfirmMode(bool c) { confirmMode=c; }
+void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.h b/qpid/cpp/src/qpid/client/SubscriptionManager.h
index 1741796f4f..48cb725fb8 100644
--- a/qpid/cpp/src/qpid/client/SubscriptionManager.h
+++ b/qpid/cpp/src/qpid/client/SubscriptionManager.h
@@ -53,7 +53,7 @@ class SubscriptionManager : public sys::Runnable
uint32_t bytes;
bool window;
AckPolicy autoAck;
- bool confirmMode;
+ bool acceptMode;
bool acquireMode;
bool autoStop;
@@ -116,16 +116,16 @@ class SubscriptionManager : public sys::Runnable
*/
void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
- /** Set the confirm-mode for new subscriptions. Defaults to true.
- *@param confirm: if true messages must be confirmed by calling
+ /** Set the accept-mode for new subscriptions. Defaults to true.
+ *@param required: if true messages must be confirmed by calling
*Message::acknowledge() or automatically, see setAckPolicy()
*/
- void setConfirmMode(bool confirm);
+ void setAcceptMode(bool required);
/** Set the acquire-mode for new subscriptions. Defaults to false.
*@param acquire: if false messages pre-acquired, if true
* messages are dequed on acknowledgement or on transfer
- * depending on confirmMode.
+ * depending on acceptMode.
*/
void setAcquireMode(bool acquire);
diff --git a/qpid/cpp/src/qpid/client/TypedResult.h b/qpid/cpp/src/qpid/client/TypedResult.h
index edcf728c54..0b36be9716 100644
--- a/qpid/cpp/src/qpid/client/TypedResult.h
+++ b/qpid/cpp/src/qpid/client/TypedResult.h
@@ -33,7 +33,7 @@ template <class T> class TypedResult : public Completion
bool decoded;
public:
- TypedResult(Future f, shared_ptr<SessionCore> s) : Completion(f, s), decoded(false) {}
+ TypedResult(Future f, shared_ptr<SessionImpl> s) : Completion(f, s), decoded(false) {}
T& get()
{
diff --git a/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp b/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp
index a4ea257f0c..b241ee8e36 100644
--- a/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp
@@ -32,10 +32,10 @@ struct ClassifierHandler::Visitor : public FrameDefaultVisitor {
void visit(const ExchangeDeclareBody&) { chosen=&classifier.wiring; }
void visit(const ExchangeDeleteBody&) { chosen=&classifier.wiring; }
- void visit(const QueueBindBody&) { chosen=&classifier.wiring; }
+ void visit(const ExchangeBindBody&) { chosen=&classifier.wiring; }
+ void visit(const ExchangeUnbindBody&) { chosen=&classifier.wiring; }
void visit(const QueueDeclareBody&) { chosen=&classifier.wiring; }
void visit(const QueueDeleteBody&) { chosen=&classifier.wiring; }
- void visit(const QueueUnbindBody&) { chosen=&classifier.wiring; }
void defaultVisit(const AMQBody&) { chosen=&classifier.other; }
using framing::FrameDefaultVisitor::visit;
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h
index 2318492e2d..5a8e55f9d2 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.h
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.h
@@ -105,7 +105,6 @@ class AMQFrame : public AMQDataBlock
void setEos(bool isEos) { eos = isEos; }
static uint32_t frameOverhead();
-
private:
void init() { bof = eof = bos = eos = true; subchannel=0; channel=0; }
diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
index 8a3a92936e..2064468785 100644
--- a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -26,8 +26,8 @@
#include "Buffer.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageProperties.h"
-#include "qpid/framing/DeliveryProperties010.h"
-#include "qpid/framing/MessageProperties010.h"
+#include "qpid/framing/PreviewDeliveryProperties.h"
+#include "qpid/framing/PreviewMessageProperties.h"
#include <iostream>
#include <boost/optional.hpp>
@@ -79,8 +79,8 @@ class AMQHeaderBody : public AMQBody
// Could use boost::mpl::fold to construct a larger set.
typedef PropSet< PropSet< PropSet<PropSet<Empty, DeliveryProperties>,
MessageProperties>,
- DeliveryProperties010>,
- MessageProperties010> Properties;
+ PreviewDeliveryProperties>,
+ PreviewMessageProperties> Properties;
Properties properties;
diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
index b15e14d6f6..0aa1bf7e66 100644
--- a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
+++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
@@ -32,8 +32,8 @@
namespace qpid {
namespace framing {
-static ProtocolVersion highestProtocolVersion(99, 0);
-//static ProtocolVersion highestProtocolVersion(0, 10);
+//static ProtocolVersion highestProtocolVersion(99, 0);
+static ProtocolVersion highestProtocolVersion(0, 10);
} /* namespace framing */
} /* namespace qpid */
diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.h b/qpid/cpp/src/qpid/framing/SequenceSet.h
index eabc729411..8b3d6b3f28 100644
--- a/qpid/cpp/src/qpid/framing/SequenceSet.h
+++ b/qpid/cpp/src/qpid/framing/SequenceSet.h
@@ -69,11 +69,12 @@ class SequenceSet {
bool empty() const;
template <class T>
- void for_each(T& t) const
+ T for_each(T& t) const
{
for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
t(i->start, i->end);
}
+ return t;
}
template <class S> void serialize(S& s) { s.split(*this); s(ranges.begin(), ranges.end()); }
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index cace04bef5..769593c8d2 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include <qpid/broker/Message.h>
#include <qpid/broker/MessageDelivery.h>
+#include "qpid/framing/MessageXTransferBody.h"
#include <list>
#include <iostream>
#include <fstream>
@@ -217,7 +218,7 @@ void ManagementAgent::SendBuffer (Buffer& buf,
return;
intrusive_ptr<Message> msg (new Message ());
- AMQFrame method (in_place<MessageTransferBody>(
+ AMQFrame method (in_place<MessageXTransferBody>(
ProtocolVersion(), 0, exchange->getName (), 0, 0));
AMQFrame header (in_place<AMQHeaderBody>());
AMQFrame content(in_place<AMQContentBody>());
@@ -232,8 +233,8 @@ void ManagementAgent::SendBuffer (Buffer& buf,
msg->getFrames().append(method);
msg->getFrames().append(header);
- MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ PreviewMessageProperties* props =
+ msg->getFrames().getHeaders()->get<PreviewMessageProperties>(true);
props->setContentLength(length);
msg->getFrames().append(content);
@@ -393,8 +394,8 @@ void ManagementAgent::dispatchMethod (Message& msg,
uint64_t objId = inBuffer.getLongLong ();
string replyToKey;
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ const framing::PreviewMessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::PreviewMessageProperties>();
if (p && p->hasReplyTo())
{
const framing::ReplyTo& rt = p->getReplyTo ();
@@ -600,8 +601,8 @@ void ManagementAgent::dispatchAgentCommand (Message& msg)
uint32_t sequence;
string replyToKey;
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ const framing::PreviewMessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::PreviewMessageProperties>();
if (p && p->hasReplyTo())
{
const framing::ReplyTo& rt = p->getReplyTo ();
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 44d5ed4650..9b6e0dce21 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -118,13 +118,13 @@ QPID_AUTO_TEST_CASE(testTransfer)
ClientSessionFixture fix;
fix.session=fix.connection.newSession(ASYNC);
fix.declareSubscribe();
- fix.session.messageTransfer(content=TransferContent("my-message", "my-queue"));
+ fix.session.messageTransfer(acceptMode=1, content=TransferContent("my-message", "my-queue"));
//get & test the message:
FrameSet::shared_ptr msg = fix.session.get();
BOOST_CHECK(msg->isA<MessageTransferBody>());
BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
//confirm receipt:
- fix.session.getExecution().completed(msg->getId(), true, true);
+ fix.session.getExecution().markCompleted(msg->getId(), true, true);
}
QPID_AUTO_TEST_CASE(testDispatcher)
@@ -161,6 +161,8 @@ BOOST_FIXTURE_TEST_CASE(testDispatcherThread, ClientSessionFixture)
}
*/
+/*
+ * GS (18-APR-2008): disabled resume tests until resumption for 0-10 final spec is implemented
QPID_AUTO_TEST_CASE(_FIXTURE)
{
ClientSessionFixture fix;
@@ -195,7 +197,7 @@ QPID_AUTO_TEST_CASE(testSuspendResume)
FrameSet::shared_ptr msg = fix.session.get();
BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
}
-
+*/
/**
* Currently broken due to a deadlock in SessionCore
*
diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp
index 2904424d5c..94e2c025d6 100644
--- a/qpid/cpp/src/tests/ExchangeTest.cpp
+++ b/qpid/cpp/src/tests/ExchangeTest.cpp
@@ -30,7 +30,6 @@
#include "qpid/broker/TopicExchange.h"
#include "qpid_test_plugin.h"
#include <iostream>
-#include "qpid/framing/BasicGetBody.h"
#include "MessageUtils.h"
using boost::intrusive_ptr;
diff --git a/qpid/cpp/src/tests/FramingTest.cpp b/qpid/cpp/src/tests/FramingTest.cpp
index 0c7adb2af8..275d32acfe 100644
--- a/qpid/cpp/src/tests/FramingTest.cpp
+++ b/qpid/cpp/src/tests/FramingTest.cpp
@@ -23,8 +23,6 @@
#include "qpid/client/Connection.h"
#include "qpid/client/Connector.h"
#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/framing/ConnectionRedirectBody.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/amqp_framing.h"
@@ -54,16 +52,12 @@ std::string tostring(const T& x)
class FramingTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(FramingTest);
- CPPUNIT_TEST(testBasicQosBody);
+ CPPUNIT_TEST(testMessageTransferBody);
CPPUNIT_TEST(testConnectionSecureBody);
CPPUNIT_TEST(testConnectionRedirectBody);
- CPPUNIT_TEST(testAccessRequestBody);
- CPPUNIT_TEST(testBasicConsumeBody);
+ CPPUNIT_TEST(testQueueDeclareBody);
CPPUNIT_TEST(testConnectionRedirectBodyFrame);
- CPPUNIT_TEST(testBasicConsumeOkBodyFrame);
- CPPUNIT_TEST(testInlineContent);
- CPPUNIT_TEST(testContentReference);
- CPPUNIT_TEST(testContentValidation);
+ CPPUNIT_TEST(testMessageCancelBodyFrame);
CPPUNIT_TEST_SUITE_END();
private:
@@ -74,14 +68,14 @@ class FramingTest : public CppUnit::TestCase
FramingTest() : version(highestProtocolVersion) {}
- void testBasicQosBody()
+ void testMessageTransferBody()
{
Buffer wbuff(buffer, sizeof(buffer));
- BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true);
+ MessageTransferBody in(version, "my-exchange", 1, 1);
in.encode(wbuff);
Buffer rbuff(buffer, sizeof(buffer));
- BasicQosBody out(version);
+ MessageTransferBody out(version);
out.decode(rbuff);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -104,7 +98,11 @@ class FramingTest : public CppUnit::TestCase
Buffer wbuff(buffer, sizeof(buffer));
std::string a = "hostA";
std::string b = "hostB";
- ConnectionRedirectBody in(version, a, b);
+ Array hosts(0x95);
+ hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(a)));
+ hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(b)));
+
+ ConnectionRedirectBody in(version, a, hosts);
in.encode(wbuff);
Buffer rbuff(buffer, sizeof(buffer));
@@ -113,41 +111,28 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
- void testAccessRequestBody()
- {
- Buffer wbuff(buffer, sizeof(buffer));
- std::string s = "text";
- AccessRequestBody in(version, s, true, false, true, false, true);
- in.encode(wbuff);
-
- Buffer rbuff(buffer, sizeof(buffer));
- AccessRequestBody out(version);
- out.decode(rbuff);
- CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
- }
-
- void testBasicConsumeBody()
+ void testQueueDeclareBody()
{
Buffer wbuff(buffer, sizeof(buffer));
- std::string q = "queue";
- std::string t = "tag";
- BasicConsumeBody in(version, 0, q, t, false, true, false, false,
- FieldTable());
+ QueueDeclareBody in(version, "name", "dlq", true, false, true, false, FieldTable());
in.encode(wbuff);
Buffer rbuff(buffer, sizeof(buffer));
- BasicConsumeBody out(version);
+ QueueDeclareBody out(version);
out.decode(rbuff);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
-
void testConnectionRedirectBodyFrame()
{
Buffer wbuff(buffer, sizeof(buffer));
std::string a = "hostA";
std::string b = "hostB";
- AMQFrame in(in_place<ConnectionRedirectBody>(version, a, b));
+ Array hosts(0x95);
+ hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(a)));
+ hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(b)));
+
+ AMQFrame in(in_place<ConnectionRedirectBody>(version, a, hosts));
in.setChannel(999);
in.encode(wbuff);
@@ -157,11 +142,10 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
- void testBasicConsumeOkBodyFrame()
+ void testMessageCancelBodyFrame()
{
Buffer wbuff(buffer, sizeof(buffer));
- std::string s = "hostA";
- AMQFrame in(in_place<BasicConsumeOkBody>(version, s));
+ AMQFrame in(in_place<MessageCancelBody>(version, "tag"));
in.setChannel(999);
in.encode(wbuff);
@@ -171,56 +155,6 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
- void testInlineContent() {
- Buffer wbuff(buffer, sizeof(buffer));
- Content content(INLINE, "MyData");
- CPPUNIT_ASSERT(content.isInline());
- content.encode(wbuff);
-
- Buffer rbuff(buffer, sizeof(buffer));
- Content recovered;
- recovered.decode(rbuff);
- CPPUNIT_ASSERT(recovered.isInline());
- CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue());
- }
-
- void testContentReference() {
- Buffer wbuff(buffer, sizeof(buffer));
- Content content(REFERENCE, "MyRef");
- CPPUNIT_ASSERT(content.isReference());
- content.encode(wbuff);
-
- Buffer rbuff(buffer, sizeof(buffer));
- Content recovered;
- recovered.decode(rbuff);
- CPPUNIT_ASSERT(recovered.isReference());
- CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue());
- }
-
- void testContentValidation() {
- try {
- Content content(REFERENCE, "");
- CPPUNIT_ASSERT(false);//fail, expected exception
- } catch (const InvalidArgumentException& e) {}
-
- try {
- Content content(2, "Blah");
- CPPUNIT_ASSERT(false);//fail, expected exception
- } catch (const SyntaxErrorException& e) {}
-
- try {
- Buffer wbuff(buffer, sizeof(buffer));
- wbuff.putOctet(2);
- wbuff.putLongString("blah, blah");
-
- Buffer rbuff(buffer, sizeof(buffer));
- Content content;
- content.decode(rbuff);
- CPPUNIT_FAIL("Expected exception");
- } catch (Exception& e) {}
-
- }
-
};
diff --git a/qpid/cpp/src/tests/HeaderTest.cpp b/qpid/cpp/src/tests/HeaderTest.cpp
index 9e2bddb4de..56be38a302 100644
--- a/qpid/cpp/src/tests/HeaderTest.cpp
+++ b/qpid/cpp/src/tests/HeaderTest.cpp
@@ -61,16 +61,13 @@ class HeaderTest : public CppUnit::TestCase
out.castBody<AMQHeaderBody>()->get<MessageProperties>(true);
props1->setContentLength(42);
- props1->setMessageId("messageId");
+ props1->setMessageId(Uuid(true));
props1->setCorrelationId("correlationId");
props1->setReplyTo(ReplyTo("ex","key"));
props1->setContentType("contentType");
props1->setContentEncoding("contentEncoding");
- props1->setType("type");
props1->setUserId("userId");
props1->setAppId("appId");
- props1->setTransactionId("transactionId");
- props1->setSecurityToken("securityToken");
char buff[10000];
Buffer wbuffer(buff, 10000);
@@ -87,11 +84,8 @@ class HeaderTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(props1->getCorrelationId(), props2->getCorrelationId());
CPPUNIT_ASSERT_EQUAL(props1->getContentType(), props2->getContentType());
CPPUNIT_ASSERT_EQUAL(props1->getContentEncoding(), props2->getContentEncoding());
- CPPUNIT_ASSERT_EQUAL(props1->getType(), props2->getType());
CPPUNIT_ASSERT_EQUAL(props1->getUserId(), props2->getUserId());
CPPUNIT_ASSERT_EQUAL(props1->getAppId(), props2->getAppId());
- CPPUNIT_ASSERT_EQUAL(props1->getTransactionId(), props2->getTransactionId());
- CPPUNIT_ASSERT_EQUAL(props1->getSecurityToken(), props2->getSecurityToken());
}
diff --git a/qpid/cpp/src/tests/MessageBuilderTest.cpp b/qpid/cpp/src/tests/MessageBuilderTest.cpp
index 092e02cc2f..7149ec50c7 100644
--- a/qpid/cpp/src/tests/MessageBuilderTest.cpp
+++ b/qpid/cpp/src/tests/MessageBuilderTest.cpp
@@ -22,6 +22,7 @@
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/TypeFilter.h"
#include "qpid_test_plugin.h"
#include <list>
@@ -101,7 +102,7 @@ class MessageBuilderTest : public CppUnit::TestCase
std::string key("builder-exchange");
AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
+ ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0);
@@ -124,7 +125,7 @@ class MessageBuilderTest : public CppUnit::TestCase
std::string exchange("builder-exchange");
std::string key("builder-exchange");
- AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
AMQFrame content(in_place<AMQContentBody>(data));
method.setEof(false);
@@ -158,7 +159,7 @@ class MessageBuilderTest : public CppUnit::TestCase
std::string key("builder-exchange");
AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
+ ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
AMQFrame content1(in_place<AMQContentBody>(data1));
AMQFrame content2(in_place<AMQContentBody>(data2));
@@ -194,7 +195,7 @@ class MessageBuilderTest : public CppUnit::TestCase
std::string key("builder-exchange");
AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
+ ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
AMQFrame content1(in_place<AMQContentBody>(data1));
AMQFrame content2(in_place<AMQContentBody>(data2));
diff --git a/qpid/cpp/src/tests/MessageTest.cpp b/qpid/cpp/src/tests/MessageTest.cpp
index a19080e1ce..d7688c74a9 100644
--- a/qpid/cpp/src/tests/MessageTest.cpp
+++ b/qpid/cpp/src/tests/MessageTest.cpp
@@ -21,7 +21,9 @@
#include "qpid/broker/Message.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/Uuid.h"
#include "qpid_test_plugin.h"
@@ -44,14 +46,14 @@ class MessageTest : public CppUnit::TestCase
{
string exchange = "MyExchange";
string routingKey = "MyRoutingKey";
- string messageId = "MyMessage";
+ Uuid messageId(true);
string data1("abcdefg");
string data2("hijklmn");
intrusive_ptr<Message> msg(new Message());
AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
+ ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
AMQFrame content1(in_place<AMQContentBody>(data1));
AMQFrame content2(in_place<AMQContentBody>(data2));
diff --git a/qpid/cpp/src/tests/MessageUtils.h b/qpid/cpp/src/tests/MessageUtils.h
index 3def8cd41b..21ee834ba7 100644
--- a/qpid/cpp/src/tests/MessageUtils.h
+++ b/qpid/cpp/src/tests/MessageUtils.h
@@ -22,6 +22,8 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/MessageDelivery.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/Uuid.h"
using namespace qpid;
using namespace broker;
@@ -29,12 +31,12 @@ using namespace framing;
struct MessageUtils
{
- static boost::intrusive_ptr<Message> createMessage(const string& exchange, const string& routingKey,
- const string& messageId, uint64_t contentSize = 0)
+ static boost::intrusive_ptr<Message> createMessage(const string& exchange="", const string& routingKey="",
+ const Uuid& messageId=Uuid(true), uint64_t contentSize = 0)
{
boost::intrusive_ptr<Message> msg(new Message());
- AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
msg->getFrames().append(method);
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 70132bce76..1d454d9f4a 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid_test_plugin.h"
#include <iostream>
#include "boost/format.hpp"
@@ -75,7 +76,7 @@ class QueueTest : public CppUnit::TestCase
intrusive_ptr<Message> message(std::string exchange, std::string routingKey) {
intrusive_ptr<Message> msg(new Message());
AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
+ ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
msg->getFrames().append(method);
msg->getFrames().append(header);
diff --git a/qpid/cpp/src/tests/TxAckTest.cpp b/qpid/cpp/src/tests/TxAckTest.cpp
index 89b98cb93c..b86f3d75e0 100644
--- a/qpid/cpp/src/tests/TxAckTest.cpp
+++ b/qpid/cpp/src/tests/TxAckTest.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "MessageUtils.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/RecoveryManager.h"
#include "qpid/broker/TxAck.h"
@@ -69,14 +70,8 @@ public:
TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
{
for(int i = 0; i < 10; i++){
- intrusive_ptr<Message> msg(new Message());
- AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, "exchange", 0, 0));
- AMQFrame header(in_place<AMQHeaderBody>());
- msg->getFrames().append(method);
- msg->getFrames().append(header);
+ intrusive_ptr<Message> msg(MessageUtils::createMessage("exchange", "routing_key"));
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
- msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key");
messages.push_back(msg);
QueuedMessage qm(queue.get());
qm.payload = msg;
diff --git a/qpid/cpp/src/tests/client_test.cpp b/qpid/cpp/src/tests/client_test.cpp
index bd2a541c92..011dcd4678 100644
--- a/qpid/cpp/src/tests/client_test.cpp
+++ b/qpid/cpp/src/tests/client_test.cpp
@@ -33,12 +33,11 @@
#include "qpid/client/Message.h"
#include "qpid/client/Session.h"
#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid;
using namespace qpid::client;
-using qpid::framing::FrameSet;
-using qpid::framing::MessageTransferBody;
+using namespace qpid::framing;
using std::string;
struct Args : public qpid::TestOptions {
@@ -104,14 +103,14 @@ int main(int argc, char** argv)
if (opts.trace) std::cout << "Declared queue." << std::endl;
//now bind the queue to the exchange
- session.queueBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::routingKey="MyKey");
+ session.exchangeBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::bindingKey="MyKey");
if (opts.trace) std::cout << "Bound queue to exchange." << std::endl;
//create and send a message to the exchange using the routing
//key we bound our queue with:
Message msgOut(generateData(opts.msgSize));
msgOut.getDeliveryProperties().setRoutingKey("MyKey");
- session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut);
+ session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1);
if (opts.trace) print("Published message: ", msgOut);
//subscribe to the queue, add sufficient credit and then get
@@ -132,6 +131,8 @@ int main(int argc, char** argv)
} else {
print("Received an unexepected message: ", msgIn);
}
+ } else {
+ throw Exception("Unexpected command received");
}
//close the session & connection
diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp
index f7a91f662f..7c68973d4d 100644
--- a/qpid/cpp/src/tests/exception_test.cpp
+++ b/qpid/cpp/src/tests/exception_test.cpp
@@ -96,7 +96,8 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) {
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
ProxySessionFixture fix;
- BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue").sync(), NotFoundException);
+ fix.session.setSynchronous(true);
+ BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException);
}
QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp
index 2b44a5477a..a61a4b2e42 100644
--- a/qpid/cpp/src/tests/latencytest.cpp
+++ b/qpid/cpp/src/tests/latencytest.cpp
@@ -192,7 +192,7 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0
mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2)));
mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true);
} else {
- mgr.setConfirmMode(false);
+ mgr.setAcceptMode(1/*not-required*/);
mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
}
mgr.subscribe(*this, queue);
@@ -257,14 +257,13 @@ void Sender::sendByCount()
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
- Completion c;
for (uint i = 0; i < opts.count; i++) {
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
//msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
- c = session.messageTransfer(arg::content=msg);
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
}
- c.sync();
+ session.sync();
}
void Sender::sendByRate()
@@ -283,7 +282,7 @@ void Sender::sendByRate()
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
//msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
- session.messageTransfer(arg::content=msg);
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
}
uint64_t timeTaken = (current_time() - start) / TIME_USEC;
if (timeTaken < 1000) {
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp
index 3dd4e876fc..966d708ff6 100644
--- a/qpid/cpp/src/tests/perftest.cpp
+++ b/qpid/cpp/src/tests/perftest.cpp
@@ -210,7 +210,8 @@ struct Setup : public Client {
void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) {
session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings);
- session.queuePurge(arg::queue=name).sync();
+ session.queuePurge(arg::queue=name);
+ session.sync();
}
void run() {
@@ -334,7 +335,7 @@ struct Controller : public Client {
<< endl;
Message msg(data, queue);
for (size_t i = 0; i < n; ++i)
- session.messageTransfer(arg::content=msg);
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
}
void run() { // Controller
@@ -421,7 +422,6 @@ struct PublishThread : public Client {
}
void run() { // Publisher
- Completion completion;
try {
string data;
size_t offset(0);
@@ -459,19 +459,19 @@ struct PublishThread : public Client {
// any heap allocation.
const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t),
reinterpret_cast<const char*>(&i), sizeof(uint32_t));
- completion = session.messageTransfer(
+ session.messageTransfer(
arg::destination=destination,
arg::content=msg,
- arg::confirmMode=opts.confirm);
- if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
+ arg::acceptMode=1);
+ if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
}
- if (opts.confirm) completion.sync();
+ if (opts.confirm) session.sync();
AbsTime end=now();
double time=secs(start,end);
// Send result to controller.
Message report(lexical_cast<string>(opts.count/time), "pub_done");
- session.messageTransfer(arg::content=report);
+ session.messageTransfer(arg::content=report, arg::acceptMode=1);
}
session.close();
}
@@ -496,9 +496,9 @@ struct SubscribeThread : public Client {
arg::exclusive=true,
arg::autoDelete=true,
arg::durable=opts.durable);
- session.queueBind(arg::queue=queue,
- arg::exchange=ex,
- arg::routingKey=key);
+ session.exchangeBind(arg::queue=queue,
+ arg::exchange=ex,
+ arg::bindingKey=key);
}
void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) {
@@ -506,7 +506,7 @@ struct SubscribeThread : public Client {
Message error(
QPID_MSG("Sequence error: expected n" << test << expect << " but got " << actual),
"sub_done");
- session.messageTransfer(arg::content=error);
+ session.messageTransfer(arg::content=error, arg::acceptMode=1);
throw Exception(error.getData());
}
}
@@ -515,12 +515,12 @@ struct SubscribeThread : public Client {
try {
SubscriptionManager subs(session);
LocalQueue lq(AckPolicy(opts.ack));
- subs.setConfirmMode(opts.ack > 0);
+ subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
false);
subs.subscribe(lq, queue);
// Notify controller we are ready.
- session.messageTransfer(arg::content=Message("ready", "sub_ready"));
+ session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
for (size_t j = 0; j < opts.iterations; ++j) {
@@ -556,7 +556,7 @@ struct SubscribeThread : public Client {
// Report to publisher.
Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
"sub_done");
- session.messageTransfer(arg::content=result);
+ session.messageTransfer(arg::content=result, arg::acceptMode=1);
}
session.close();
}
diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp
index e5e7d24112..5208b67445 100644
--- a/qpid/cpp/src/tests/topic_listener.cpp
+++ b/qpid/cpp/src/tests/topic_listener.cpp
@@ -114,7 +114,7 @@ int main(int argc, char** argv){
} else {
session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true);
}
- session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control");
+ session.exchangeBind(arg::exchange="amq.topic", arg::queue=control, arg::bindingKey="topic_control");
//set up listener
SubscriptionManager mgr(session);
@@ -123,7 +123,7 @@ int main(int argc, char** argv){
mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2)));
mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true);
} else {
- mgr.setConfirmMode(false);
+ mgr.setAcceptMode(1/*-not-required*/);
mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
}
mgr.subscribe(listener, control);
@@ -181,7 +181,7 @@ void Listener::report(){
<< time/TIME_MSEC << " ms.";
Message msg(reportstr.str(), responseQueue);
msg.getHeaders().setString("TYPE", "REPORT");
- session.messageTransfer(arg::content=msg);
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
if(transactional){
session.txCommit();
}
diff --git a/qpid/cpp/src/tests/topic_publisher.cpp b/qpid/cpp/src/tests/topic_publisher.cpp
index 2271849c35..8242530db1 100644
--- a/qpid/cpp/src/tests/topic_publisher.cpp
+++ b/qpid/cpp/src/tests/topic_publisher.cpp
@@ -164,12 +164,12 @@ int64_t Publisher::publish(int msgs, int listeners, int size){
AbsTime start = now();
for(int i = 0; i < msgs; i++){
- session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
+ session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1);
}
//send report request
Message reportRequest("", controlTopic);
reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
+ session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1);
if(transactional){
session.txCommit();
}
@@ -198,7 +198,7 @@ void Publisher::terminate(){
//send termination request
Message terminationRequest("", controlTopic);
terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
- session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic");
+ session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1);
if(transactional){
session.txCommit();
}
diff --git a/qpid/cpp/src/tests/txtest.cpp b/qpid/cpp/src/tests/txtest.cpp
index 4c5814986c..5030b24070 100644
--- a/qpid/cpp/src/tests/txtest.cpp
+++ b/qpid/cpp/src/tests/txtest.cpp
@@ -142,7 +142,7 @@ struct Transfer : public Client, public Runnable
out.setData(in.getData());
out.getMessageProperties().setCorrelationId(in.getMessageProperties().getCorrelationId());
out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode());
- session.messageTransfer(arg::content=out);
+ session.messageTransfer(arg::content=out, arg::acceptMode=1);
}
in.acknowledge();
session.txCommit();
@@ -168,7 +168,8 @@ struct Controller : public Client
{
//declare queues
for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
- session.queueDeclare(arg::queue=*i, arg::durable=opts.durable).sync();
+ session.queueDeclare(arg::queue=*i, arg::durable=opts.durable);
+ session.sync();
}
Message msg(generateData(opts.size), *queues.begin());
@@ -179,7 +180,7 @@ struct Controller : public Client
//publish messages
for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) {
msg.getMessageProperties().setCorrelationId(*i);
- session.messageTransfer(arg::content=msg);
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
}
}
@@ -205,7 +206,7 @@ struct Controller : public Client
{
SubscriptionManager subs(session);
subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
- subs.setConfirmMode(false);
+ subs.setAcceptMode(1/*not-required*/);
StringSet drained;
//drain each queue and verify the correct set of messages are available
@@ -213,7 +214,8 @@ struct Controller : public Client
//subscribe, allocate credit and flush
LocalQueue lq(AckPolicy(0));//manual acking
subs.subscribe(lq, *i, *i);
- session.messageFlush(arg::destination=*i).sync();
+ session.messageFlush(arg::destination=*i);
+ session.sync();
uint count(0);
while (!lq.empty()) {
diff --git a/qpid/cpp/xml/extra.xml b/qpid/cpp/xml/extra.xml
index 36a76765d5..5cb37ae65f 100644
--- a/qpid/cpp/xml/extra.xml
+++ b/qpid/cpp/xml/extra.xml
@@ -623,7 +623,7 @@
<class name="message010" index="4">
<doc>blah, blah</doc>
- <method name = "transfer" index="1">
+ <method name = "transfer" content="1" index="1">
<doc>blah, blah</doc>
<chassis name="server" implement="MUST" />
<chassis name="client" implement="MUST" />
@@ -818,7 +818,7 @@
<method name = "declare" index="1">
<doc>blah, blah</doc>
<chassis name="server" implement="MUST" />
- <field name="name" domain="shortstr"/>
+ <field name="exchange" domain="shortstr"/>
<field name="type" domain="shortstr"/>
<field name="alternate-exchange" domain="shortstr"/>
<field name="passive" domain="bit"/>
@@ -829,7 +829,7 @@
<method name = "delete" index="2">
<doc>blah, blah</doc>
<chassis name="server" implement="MUST" />
- <field name="name" domain="shortstr"/>
+ <field name="exchange" domain="shortstr"/>
<field name="if-unused" domain="bit"/>
</method>
<method name = "query" index="3">
@@ -863,8 +863,8 @@
<method name = "bound" index="6">
<doc>blah, blah</doc>
<chassis name="server" implement="MUST" />
- <field name="queue" domain="shortstr"/>
<field name="exchange" domain="shortstr"/>
+ <field name="queue" domain="shortstr"/>
<field name="binding-key" domain="shortstr"/>
<field name="arguments" domain="table"/>
<result>
@@ -884,7 +884,7 @@
<method name = "declare" index="1">
<doc>blah, blah</doc>
<chassis name="server" implement="MUST" />
- <field name="name" domain="shortstr"/>
+ <field name="queue" domain="shortstr"/>
<field name="alternate-exchange" domain="shortstr"/>
<field name="passive" domain="bit"/>
<field name="durable" domain="bit"/>
@@ -895,25 +895,25 @@
<method name = "delete" index="2">
<doc>blah, blah</doc>
<chassis name="server" implement="MUST" />
- <field name="name" domain="shortstr"/>
+ <field name="queue" domain="shortstr"/>
<field name="if-unused" domain="bit"/>
<field name="if-empty" domain="bit"/>
</method>
<method name = "purge" index="3">
<doc>blah, blah</doc>
<chassis name="server" implement="MUST" />
- <field name="name" domain="shortstr"/>
+ <field name="queue" domain="shortstr"/>
</method>
<method name = "query" index="4">
<doc>blah, blah</doc>
<chassis name="server" implement="MUST" />
- <field name="name" domain="shortstr"/>
+ <field name="queue" domain="shortstr"/>
<result>
<struct size="long" type="1">
<field name="name" domain="shortstr"/>
<field name="alternate-exchange" domain="shortstr"/>
- <field name="passive" domain="bit"/>
<field name="durable" domain="bit"/>
+ <field name="exclusive" domain="bit"/>
<field name="auto-delete" domain="bit"/>
<field name="arguments" domain="table"/>
<field name="message-count" domain="long"/>