summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java13
-rw-r--r--cpp/gentools/src/org/apache/qpid/gentools/Main.java4
-rw-r--r--cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl57
-rw-r--r--cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl24
-rwxr-xr-xcpp/rubygen/amqpgen.rb9
-rwxr-xr-xcpp/rubygen/cppgen.rb9
-rwxr-xr-xcpp/rubygen/templates/MethodBodyConstVisitor.rb27
-rwxr-xr-xcpp/rubygen/templates/MethodHolder.rb37
-rwxr-xr-xcpp/rubygen/templates/Proxy.rb2
-rw-r--r--cpp/rubygen/templates/Session.rb4
-rwxr-xr-xcpp/rubygen/templates/all_method_bodies.rb21
-rw-r--r--cpp/src/Makefile.am8
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp6
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h6
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp59
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.h8
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h13
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp232
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.h15
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h8
-rw-r--r--cpp/src/qpid/broker/Content.h2
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.cpp26
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.h7
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.cpp6
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.h2
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp4
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.h4
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp5
-rw-r--r--cpp/src/qpid/broker/Reference.cpp6
-rw-r--r--cpp/src/qpid/broker/Reference.h7
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp24
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h20
-rw-r--r--cpp/src/qpid/client/ChannelHandler.cpp44
-rw-r--r--cpp/src/qpid/client/ChannelHandler.h8
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp10
-rw-r--r--cpp/src/qpid/client/Connection.h1
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp65
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h8
-rw-r--r--cpp/src/qpid/client/Correlator.cpp2
-rw-r--r--cpp/src/qpid/client/Correlator.h4
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp40
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h4
-rw-r--r--cpp/src/qpid/client/FutureResponse.cpp8
-rw-r--r--cpp/src/qpid/client/FutureResponse.h8
-rw-r--r--cpp/src/qpid/client/ReceivedContent.cpp21
-rw-r--r--cpp/src/qpid/client/ReceivedContent.h28
-rw-r--r--cpp/src/qpid/client/Response.h4
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp4
-rw-r--r--cpp/src/qpid/client/SessionCore.h4
-rw-r--r--cpp/src/qpid/cluster/ClassifierHandler.cpp6
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp12
-rw-r--r--cpp/src/qpid/framing/AMQBody.h77
-rw-r--r--cpp/src/qpid/framing/AMQContentBody.cpp2
-rw-r--r--cpp/src/qpid/framing/AMQContentBody.h5
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp117
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h87
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.cpp42
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.h16
-rw-r--r--cpp/src/qpid/framing/AMQHeartbeatBody.h3
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.cpp42
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.h67
-rw-r--r--cpp/src/qpid/framing/BasicHeaderProperties.h2
-rw-r--r--cpp/src/qpid/framing/Blob.cpp30
-rw-r--r--cpp/src/qpid/framing/Blob.h55
-rw-r--r--cpp/src/qpid/framing/BodyHandler.cpp13
-rw-r--r--cpp/src/qpid/framing/BodyHandler.h12
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.cpp5
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.h6
-rw-r--r--cpp/src/qpid/framing/Frame.cpp117
-rw-r--r--cpp/src/qpid/framing/Frame.h72
-rw-r--r--cpp/src/qpid/framing/HeaderProperties.h2
-rw-r--r--cpp/src/qpid/framing/MethodHolder.cpp14
-rw-r--r--cpp/src/qpid/framing/MethodHolder.h54
-rw-r--r--cpp/src/qpid/framing/amqp_framing.h1
-rw-r--r--cpp/src/qpid/framing/amqp_types.h2
-rw-r--r--cpp/src/qpid/shared_ptr.h2
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp4
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp24
-rw-r--r--cpp/src/tests/Cluster.cpp8
-rw-r--r--cpp/src/tests/Cluster.h2
-rw-r--r--cpp/src/tests/Cluster_child.cpp2
-rw-r--r--cpp/src/tests/FramingTest.cpp93
-rw-r--r--cpp/src/tests/InMemoryContentTest.cpp9
-rw-r--r--cpp/src/tests/InProcessBroker.h2
-rw-r--r--cpp/src/tests/LazyLoadedContentTest.cpp3
-rw-r--r--cpp/src/tests/Makefile.am5
-rw-r--r--cpp/src/tests/MessageBuilderTest.cpp46
-rw-r--r--cpp/src/tests/MessageTest.cpp17
-rw-r--r--cpp/src/tests/MockChannel.h11
-rw-r--r--cpp/src/tests/ReferenceTest.cpp22
-rw-r--r--cpp/src/tests/TxAckTest.cpp3
-rw-r--r--cpp/src/tests/TxPublishTest.cpp3
-rw-r--r--cpp/src/tests/Visitor.cpp58
95 files changed, 934 insertions, 1185 deletions
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
index 80ce266f39..114e3308d3 100644
--- a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
+++ b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
@@ -1164,7 +1164,7 @@ public class CppGenerator extends Generator
{
String[] fieldDomainPair = ordinalFieldMap.get(thisOrdinal);
sb.append(indent + "" + setRef(fieldDomainPair[FIELD_CODE_TYPE]) + " get" +
- Utils.firstUpper(fieldDomainPair[FIELD_NAME]) + "() { return " +
+ Utils.firstUpper(fieldDomainPair[FIELD_NAME]) + "() const { return " +
fieldDomainPair[FIELD_NAME] + "; }" + cr);
}
return sb.toString();
@@ -1443,12 +1443,13 @@ public class CppGenerator extends Generator
StringBuffer sb = new StringBuffer();
if (method.fieldMap.size() > 0)
{
- sb.append(indent + thisClass.name + Utils.firstUpper(method.name) + "Body(ProtocolVersion version," + cr);
+ sb.append(indent + thisClass.name + Utils.firstUpper(method.name) + "Body(ProtocolVersion," + cr);
sb.append(generateFieldList(method.fieldMap, version, true, false, 8));
- sb.append(indent + tab + ") : " + baseClass(method, version) + "(version");
- sb.append(")");
- if (method.fieldMap.size() > 0)
- sb.append(", \n" + generateFieldList(method.fieldMap, version, false, true, 8));
+ sb.append(indent + tab + ")");
+ if (method.fieldMap.size() > 0) {
+ sb.append(" : ");
+ sb.append(generateFieldList(method.fieldMap, version, false, true, 8));
+ }
sb.append(indent + "{ }\n");
}
return sb.toString();
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/Main.java b/cpp/gentools/src/org/apache/qpid/gentools/Main.java
index 74e1ce1ab9..a25ddd103c 100644
--- a/cpp/gentools/src/org/apache/qpid/gentools/Main.java
+++ b/cpp/gentools/src/org/apache/qpid/gentools/Main.java
@@ -234,9 +234,7 @@ public class Main
new File(tmplDir + Utils.fileSeparator + "AMQP_ServerOperations.h.tmpl"),
new File(tmplDir + Utils.fileSeparator + "AMQP_ClientOperations.h.tmpl"),
new File(tmplDir + Utils.fileSeparator + "AMQP_Constants.h.tmpl"),
- new File(tmplDir + Utils.fileSeparator + "AMQP_MethodVersionMap.h.tmpl"),
- new File(tmplDir + Utils.fileSeparator + "AMQP_MethodVersionMap.cpp.tmpl"),
- new File(tmplDir + Utils.fileSeparator + "AMQP_HighestVersion.h.tmpl")
+ new File(tmplDir + Utils.fileSeparator + "AMQP_HighestVersion.h.tmpl")
};
methodTemplateFiles = new File[]
{
diff --git a/cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl b/cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl
deleted file mode 100644
index 83c3065a68..0000000000
--- a/cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl
+++ /dev/null
@@ -1,57 +0,0 @@
-&{AMQP_MethodVersionMap.h}
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by ${GENERATOR} - do not modify.
- * Supported AMQP versions:
-%{VLIST} * ${major}-${minor}
- */
-
-#ifndef qpid_framing_AMQP_MethodVersionMap__
-#define qpid_framing_AMQP_MethodVersionMap__
-
-#include <map>
-#include "qpid/framing/AMQMethodBody.h"
-
-%{MLIST} ${mc_method_body_include}
-
-namespace qpid
-{
-namespace framing
-{
-
-template <class T> AMQMethodBody* createMethodBodyFn(u_int8_t major, u_int8_t minor) { return new T(ProtocolVersion(major, minor)); }
-typedef AMQMethodBody* (*fnPtr)(u_int8_t, u_int8_t);
-
-class AMQP_MethodVersionMap: public std::map<u_int64_t, fnPtr>
-{
-protected:
- u_int64_t createMapKey(u_int16_t classId, u_int16_t methodId, u_int8_t major, u_int8_t minor);
-public:
- AMQP_MethodVersionMap();
- AMQMethodBody* createMethodBody(u_int16_t classId, u_int16_t methodId, u_int8_t major, u_int8_t minor);
-};
-
-} /* namespace framing */
-} /* namespace qpid */
-
-#endif
diff --git a/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl b/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
index 605b09ac94..5ffd2f2b4d 100644
--- a/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
+++ b/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
@@ -38,6 +38,9 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FramingContent.h"
#include "qpid/framing/SequenceNumberSet.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/MethodBodyConstVisitor.h"
+
namespace qpid
{
@@ -45,7 +48,7 @@ namespace framing
{
${version_namespace_start}
-class ${CLASS}${METHOD}Body : public ${mb_base_class}
+class ${CLASS}${METHOD}Body : public AMQMethodBody
{
// Method field declarations
@@ -57,13 +60,11 @@ public:
static const ClassId CLASS_ID= ${CLASS_ID_INIT};
static const MethodId METHOD_ID = ${METHOD_ID_INIT};
- typedef boost::shared_ptr<${CLASS}${METHOD}Body> shared_ptr;
-
- // Constructors and destructors
+ // Constructors and destructors
${mb_constructor_with_initializers}
- ${CLASS}${METHOD}Body(ProtocolVersion version=ProtocolVersion() ): ${mb_base_class}(version) {}
+ ${CLASS}${METHOD}Body(ProtocolVersion=ProtocolVersion()) {}
virtual ~${CLASS}${METHOD}Body() {}
// Attribute get methods
@@ -71,10 +72,11 @@ ${mb_constructor_with_initializers}
%{FLIST} ${mb_field_get_method}
// Helper methods
-
- inline void print(std::ostream& out) const
+ using AMQMethodBody::accept;
+ void accept(MethodBodyConstVisitor& v) const { v.visit(*this); }
+
+ void print(std::ostream& out) const
{
- printPrefix(out);
out << "${CLASS}${METHOD}: ";
%{FLIST} ${mb_field_print}
}
@@ -84,17 +86,17 @@ ${mb_constructor_with_initializers}
u_int32_t size() const
{
- u_int32_t sz = baseSize();
+ u_int32_t sz = 0;
%{FLIST} ${mb_body_size}
return sz;
}
- void encodeContent(Buffer& ${mb_buffer_param}) const
+ void encode(Buffer& ${mb_buffer_param}) const
{
%{FLIST} ${mb_encode}
}
- inline void decodeContent(Buffer& ${mb_buffer_param})
+ inline void decode(Buffer& ${mb_buffer_param}, uint32_t=0)
{
%{FLIST} ${mb_decode}
}
diff --git a/cpp/rubygen/amqpgen.rb b/cpp/rubygen/amqpgen.rb
index a144825f08..333c7dd654 100755
--- a/cpp/rubygen/amqpgen.rb
+++ b/cpp/rubygen/amqpgen.rb
@@ -115,11 +115,7 @@ class AmqpClass < AmqpElement
# chassis should be "client" or "server"
def amqp_methods_on(chassis)
@cache_amqp_methods_on ||= { }
-
- els = elements.collect("method/chassis[@name='#{chassis}']/..") { |m|
- AmqpMethod.new(m,self)
- }.sort_by_name
- @cache_amqp_methods_on[chassis] ||= els
+ @cache_amqp_methods_on[chassis] ||= elements.collect("method/chassis[@name='#{chassis}']/..") { |m| AmqpMethod.new(m,self) }.sort_by_name
end
end
@@ -153,7 +149,8 @@ class AmqpRoot < AmqpElement
end
def amqp_classes()
- @cache_amqp_classes ||= elements.collect("class") { |c| AmqpClass.new(c,self) }.sort_by_name
+ @cache_amqp_classes ||= elements.collect("class") { |c|
+ AmqpClass.new(c,self) }.sort_by_name
end
# Return all methods on all classes.
diff --git a/cpp/rubygen/cppgen.rb b/cpp/rubygen/cppgen.rb
index 724634e514..a3314c7e11 100755
--- a/cpp/rubygen/cppgen.rb
+++ b/cpp/rubygen/cppgen.rb
@@ -63,6 +63,7 @@ end
class AmqpField
def cppname() @cache_cppname ||= name.lcaps.cppsafe; end
def cpptype() @cache_cpptype ||= amqp_root.param_type(field_type); end
+ def cppret_type() @cache_cpptype ||= amqp_root.return_type(field_type); end
def type_name () @type_name ||= cpptype+" "+cppname; end
end
@@ -159,8 +160,12 @@ class CppGen < Generator
def struct_class(type, name, bases, &block)
genl
gen "#{type} #{name}"
- gen ": #{bases.join(', ')}" unless bases.empty?
- scope(" {","};") { yield }
+ if (!bases.empty?)
+ genl ":"
+ indent { gen "#{bases.join(",\n")}" }
+ end
+ genl
+ scope("{","};") { yield }
end
def struct(name, *bases, &block) struct_class("struct", name, bases, &block); end
diff --git a/cpp/rubygen/templates/MethodBodyConstVisitor.rb b/cpp/rubygen/templates/MethodBodyConstVisitor.rb
new file mode 100755
index 0000000000..6fd7fe8ead
--- /dev/null
+++ b/cpp/rubygen/templates/MethodBodyConstVisitor.rb
@@ -0,0 +1,27 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class MethodBodyConstVisitorGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @namespace="qpid::framing"
+ @classname="MethodBodyConstVisitor"
+ @filename="qpid/framing/MethodBodyConstVisitor"
+ end
+
+ def generate()
+ h_file("#{@filename}") {
+ namespace(@namespace) {
+ @amqp.amqp_methods.each { |m| genl "class #{m.body_name};" }
+ cpp_class("MethodBodyConstVisitor") {
+ genl "public:"
+ genl "virtual ~MethodBodyConstVisitor() {}"
+ @amqp.amqp_methods.each { |m| genl "virtual void visit(const #{m.body_name}&) = 0;" }
+ }}}
+ end
+end
+
+MethodBodyConstVisitorGen.new(Outdir, Amqp).generate();
+
diff --git a/cpp/rubygen/templates/MethodHolder.rb b/cpp/rubygen/templates/MethodHolder.rb
index 65fa824fd4..39a570c982 100755
--- a/cpp/rubygen/templates/MethodHolder.rb
+++ b/cpp/rubygen/templates/MethodHolder.rb
@@ -40,23 +40,42 @@ EOS
def gen_construct
cpp_file(@filename+"_construct") {
include @filename
+ include "qpid/framing/MethodBodyConstVisitor.h"
@amqp.amqp_methods.each { |m| include "qpid/framing/#{m.body_name}" }
genl
- namespace(@namespace) {
- scope("void #{@classname}::construct(const Id& newId) {") {
- scope("switch (newId.first) {") {
+ include "qpid/Exception.h"
+ genl
+ namespace(@namespace) {
+ # construct function
+ scope("void #{@classname}::construct(ClassId c, MethodId m) {") {
+ scope("switch (c) {") {
@amqp.amqp_classes.each { |c|
- scope("case #{c.index}: switch(newId.second) {") {
+ scope("case #{c.index}: switch(m) {") {
c.amqp_methods.each { |m|
genl "case #{m.index}: blob.construct(in_place<#{m.body_name}>()); break;"
- }}
+ }
+ genl "default: throw Exception(QPID_MSG(\"Invalid method id \" << m << \" for class #{c.name} \"));"
+ }
genl "break;"
- }}
- genl "id=newId;";
- }}}
+ }
+ genl "default: throw Exception(QPID_MSG(\"Invalid class id \" << c));"
+ }
+ }
+ # CopyVisitor
+ struct("#{@classname}::CopyVisitor", "public MethodBodyConstVisitor") { genl "MethodHolder& holder;"
+ genl "CopyVisitor(MethodHolder& h) : holder(h) {}"
+ @amqp.amqp_methods.each { |m|
+ genl "void visit(const #{m.body_name}& x) { holder.blob=x; }"
+ }
+ }
+ genl
+ # operator=
+ scope("#{@classname}& MethodHolder::operator=(const AMQMethodBody& m) {") {
+ genl "CopyVisitor cv(*this); m.accept(cv); return *this;"
+ }
+ }}
end
-
def generate
gen_max_size
gen_construct
diff --git a/cpp/rubygen/templates/Proxy.rb b/cpp/rubygen/templates/Proxy.rb
index f36d6bcd99..a6c0763a8a 100755
--- a/cpp/rubygen/templates/Proxy.rb
+++ b/cpp/rubygen/templates/Proxy.rb
@@ -38,7 +38,7 @@ EOS
genl "void #{@classname}::#{cname}::#{m.cppname}(#{m.signature.join(", ")})"
scope {
params=(["channel.getVersion()"]+m.param_names).join(", ")
- genl "channel.send(make_shared_ptr(new #{m.body_name}(#{params})));"
+ genl "channel.send(#{m.body_name}(#{params}));"
}}
end
diff --git a/cpp/rubygen/templates/Session.rb b/cpp/rubygen/templates/Session.rb
index eaa4347974..4265731d32 100644
--- a/cpp/rubygen/templates/Session.rb
+++ b/cpp/rubygen/templates/Session.rb
@@ -37,7 +37,7 @@ class SessionGen < CppGen
indent { gen params.join(",\n") }
gen "){\n\n"
indent (2) {
- gen "return impl->send(AMQMethodBody::shared_ptr(new #{m.body_name}("
+ gen "return impl->send(#{m.body_name}("
params = ["version"] + m.param_names
gen params.join(", ")
other_params=[]
@@ -49,7 +49,7 @@ class SessionGen < CppGen
else
other_params << "true"
end
- gen ")), #{other_params.join(", ")});\n"
+ gen "), #{other_params.join(", ")});\n"
}
gen "}\n\n"
end
diff --git a/cpp/rubygen/templates/all_method_bodies.rb b/cpp/rubygen/templates/all_method_bodies.rb
new file mode 100755
index 0000000000..38fbc31593
--- /dev/null
+++ b/cpp/rubygen/templates/all_method_bodies.rb
@@ -0,0 +1,21 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class AllMethodBodiesGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @namespace="qpid::framing"
+ @filename="qpid/framing/all_method_bodies"
+ end
+
+ def generate()
+ h_file(@filename) {
+ @amqp.amqp_methods.each { |m| include "qpid/framing/"+m.body_name }
+ }
+ end
+end
+
+AllMethodBodiesGen.new(Outdir, Amqp).generate();
+
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 0f91faba63..977db7ea36 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -46,8 +46,10 @@ rgen_tdir=$(rgen_dir)/templates
rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate
rgen_templates=$(rgen_tdir)/MethodHolder.rb \
+ $(rgen_tdir)/MethodBodyConstVisitor.rb \
$(rgen_tdir)/frame_body_lists.rb \
$(rgen_tdir)/Session.rb \
+ $(rgen_tdir)/all_method_bodies.rb \
$(rgen_tdir)/Proxy.rb
rgen_generator=$(rgen_dir)/generate \
@@ -135,11 +137,11 @@ libqpidcommon_la_LIBADD = \
libqpidcommon_la_SOURCES = \
$(platform_src) \
qpid/framing/AMQBody.cpp \
+ qpid/framing/AMQMethodBody.cpp \
qpid/framing/AMQContentBody.cpp \
qpid/framing/AMQFrame.cpp \
qpid/framing/AMQHeaderBody.cpp \
qpid/framing/AMQHeartbeatBody.cpp \
- qpid/framing/AMQMethodBody.cpp \
qpid/framing/FrameHandler.h \
qpid/framing/BasicHeaderProperties.cpp \
qpid/framing/BodyHandler.cpp \
@@ -162,10 +164,12 @@ libqpidcommon_la_SOURCES = \
qpid/framing/Blob.h \
qpid/framing/AMQP_ClientProxy.cpp \
qpid/framing/AMQP_ServerProxy.cpp \
+ qpid/framing/variant.h \
gen/qpid/framing/AMQP_HighestVersion.h \
- gen/qpid/framing/AMQP_MethodVersionMap.cpp \
+ qpid/framing/Blob.cpp \
qpid/framing/MethodHolder.h qpid/framing/MethodHolder.cpp \
qpid/framing/MethodHolder_construct.cpp \
+ qpid/framing/MethodHolderMaxSize.h \
qpid/Exception.cpp \
qpid/Plugin.h \
qpid/Plugin.cpp \
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 6e577ab354..e135e960c4 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -307,19 +307,19 @@ void Channel::handlePublish(Message* _message)
messageBuilder.initialise(message);
}
-void Channel::handleHeader(AMQHeaderBody::shared_ptr header)
+void Channel::handleHeader(AMQHeaderBody* header)
{
messageBuilder.setHeader(header);
//at this point, decide based on the size of the message whether we want
//to stage it by saving content directly to disk as it arrives
}
-void Channel::handleContent(AMQContentBody::shared_ptr content)
+void Channel::handleContent(AMQContentBody* content)
{
messageBuilder.addContent(content);
}
-void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
+void Channel::handleHeartbeat(AMQHeartbeatBody*) {
// TODO aconway 2007-01-17: Implement heartbeating.
}
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index 1f4f6f35e7..021110cf8c 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -170,9 +170,9 @@ class Channel : public CompletionHandler
void flow(bool active);
void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
void handlePublish(Message* msg);
- void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
- void handleContent(boost::shared_ptr<framing::AMQContentBody>);
- void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+ void handleHeader(framing::AMQHeaderBody*);
+ void handleContent(framing::AMQContentBody*);
+ void handleHeartbeat(framing::AMQHeartbeatBody*);
void handleInlineTransfer(Message::shared_ptr msg);
};
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
index 244bee4a92..bddd5802cf 100644
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -72,22 +72,25 @@ BasicMessage::BasicMessage(
const string& _exchange, const string& _routingKey,
bool _mandatory, bool _immediate
) :
- Message(_publisher, _exchange, _routingKey, _mandatory,
- _immediate, framing::AMQMethodBody::shared_ptr(new BasicPublishBody(ProtocolVersion(0,9)))),
+ Message(_publisher, _exchange, _routingKey, _mandatory, _immediate),
size(0)
{}
// For tests only.
-BasicMessage::BasicMessage() : size(0)
-{}
+BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {}
BasicMessage::~BasicMessage(){}
-void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
+void BasicMessage::setHeader(AMQHeaderBody* _header){
+ if (_header) {
+ this->header = *_header;
+ isHeaderSet = true;
+ }
+ else
+ isHeaderSet = false;
}
-void BasicMessage::addContent(AMQContentBody::shared_ptr data){
+void BasicMessage::addContent(AMQContentBody* data){
if (!content.get()) {
content = std::auto_ptr<Content>(new InMemoryContent());
}
@@ -96,7 +99,7 @@ void BasicMessage::addContent(AMQContentBody::shared_ptr data){
}
bool BasicMessage::isComplete(){
- return header.get() && (header->getContentSize() == contentSize());
+ return isHeaderSet && (header.getContentSize() == contentSize());
}
DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue)
@@ -113,10 +116,9 @@ void BasicMessage::deliver(ChannelAdapter& channel,
const string& consumerTag, DeliveryId id,
uint32_t framesize)
{
- channel.send(make_shared_ptr(
- new BasicDeliverBody(
+ channel.send(BasicDeliverBody(
channel.getVersion(), consumerTag, id.getValue(),
- getRedelivered(), getExchange(), getRoutingKey())));
+ getRedelivered(), getExchange(), getRoutingKey()));
sendContent(channel, framesize);
}
@@ -125,11 +127,11 @@ void BasicMessage::sendGetOk(ChannelAdapter& channel,
DeliveryId id,
uint32_t framesize)
{
- channel.send(make_shared_ptr(
- new BasicGetOkBody(
+ channel.send(
+ BasicGetOkBody(
channel.getVersion(),
id.getValue(), getRedelivered(), getExchange(),
- getRoutingKey(), messageCount)));
+ getRoutingKey(), messageCount));
sendContent(channel, framesize);
}
@@ -156,12 +158,11 @@ void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize)
channel.send(header);
Mutex::ScopedLock locker(contentLock);
if (content.get())
- content->send(channel, framesize);
+ content->send(channel, framesize);
}
BasicHeaderProperties* BasicMessage::getHeaderProperties(){
- return boost::polymorphic_downcast<BasicHeaderProperties*>(
- header->getProperties());
+ return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0;
}
const FieldTable& BasicMessage::getApplicationHeaders(){
@@ -170,7 +171,7 @@ const FieldTable& BasicMessage::getApplicationHeaders(){
bool BasicMessage::isPersistent()
{
- if(!header) return false;
+ if(!isHeaderSet) return false;
BasicHeaderProperties* props = getHeaderProperties();
return props && props->getDeliveryMode() == PERSISTENT;
}
@@ -194,9 +195,9 @@ void BasicMessage::decodeHeader(Buffer& buffer)
setRouting(exchange, routingKey);
uint32_t headerSize = buffer.getLong();
- AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
- headerBody->decode(buffer, headerSize);
- setHeader(headerBody);
+ AMQHeaderBody headerBody;
+ headerBody.decode(buffer, headerSize);
+ setHeader(&headerBody);
}
void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
@@ -214,9 +215,9 @@ void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
uint64_t total = 0;
while (total < expectedContentSize()) {
uint64_t remaining = expected - total;
- AMQContentBody::shared_ptr contentBody(new AMQContentBody());
- contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
- addContent(contentBody);
+ AMQContentBody contentBody;
+ contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize);
+ addContent(&contentBody);
total += chunkSize;
}
}
@@ -232,8 +233,8 @@ void BasicMessage::encodeHeader(Buffer& buffer) const
RecoveryManagerImpl::encodeMessageType(*this, buffer);
buffer.putShortString(getExchange());
buffer.putShortString(getRoutingKey());
- buffer.putLong(header->size());
- header->encode(buffer);
+ buffer.putLong(header.size());
+ header.encode(buffer);
}
void BasicMessage::encodeContent(Buffer& buffer) const
@@ -258,12 +259,12 @@ uint32_t BasicMessage::encodedHeaderSize() const
return RecoveryManagerImpl::encodedMessageTypeSize()
+getExchange().size() + 1
+ getRoutingKey().size() + 1
- + header->size() + 4;//4 extra bytes for size
+ + header.size() + 4;//4 extra bytes for size
}
uint64_t BasicMessage::expectedContentSize()
{
- return header.get() ? header->getContentSize() : 0;
+ return isHeaderSet ? header.getContentSize() : 0;
}
void BasicMessage::releaseContent(MessageStore* store)
@@ -294,5 +295,5 @@ void BasicMessage::setContent(std::auto_ptr<Content>& _content)
uint32_t BasicMessage::getRequiredCredit() const
{
- return header->size() + contentSize();
+ return header.size() + contentSize();
}
diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h
index af8e4e62e9..0f46ff2e83 100644
--- a/cpp/src/qpid/broker/BrokerMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessage.h
@@ -27,6 +27,7 @@
#include "BrokerMessageBase.h"
#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/AMQHeaderBody.h"
#include "ConnectionToken.h"
#include "Content.h"
#include "qpid/sys/Mutex.h"
@@ -51,7 +52,8 @@ using framing::string;
* request.
*/
class BasicMessage : public Message {
- boost::shared_ptr<framing::AMQHeaderBody> header;
+ framing::AMQHeaderBody header;
+ bool isHeaderSet;
std::auto_ptr<Content> content;
mutable sys::Mutex contentLock;
uint64_t size;
@@ -66,8 +68,8 @@ class BasicMessage : public Message {
bool mandatory, bool immediate);
BasicMessage();
~BasicMessage();
- void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header);
- void addContent(framing::AMQContentBody::shared_ptr data);
+ void setHeader(framing::AMQHeaderBody* header);
+ void addContent(framing::AMQContentBody* data);
bool isComplete();
static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue);
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
index 94035905ce..bac5dc6386 100644
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -54,22 +54,18 @@ class MessageStore;
class Message : public PersistableMessage{
public:
typedef boost::shared_ptr<Message> shared_ptr;
- typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr;
-
Message(const ConnectionToken* publisher_,
const std::string& _exchange,
const std::string& _routingKey,
- bool _mandatory, bool _immediate,
- AMQMethodBodyPtr respondTo_) :
+ bool _mandatory, bool _immediate) :
publisher(publisher_),
exchange(_exchange),
routingKey(_routingKey),
mandatory(_mandatory),
immediate(_immediate),
persistenceId(0),
- redelivered(false),
- respondTo(respondTo_)
+ redelivered(false)
{}
Message() :
@@ -145,8 +141,8 @@ class Message : public PersistableMessage{
* it uses).
*/
virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {};
- virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {};
+ virtual void setHeader(framing::AMQHeaderBody*) {};
+ virtual void addContent(framing::AMQContentBody*) {};
/**
* Releases the in-memory content data held by this
* message. Must pass in a store from which the data can
@@ -164,7 +160,6 @@ class Message : public PersistableMessage{
const bool immediate;
mutable uint64_t persistenceId;
bool redelivered;
- AMQMethodBodyPtr respondTo;
};
}}
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index 0da5f3d8f5..1184885aeb 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -48,35 +48,34 @@ struct MessageDeliveryToken : public DeliveryToken
};
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_
+ ConnectionToken* publisher, const MessageTransferBody* transfer_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
- transfer_->getImmediate(),
- transfer_),
- transfer(transfer_)
+ transfer_->getImmediate()),
+ transfer(*transfer_)
{
- assert(transfer->getBody().isInline());
+ assert(transfer.getBody().isInline());
}
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_, ReferencePtr reference_
+ ConnectionToken* publisher, const MessageTransferBody* transfer_,
+ ReferencePtr reference_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
- transfer_->getImmediate(),
- transfer_),
- transfer(transfer_),
+ transfer_->getImmediate()),
+ transfer(*transfer_),
reference(reference_)
{
- assert(!transfer->getBody().isInline());
+ assert(!transfer.getBody().isInline());
assert(reference_);
}
/**
* Currently used by message store impls to recover messages
*/
-MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {}
+MessageMessage::MessageMessage() {}
// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
void MessageMessage::transferMessage(
@@ -84,27 +83,28 @@ void MessageMessage::transferMessage(
const std::string& consumerTag,
uint32_t framesize)
{
- const framing::Content& body = transfer->getBody();
+ const framing::Content& body = transfer.getBody();
// Send any reference data
ReferencePtr ref= getReference();
if (ref){
// Open
- channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId())));
+ channel.send(MessageOpenBody(channel.getVersion(), ref->getId()));
// Appends
for(Reference::Appends::const_iterator a = ref->getAppends().begin();
a != ref->getAppends().end();
++a) {
- uint32_t sizeleft = (*a)->size();
- const string& content = (*a)->getBytes();
+ uint32_t sizeleft = a->size();
+ const string& content = a->getBytes();
// Calculate overhead bytes
// Assume that the overhead is constant as the reference name doesn't change
uint32_t overhead = sizeleft - content.size();
string::size_type contentStart = 0;
while (sizeleft) {
string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
- channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(),
- string(content, contentStart, contentSize))));
+
+ channel.send(MessageAppendBody(channel.getVersion(), ref->getId(),
+ string(content, contentStart, contentSize)));
sizeleft -= contentSize;
contentStart += contentSize;
}
@@ -112,76 +112,73 @@ void MessageMessage::transferMessage(
}
// The transfer
- if ( transfer->size()<=framesize ) {
- channel.send(make_shared_ptr(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body)));
+ if ( transfer.size()<=framesize ) {
+ channel.send(MessageTransferBody(ProtocolVersion(),
+ transfer.getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ body));
} else {
// Thing to do here is to construct a simple reference message then deliver that instead
// fragmentation will be taken care of in the delivery if necessary;
string content = body.getValue();
string refname = "dummy";
- TransferPtr newTransfer(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- framing::Content(REFERENCE, refname)));
+ MessageTransferBody newTransfer(channel.getVersion(),
+ transfer.getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ framing::Content(REFERENCE, refname));
ReferencePtr newRef(new Reference(refname));
- Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
- newRef->append(newAppend);
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), newTransfer, newRef);
+ newRef->append(MessageAppendBody(channel.getVersion(), refname, content));
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef);
newMsg.transferMessage(channel, consumerTag, framesize);
return;
}
// Close any reference data
if (ref)
- channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId())));
+ channel.send(MessageCloseBody(ProtocolVersion(), ref->getId()));
}
@@ -202,8 +199,8 @@ bool MessageMessage::isComplete()
uint64_t MessageMessage::contentSize() const
{
- if (transfer->getBody().isInline())
- return transfer->getBody().getValue().size();
+ if (transfer.getBody().isInline())
+ return transfer.getBody().getValue().size();
else {
assert(getReference());
return getReference()->getSize();
@@ -217,11 +214,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
const FieldTable& MessageMessage::getApplicationHeaders()
{
- return transfer->getApplicationHeaders();
+ return transfer.getApplicationHeaders();
}
bool MessageMessage::isPersistent()
{
- return transfer->getDeliveryMode() == PERSISTENT;
+ return transfer.getDeliveryMode() == PERSISTENT;
}
uint32_t MessageMessage::encodedSize() const
@@ -231,7 +228,7 @@ uint32_t MessageMessage::encodedSize() const
uint32_t MessageMessage::encodedHeaderSize() const
{
- return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize();
+ return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size();
}
uint32_t MessageMessage::encodedContentSize() const
@@ -252,18 +249,17 @@ void MessageMessage::encode(Buffer& buffer) const
void MessageMessage::encodeHeader(Buffer& buffer) const
{
RecoveryManagerImpl::encodeMessageType(*this, buffer);
- if (transfer->getBody().isInline()) {
- transfer->encodeContent(buffer);
+ if (transfer.getBody().isInline()) {
+ transfer.encode(buffer);
} else {
assert(getReference());
string data;
const Reference::Appends& appends = getReference()->getAppends();
for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) {
- data += (*a)->getBytes();
+ data += a->getBytes();
}
framing::Content body(INLINE, data);
- std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body));
- copy->encodeContent(buffer);
+ copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer);
}
}
@@ -271,8 +267,7 @@ void MessageMessage::decodeHeader(Buffer& buffer)
{
//don't care about the type here, but want encode/decode to be symmetric
RecoveryManagerImpl::decodeMessageType(buffer);
-
- transfer->decodeContent(buffer);
+ transfer.decode(buffer);
}
void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
@@ -280,37 +275,36 @@ void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
}
-MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
- const string& destination,
- const framing::Content& body) const
+MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version,
+ const string& destination,
+ const framing::Content& body) const
{
- return new MessageTransferBody(version,
- transfer->getTicket(),
- destination,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body);
-
+ return MessageTransferBody(version,
+ transfer.getTicket(),
+ destination,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ body);
}
MessageMessage::ReferencePtr MessageMessage::getReference() const {
@@ -321,7 +315,7 @@ uint32_t MessageMessage::getRequiredCredit() const
{
//TODO: change when encoding changes. Should be the payload of any
//header & body frames.
- return transfer->size();
+ return transfer.size();
}
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h
index 6b1bd9ab5d..6bfd0e045d 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.h
@@ -29,10 +29,6 @@
namespace qpid {
-namespace framing {
-class MessageTransferBody;
-}
-
namespace broker {
class ConnectionToken;
class Reference;
@@ -40,16 +36,15 @@ class Reference;
class MessageMessage: public Message{
public:
typedef boost::shared_ptr<MessageMessage> shared_ptr;
- typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
typedef boost::shared_ptr<Reference> ReferencePtr;
- MessageMessage(ConnectionToken* publisher, TransferPtr transfer);
- MessageMessage(ConnectionToken* publisher, TransferPtr transfer, ReferencePtr reference);
+ MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer);
+ MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer, ReferencePtr reference);
MessageMessage();
// Default destructor okay
- TransferPtr getTransfer() const { return transfer; }
+ framing::MessageTransferBody* getTransfer() const { return const_cast<framing::MessageTransferBody*>(&transfer); }
ReferencePtr getReference() const ;
void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
@@ -80,12 +75,12 @@ class MessageMessage: public Message{
const std::string& consumerTag,
uint32_t framesize);
- framing::MessageTransferBody* copyTransfer(
+ framing::MessageTransferBody copyTransfer(
const framing::ProtocolVersion& version,
const std::string& destination,
const framing::Content& body) const;
- const TransferPtr transfer;
+ framing::MessageTransferBody transfer;
const boost::shared_ptr<Reference> reference;
};
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp
index a67a5557c6..175f57df7d 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.cpp
+++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -1,3 +1,4 @@
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -42,8 +43,7 @@ void ConnectionAdapter::close(ReplyCode code, const string& text, ClassId classI
handler->client.close(code, text, classId, methodId);
}
-void ConnectionAdapter::handleMethod(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void ConnectionAdapter::handleMethod(framing::AMQMethodBody* method)
{
try{
method->invoke(*this);
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index 1ce850a659..9aa3d130e8 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -47,12 +47,12 @@ public:
void handle(framing::AMQFrame& frame);
//ChannelAdapter virtual methods:
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
+ void handleMethod(framing::AMQMethodBody* method);
bool isOpen() const { return true; } //channel 0 is always open
//never needed:
- void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>) {}
- void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>) {}
- void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>) {}
+ void handleHeader(framing::AMQHeaderBody*) {}
+ void handleContent(framing::AMQContentBody*) {}
+ void handleHeartbeat(framing::AMQHeartbeatBody*) {}
//AMQP_ServerOperations:
ConnectionHandler* getConnectionHandler();
diff --git a/cpp/src/qpid/broker/Content.h b/cpp/src/qpid/broker/Content.h
index df9e7a1132..97dce0d3f7 100644
--- a/cpp/src/qpid/broker/Content.h
+++ b/cpp/src/qpid/broker/Content.h
@@ -42,7 +42,7 @@ class Content{
virtual ~Content(){}
/** Add a block of data to the content */
- virtual void add(framing::AMQContentBody::shared_ptr data) = 0;
+ virtual void add(framing::AMQContentBody* data) = 0;
/** Total size of content in bytes */
virtual uint32_t size() = 0;
diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp
index a6ce820f7e..d69dcfafe7 100644
--- a/cpp/src/qpid/broker/InMemoryContent.cpp
+++ b/cpp/src/qpid/broker/InMemoryContent.cpp
@@ -26,16 +26,16 @@ using namespace qpid::broker;
using namespace qpid::framing;
using boost::static_pointer_cast;
-void InMemoryContent::add(AMQContentBody::shared_ptr data)
+void InMemoryContent::add(AMQContentBody* data)
{
- content.push_back(data);
+ content.push_back(*data);
}
uint32_t InMemoryContent::size()
{
int sum(0);
for (content_iterator i = content.begin(); i != content.end(); i++) {
- sum += (*i)->size();
+ sum += i->size();
}
return sum;
}
@@ -43,22 +43,20 @@ uint32_t InMemoryContent::size()
void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
- if ((*i)->size() > framesize) {
+ if (i->size() > framesize) {
uint32_t offset = 0;
- for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
- string data = (*i)->getData().substr(offset, framesize);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ for (int chunk = i->size() / framesize; chunk > 0; chunk--) {
+ string data = i->getData().substr(offset, framesize);
+ channel.send(AMQContentBody(data));
offset += framesize;
}
- uint32_t remainder = (*i)->size() % framesize;
+ uint32_t remainder = i->size() % framesize;
if (remainder) {
- string data = (*i)->getData().substr(offset, remainder);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ string data = i->getData().substr(offset, remainder);
+ channel.send(AMQContentBody(data));
}
} else {
- AMQBody::shared_ptr contentBody =
- static_pointer_cast<AMQBody, AMQContentBody>(*i);
- channel.send(contentBody);
+ channel.send(*i);
}
}
}
@@ -66,7 +64,7 @@ void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
void InMemoryContent::encode(Buffer& buffer)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
- (*i)->encode(buffer);
+ i->encode(buffer);
}
}
diff --git a/cpp/src/qpid/broker/InMemoryContent.h b/cpp/src/qpid/broker/InMemoryContent.h
index 425f0e4e26..a6fca7ca98 100644
--- a/cpp/src/qpid/broker/InMemoryContent.h
+++ b/cpp/src/qpid/broker/InMemoryContent.h
@@ -22,21 +22,22 @@
#define _InMemoryContent_
#include "Content.h"
+#include "qpid/framing/AMQContentBody.h"
#include <vector>
namespace qpid {
namespace broker {
class InMemoryContent : public Content{
- typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+ typedef std::vector<framing::AMQContentBody> content_list;
typedef content_list::iterator content_iterator;
content_list content;
public:
- void add(qpid::framing::AMQContentBody::shared_ptr data);
+ void add(framing::AMQContentBody* data);
uint32_t size();
void send(framing::ChannelAdapter&, uint32_t framesize);
- void encode(qpid::framing::Buffer& buffer);
+ void encode(framing::Buffer& buffer);
};
}
}
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp
index 80d06ebf2b..b8b5b37f45 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.cpp
+++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp
@@ -33,7 +33,7 @@ LazyLoadedContent::~LazyLoadedContent()
LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) :
store(_store), msg(_msg), expectedSize(_expectedSize) {}
-void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
+void LazyLoadedContent::add(AMQContentBody* data)
{
store->appendContent(*msg, data->getData());
}
@@ -52,12 +52,12 @@ void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize)
string data;
store->loadContent(*msg, data, offset,
remaining > framesize ? framesize : remaining);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ channel.send(AMQContentBody(data));
}
} else {
string data;
store->loadContent(*msg, data, 0, expectedSize);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ channel.send(AMQContentBody(data));
}
}
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h
index 9dff6158a5..79a33ed7a9 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.h
+++ b/cpp/src/qpid/broker/LazyLoadedContent.h
@@ -36,7 +36,7 @@ namespace qpid {
MessageStore* const store, Message* const msg,
uint64_t expectedSize);
~LazyLoadedContent();
- void add(qpid::framing::AMQContentBody::shared_ptr data);
+ void add(qpid::framing::AMQContentBody* data);
uint32_t size();
void send(
framing::ChannelAdapter&,
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 6c33b38e72..f19927b708 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -50,7 +50,7 @@ void MessageBuilder::initialise(Message::shared_ptr& msg){
message = msg;
}
-void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
+void MessageBuilder::setHeader(AMQHeaderBody* header){
if(!message.get()){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
}
@@ -65,7 +65,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
route();
}
-void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){
+void MessageBuilder::addContent(AMQContentBody* content){
if(!message.get()){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h
index 7c4a529a64..18e85d7383 100644
--- a/cpp/src/qpid/broker/MessageBuilder.h
+++ b/cpp/src/qpid/broker/MessageBuilder.h
@@ -39,8 +39,8 @@ namespace qpid {
MessageStore* const store = 0,
uint64_t stagingThreshold = 0);
void initialise(Message::shared_ptr& msg);
- void setHeader(framing::AMQHeaderBody::shared_ptr& header);
- void addContent(framing::AMQContentBody::shared_ptr& content);
+ void setHeader(framing::AMQHeaderBody* header);
+ void addContent(framing::AMQContentBody* content);
Message::shared_ptr getMessage() { return message; }
private:
Message::shared_ptr message;
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index b5bea05eac..3f407c11f7 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -28,6 +28,7 @@
#include "BrokerAdapter.h"
#include <boost/format.hpp>
+#include <boost/cast.hpp>
namespace qpid {
namespace broker {
@@ -159,9 +160,7 @@ MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ )
void
MessageHandlerImpl::transfer(const framing::AMQMethodBody& context)
{
- MessageTransferBody::shared_ptr transfer(
- make_shared_ptr(new MessageTransferBody(static_cast<const MessageTransferBody&>(context))));
-
+ const MessageTransferBody* transfer = boost::polymorphic_downcast<const MessageTransferBody*>(&context);
if (transfer->getBody().isInline()) {
MessageMessage::shared_ptr message(new MessageMessage(&connection, transfer));
channel.handleInlineTransfer(message);
diff --git a/cpp/src/qpid/broker/Reference.cpp b/cpp/src/qpid/broker/Reference.cpp
index c2060f8fee..283b231b60 100644
--- a/cpp/src/qpid/broker/Reference.cpp
+++ b/cpp/src/qpid/broker/Reference.cpp
@@ -40,9 +40,9 @@ Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) {
return i->second;
}
-void Reference::append(AppendPtr ptr) {
- appends.push_back(ptr);
- size += ptr->getBytes().length();
+void Reference::append(const framing::MessageAppendBody& app) {
+ appends.push_back(app);
+ size += app.getBytes().length();
}
void Reference::close() {
diff --git a/cpp/src/qpid/broker/Reference.h b/cpp/src/qpid/broker/Reference.h
index 277eb7b917..5a373fbeba 100644
--- a/cpp/src/qpid/broker/Reference.h
+++ b/cpp/src/qpid/broker/Reference.h
@@ -19,6 +19,8 @@
*
*/
+#include "qpid/framing/MessageAppendBody.h"
+
#include <string>
#include <vector>
#include <map>
@@ -56,8 +58,7 @@ class Reference
typedef boost::shared_ptr<Reference> shared_ptr;
typedef boost::shared_ptr<MessageMessage> MessagePtr;
typedef std::vector<MessagePtr> Messages;
- typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
- typedef std::vector<AppendPtr> Appends;
+ typedef std::vector<framing::MessageAppendBody> Appends;
Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
: id(id_), size(0), registry(reg) {}
@@ -69,7 +70,7 @@ class Reference
void addMessage(MessagePtr message) { messages.push_back(message); }
/** Append more data to the reference */
- void append(AppendPtr ptr);
+ void append(const framing::MessageAppendBody&);
/** Close the reference, complete each associated message */
void close();
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 6ef2162a4a..b7aa2aad25 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -22,6 +22,8 @@
#include "SemanticHandler.h"
#include "BrokerAdapter.h"
#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/ExecutionCompleteBody.h"
+#include "qpid/framing/ChannelCloseOkBody.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -60,7 +62,7 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
}
//ChannelAdapter virtual methods:
-void SemanticHandler::handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
{
try {
if (!method->invoke(this)) {
@@ -108,11 +110,11 @@ void SemanticHandler::flush()
incoming.lwm = incoming.hwm;
if (isOpen()) {
Mutex::ScopedLock l(outLock);
- ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
+ ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
}
}
-void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void SemanticHandler::handleL4(framing::AMQMethodBody* method)
{
try{
if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
@@ -139,17 +141,17 @@ bool SemanticHandler::isOpen() const
return channel.isOpen();
}
-void SemanticHandler::handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody> body)
+void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body)
{
channel.handleHeader(body);
}
-void SemanticHandler::handleContent(boost::shared_ptr<qpid::framing::AMQContentBody> body)
+void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body)
{
channel.handleContent(body);
}
-void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody> body)
+void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body)
{
channel.handleHeartbeat(body);
}
@@ -169,16 +171,12 @@ void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_
msg->deliver(*this, tag, token, connection.getFrameMax());
}
-void SemanticHandler::send(shared_ptr<AMQBody> body)
+void SemanticHandler::send(const AMQBody& body)
{
Mutex::ScopedLock l(outLock);
- uint8_t type(body->type());
- if (type == METHOD_BODY) {
+ if (body.getMethod() && body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) {
//temporary hack until channel management is moved to its own handler:
- if (dynamic_pointer_cast<AMQMethodBody>(body)->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++outgoing.hwm;
- //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
- }
+ ++outgoing.hwm;
}
ChannelAdapter::send(body);
}
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 016c94738d..6748da8500 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -31,6 +31,14 @@
#include "qpid/framing/SequenceNumber.h"
namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class AMQHeaderBody;
+}
+
namespace broker {
class BrokerAdapter;
@@ -48,16 +56,16 @@ class SemanticHandler : private framing::ChannelAdapter,
framing::Window outgoing;
sys::Mutex outLock;
- void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
+ void handleL4(framing::AMQMethodBody* method);
//ChannelAdapter virtual methods:
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
+ void handleMethod(framing::AMQMethodBody* method);
bool isOpen() const;
- void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
- void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
- void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+ void handleHeader(framing::AMQHeaderBody*);
+ void handleContent(framing::AMQContentBody*);
+ void handleHeartbeat(framing::AMQHeartbeatBody*);
- void send(shared_ptr<framing::AMQBody> body);
+ void send(const framing::AMQBody& body);
//delivery adapter methods:
diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp
index a6aea438f0..b3d720baf0 100644
--- a/cpp/src/qpid/client/ChannelHandler.cpp
+++ b/cpp/src/qpid/client/ChannelHandler.cpp
@@ -21,6 +21,7 @@
#include "ChannelHandler.h"
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -30,40 +31,39 @@ ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {}
void ChannelHandler::incoming(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (getState() == OPEN) {
- if (isA<ChannelCloseBody>(body)) {
- ChannelCloseBody::shared_ptr method(shared_polymorphic_cast<ChannelCloseBody>(body));
+ ChannelCloseBody* closeBody=
+ dynamic_cast<ChannelCloseBody*>(body->getMethod());
+ if (closeBody) {
setState(CLOSED);
if (onClose) {
- onClose(method->getReplyCode(), method->getReplyText());
+ onClose(closeBody->getReplyCode(), closeBody->getReplyText());
}
} else {
try {
in(frame);
}catch(ChannelException& e){
- if (body->type() == METHOD_BODY) {
- AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
- close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
- } else {
+ AMQMethodBody* method=body->getMethod();
+ if (method)
+ close(e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ else
close(e.code, e.toString(), 0, 0);
- }
}
}
} else {
- if (body->type() == METHOD_BODY) {
- handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
- } else {
+ if (body->getMethod())
+ handleMethod(body->getMethod());
+ else
throw new ConnectionException(504, "Channel not open.");
- }
-
}
}
void ChannelHandler::outgoing(AMQFrame& frame)
{
if (getState() == OPEN) {
- frame.channel = id;
+ frame.setChannel(id);
out(frame);
} else {
throw Exception("Channel not open");
@@ -75,7 +75,7 @@ void ChannelHandler::open(uint16_t _id)
id = _id;
setState(OPENING);
- AMQFrame f(version, id, make_shared_ptr(new ChannelOpenBody(version)));
+ AMQFrame f(version, id, ChannelOpenBody(version));
out(f);
std::set<int> states;
@@ -90,7 +90,7 @@ void ChannelHandler::open(uint16_t _id)
void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
{
setState(CLOSING);
- AMQFrame f(version, id, make_shared_ptr(new ChannelCloseBody(version, code, message, classId, methodId)));
+ AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId));
out(f);
}
@@ -100,24 +100,24 @@ void ChannelHandler::close()
waitFor(CLOSED);
}
-void ChannelHandler::handleMethod(AMQMethodBody::shared_ptr method)
+void ChannelHandler::handleMethod(AMQMethodBody* method)
{
switch (getState()) {
- case OPENING:
+ case OPENING:
if (method->isA<ChannelOpenOkBody>()) {
setState(OPEN);
} else {
throw ConnectionException(504, "Channel not opened.");
}
break;
- case CLOSING:
+ case CLOSING:
if (method->isA<ChannelCloseOkBody>()) {
setState(CLOSED);
} //else just ignore it
break;
- case CLOSED:
+ case CLOSED:
throw ConnectionException(504, "Channel not opened.");
- default:
+ default:
throw Exception("Unexpected state encountered in ChannelHandler!");
}
}
diff --git a/cpp/src/qpid/client/ChannelHandler.h b/cpp/src/qpid/client/ChannelHandler.h
index eaa7e7cc72..556e13a4f8 100644
--- a/cpp/src/qpid/client/ChannelHandler.h
+++ b/cpp/src/qpid/client/ChannelHandler.h
@@ -34,13 +34,7 @@ class ChannelHandler : private StateManager, public ChainableFrameHandler
framing::ProtocolVersion version;
uint16_t id;
- void handleMethod(framing::AMQMethodBody::shared_ptr method);
-
- template <class T> bool isA(framing::AMQBody::shared_ptr body) {
- return body->type() == framing::METHOD_BODY &&
- boost::shared_polymorphic_cast<framing::AMQMethodBody>(body)->isA<T>();
- }
-
+ void handleMethod(framing::AMQMethodBody* method);
void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId);
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 8ffddd0dbf..aa73e83328 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -25,11 +25,11 @@
#include "ClientMessage.h"
#include "qpid/QpidError.h"
#include "Connection.h"
-#include "ConnectionHandler.h"
#include "FutureResponse.h"
#include "MessageListener.h"
#include <boost/format.hpp>
#include <boost/bind.hpp>
+#include "qpid/framing/all_method_bodies.h"
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -37,7 +37,6 @@
//
using namespace std;
using namespace boost;
-using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -49,13 +48,11 @@ const std::string empty;
class ScopedSync
{
Session& session;
-public:
+ public:
ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
~ScopedSync() { session.setSynchronous(false); }
};
-}}
-
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
prefetch(_prefetch), transactional(_transactional), running(false)
{
@@ -250,3 +247,6 @@ void Channel::run() {
}
} catch (const QueueClosed&) {}
}
+
+}}
+
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index e2e83c8caf..e41ab363b5 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -27,6 +27,7 @@
#include "ClientChannel.h"
#include "ConnectionImpl.h"
#include "Session.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
namespace qpid {
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index f47506d977..66db9384e2 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -22,6 +22,8 @@
#include "ConnectionHandler.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -53,16 +55,16 @@ void ConnectionHandler::incoming(AMQFrame& frame)
throw Exception("Connection is closed.");
}
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (frame.getChannel() == 0) {
- if (body->type() == METHOD_BODY) {
- handle(shared_polymorphic_cast<AMQMethodBody>(body));
+ if (body->getMethod()) {
+ handle(body->getMethod());
} else {
error(503, "Cannot send content on channel zero.");
}
} else {
switch(getState()) {
- case OPEN:
+ case OPEN:
try {
in(frame);
}catch(ConnectionException& e){
@@ -71,10 +73,10 @@ void ConnectionHandler::incoming(AMQFrame& frame)
error(541/*internal error*/, e.what(), body);
}
break;
- case CLOSING:
+ case CLOSING:
QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored.");
break;
- default:
+ default:
//must be in connection initialisation:
fail("Cannot receive frames on non-zero channel until connection is established.");
}
@@ -101,32 +103,29 @@ void ConnectionHandler::waitForOpen()
void ConnectionHandler::close()
{
setState(CLOSING);
- send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0)));
-
+ send(ConnectionCloseBody(version, 200, OK, 0, 0));
waitFor(CLOSED);
}
-void ConnectionHandler::send(framing::AMQBody::shared_ptr body)
+void ConnectionHandler::send(const framing::AMQBody& body)
{
- AMQFrame f;
- f.setBody(body);
+ AMQFrame f(ProtocolVersion(), 0, body);
out(f);
}
void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
{
setState(CLOSING);
- send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId)));
+ send(ConnectionCloseBody(version, code, message, classId, methodId));
}
-void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body)
+void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body)
{
- if (body->type() == METHOD_BODY) {
- AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+ AMQMethodBody* method = body->getMethod();
+ if (method)
error(code, message, method->amqpClassId(), method->amqpMethodId());
- } else {
+ else
error(code, message);
- }
}
@@ -136,54 +135,54 @@ void ConnectionHandler::fail(const std::string& message)
setState(FAILED);
}
-void ConnectionHandler::handle(AMQMethodBody::shared_ptr method)
+void ConnectionHandler::handle(AMQMethodBody* method)
{
switch (getState()) {
- case NOT_STARTED:
+ case NOT_STARTED:
if (method->isA<ConnectionStartBody>()) {
setState(NEGOTIATING);
string response = ((char)0) + uid + ((char)0) + pwd;
- send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale)));
+ send(ConnectionStartOkBody(version, properties, mechanism, response, locale));
} else {
fail("Bad method sequence, expected connection-start.");
}
break;
- case NEGOTIATING:
+ case NEGOTIATING:
if (method->isA<ConnectionTuneBody>()) {
- ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method));
+ ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method);
heartbeat = proposal->getHeartbeat();
maxChannels = proposal->getChannelMax();
- send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)));
+ send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat));
setState(OPENING);
- send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist)));
- //TODO: support for further security challenges
- //} else if (method->isA<ConnectionSecureBody>()) {
+ send(ConnectionOpenBody(version, vhost, capabilities, insist));
+ //TODO: support for further security challenges
+ //} else if (method->isA<ConnectionSecureBody>()) {
} else {
fail("Unexpected method sequence, expected connection-tune.");
}
break;
- case OPENING:
+ case OPENING:
if (method->isA<ConnectionOpenOkBody>()) {
setState(OPEN);
- //TODO: support for redirection
- //} else if (method->isA<ConnectionRedirectBody>()) {
+ //TODO: support for redirection
+ //} else if (method->isA<ConnectionRedirectBody>()) {
} else {
fail("Unexpected method sequence, expected connection-open-ok.");
}
break;
- case OPEN:
+ case OPEN:
if (method->isA<ConnectionCloseBody>()) {
- send(make_shared_ptr(new ConnectionCloseOkBody(version)));
+ send(ConnectionCloseOkBody(version));
setState(CLOSED);
if (onError) {
- ConnectionCloseBody::shared_ptr c(shared_polymorphic_cast<ConnectionCloseBody>(method));
+ ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method);
onError(c->getReplyCode(), c->getReplyText());
}
} else {
error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
}
break;
- case CLOSING:
+ case CLOSING:
if (method->isA<ConnectionCloseOkBody>()) {
if (onClose) {
onClose();
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index 464d0ca26d..d05ae1428b 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -25,6 +25,8 @@
#include "StateManager.h"
#include "ChainableFrameHandler.h"
#include "qpid/framing/InputHandler.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/AMQMethodBody.h"
namespace qpid {
namespace client {
@@ -53,10 +55,10 @@ class ConnectionHandler : private StateManager,
enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
std::set<int> ESTABLISHED;
- void handle(framing::AMQMethodBody::shared_ptr method);
- void send(framing::AMQBody::shared_ptr body);
+ void handle(framing::AMQMethodBody* method);
+ void send(const framing::AMQBody& body);
void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0);
- void error(uint16_t code, const std::string& message, framing::AMQBody::shared_ptr body);
+ void error(uint16_t code, const std::string& message, framing::AMQBody* body);
void fail(const std::string& message);
public:
diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp
index edb16bbcee..9ef6857957 100644
--- a/cpp/src/qpid/client/Correlator.cpp
+++ b/cpp/src/qpid/client/Correlator.cpp
@@ -25,7 +25,7 @@ using qpid::client::Correlator;
using namespace qpid::framing;
using namespace boost;
-void Correlator::receive(AMQMethodBody::shared_ptr response)
+void Correlator::receive(AMQMethodBody* response)
{
if (listeners.empty()) {
throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name
diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h
index 339c9bd0c4..d93e7b66cd 100644
--- a/cpp/src/qpid/client/Correlator.h
+++ b/cpp/src/qpid/client/Correlator.h
@@ -36,9 +36,9 @@ namespace client {
class Correlator
{
public:
- typedef boost::function<void(framing::AMQMethodBody::shared_ptr)> Listener;
+ typedef boost::function<void(framing::AMQMethodBody*)> Listener;
- void receive(framing::AMQMethodBody::shared_ptr);
+ void receive(framing::AMQMethodBody*);
void listen(Listener l);
private:
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index abfce4f9d1..6ee6429b6b 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -23,31 +23,35 @@
#include "qpid/Exception.h"
#include "qpid/framing/BasicDeliverBody.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
using namespace boost;
-bool isMessageMethod(AMQMethodBody::shared_ptr method)
+bool isMessageMethod(AMQMethodBody* method)
{
return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>();
}
-bool isMessageMethod(AMQBody::shared_ptr body)
+bool isMessageMethod(AMQBody* body)
{
- return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+ AMQMethodBody* method=body->getMethod();
+ return method && isMessageMethod(method);
}
bool isContentFrame(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
uint8_t type = body->type();
return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
}
-bool invoke(AMQBody::shared_ptr body, Invocable* target)
+bool invoke(AMQBody* body, Invocable* target)
{
- return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target);
+ AMQMethodBody* method=body->getMethod();
+ return method && method->invoke(target);
}
ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) :
@@ -56,7 +60,7 @@ ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) :
//incoming:
void ExecutionHandler::handle(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (!invoke(body, this)) {
if (isContentFrame(frame)) {
if (!arriving) {
@@ -69,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame)
}
} else {
++incoming.hwm;
- correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body));
+ correlation.receive(body->getMethod());
}
}
}
@@ -95,16 +99,15 @@ void ExecutionHandler::flush()
{
//send completion
incoming.lwm = incoming.hwm;
- //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
}
void ExecutionHandler::sendFlush()
{
- AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version)));
+ AMQFrame frame(version, 0, ExecutionFlushBody());
out(frame);
}
-void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g)
+void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g)
{
//allocate id:
++outgoing.hwm;
@@ -116,18 +119,19 @@ void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::List
correlation.listen(g);
}
- AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command);
+ AMQFrame frame(version, 0/*id will be filled in be channel handler*/,
+ command);
out(frame);
}
-void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data,
+void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data,
CompletionTracker::Listener f, Correlator::Listener g)
{
send(command, f, g);
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers);
- header->setContentSize(data.size());
+ AMQHeaderBody header(BASIC);
+ BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers);
+ header.setContentSize(data.size());
AMQFrame h(version, 0, header);
out(h);
@@ -136,7 +140,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
- AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data)));
+ AMQFrame frame(version, 0, AMQContentBody(data));
out(frame);
}else{
u_int32_t offset = 0;
@@ -144,7 +148,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag)));
+ AMQFrame frame(version, 0, AMQContentBody(frag));
out(frame);
offset += length;
remaining = data_length - offset;
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index f62598ef95..21613df779 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -56,10 +56,10 @@ public:
void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
void handle(framing::AMQFrame& frame);
- void send(framing::AMQBody::shared_ptr command,
+ void send(const framing::AMQBody& command,
CompletionTracker::Listener f = CompletionTracker::Listener(),
Correlator::Listener g = Correlator::Listener());
- void sendContent(framing::AMQBody::shared_ptr command,
+ void sendContent(const framing::AMQBody& command,
const framing::BasicHeaderProperties& headers, const std::string& data,
CompletionTracker::Listener f = CompletionTracker::Listener(),
Correlator::Listener g = Correlator::Listener());
diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp
index 6b1246a449..e63dc9c192 100644
--- a/cpp/src/qpid/client/FutureResponse.cpp
+++ b/cpp/src/qpid/client/FutureResponse.cpp
@@ -26,16 +26,16 @@ using namespace qpid::framing;
using namespace qpid::sys;
-AMQMethodBody::shared_ptr FutureResponse::getResponse()
+AMQMethodBody* FutureResponse::getResponse()
{
waitForCompletion();
- return response;
+ return response.get();
}
-void FutureResponse::received(AMQMethodBody::shared_ptr r)
+void FutureResponse::received(AMQMethodBody* r)
{
Monitor::ScopedLock l(lock);
- response = r;
+ response = *r;
complete = true;
lock.notifyAll();
}
diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h
index ccc6fb5894..75b1f72c04 100644
--- a/cpp/src/qpid/client/FutureResponse.h
+++ b/cpp/src/qpid/client/FutureResponse.h
@@ -23,6 +23,7 @@
#define _FutureResponse_
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/MethodHolder.h"
#include "FutureCompletion.h"
namespace qpid {
@@ -30,11 +31,10 @@ namespace client {
class FutureResponse : public FutureCompletion
{
- framing::AMQMethodBody::shared_ptr response;
-
+ framing::MethodHolder response;
public:
- framing::AMQMethodBody::shared_ptr getResponse();
- void received(framing::AMQMethodBody::shared_ptr response);
+ framing::AMQMethodBody* getResponse();
+ void received(framing::AMQMethodBody* response);
};
}}
diff --git a/cpp/src/qpid/client/ReceivedContent.cpp b/cpp/src/qpid/client/ReceivedContent.cpp
index 9cfee21c3c..5a1f48901a 100644
--- a/cpp/src/qpid/client/ReceivedContent.cpp
+++ b/cpp/src/qpid/client/ReceivedContent.cpp
@@ -20,6 +20,7 @@
*/
#include "ReceivedContent.h"
+#include "qpid/framing/all_method_bodies.h"
using qpid::client::ReceivedContent;
using namespace qpid::framing;
@@ -27,9 +28,9 @@ using namespace boost;
ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {}
-void ReceivedContent::append(AMQBody::shared_ptr part)
+void ReceivedContent::append(AMQBody* part)
{
- parts.push_back(part);
+ parts.push_back(AMQFrame(ProtocolVersion(), 0, part));
}
bool ReceivedContent::isComplete() const
@@ -37,7 +38,7 @@ bool ReceivedContent::isComplete() const
if (parts.empty()) {
return false;
} else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- AMQHeaderBody::shared_ptr headers(getHeaders());
+ const AMQHeaderBody* headers(getHeaders());
return headers && headers->getContentSize() == getContentSize();
} else if (isA<MessageTransferBody>()) {
//no longer support references, headers and data are still method fields
@@ -48,14 +49,14 @@ bool ReceivedContent::isComplete() const
}
-AMQMethodBody::shared_ptr ReceivedContent::getMethod() const
+const AMQMethodBody* ReceivedContent::getMethod() const
{
- return parts.empty() ? AMQMethodBody::shared_ptr() : dynamic_pointer_cast<AMQMethodBody>(parts[0]);
+ return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
}
-AMQHeaderBody::shared_ptr ReceivedContent::getHeaders() const
+const AMQHeaderBody* ReceivedContent::getHeaders() const
{
- return parts.size() < 2 ? AMQHeaderBody::shared_ptr() : dynamic_pointer_cast<AMQHeaderBody>(parts[1]);
+ return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
}
uint64_t ReceivedContent::getContentSize() const
@@ -63,7 +64,7 @@ uint64_t ReceivedContent::getContentSize() const
if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
uint64_t size(0);
for (uint i = 2; i < parts.size(); i++) {
- size += parts[i]->size();
+ size += parts[i].getBody()->size();
}
return size;
} else if (isA<MessageTransferBody>()) {
@@ -78,7 +79,7 @@ std::string ReceivedContent::getContent() const
if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
string data;
for (uint i = 2; i < parts.size(); i++) {
- data += dynamic_pointer_cast<AMQContentBody>(parts[i])->getData();
+ data += static_cast<const AMQContentBody*>(parts[i].getBody())->getData();
}
return data;
} else if (isA<MessageTransferBody>()) {
@@ -93,7 +94,7 @@ void ReceivedContent::populate(Message& msg)
if (!isComplete()) throw Exception("Incomplete message");
if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(getHeaders()->getProperties());
+ const BasicHeaderProperties* properties = dynamic_cast<const BasicHeaderProperties*>(getHeaders()->getProperties());
BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties);
msg.setData(getContent());
} else if (isA<MessageTransferBody>()) {
diff --git a/cpp/src/qpid/client/ReceivedContent.h b/cpp/src/qpid/client/ReceivedContent.h
index 1886034f9b..4f84039c10 100644
--- a/cpp/src/qpid/client/ReceivedContent.h
+++ b/cpp/src/qpid/client/ReceivedContent.h
@@ -20,8 +20,8 @@
*/
#include <string>
#include <vector>
-#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/SequenceNumber.h"
#include "ClientMessage.h"
@@ -38,37 +38,29 @@ namespace client {
class ReceivedContent
{
const framing::SequenceNumber id;
- std::vector<framing::AMQBody::shared_ptr> parts;
+ std::vector<framing::AMQFrame> parts;
public:
typedef boost::shared_ptr<ReceivedContent> shared_ptr;
ReceivedContent(const framing::SequenceNumber& id);
- void append(framing::AMQBody::shared_ptr part);
+ void append(framing::AMQBody* part);
bool isComplete() const;
uint64_t getContentSize() const;
std::string getContent() const;
- framing::AMQMethodBody::shared_ptr getMethod() const;
- framing::AMQHeaderBody::shared_ptr getHeaders() const;
+ const framing::AMQMethodBody* getMethod() const;
+ const framing::AMQHeaderBody* getHeaders() const;
template <class T> bool isA() const {
- framing::AMQMethodBody::shared_ptr method = getMethod();
- if (!method) {
- return false;
- } else {
- return method->isA<T>();
- }
+ const framing::AMQMethodBody* method=getMethod();
+ return method && method->isA<T>();
}
- template <class T> boost::shared_ptr<T> as() const {
- framing::AMQMethodBody::shared_ptr method = getMethod();
- if (method && method->isA<T>()) {
- return boost::dynamic_pointer_cast<T>(method);
- } else {
- return boost::shared_ptr<T>();
- }
+ template <class T> const T* as() const {
+ const framing::AMQMethodBody* method=getMethod();
+ return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
}
const framing::SequenceNumber& getId() const { return id; }
diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h
index 745d4648ad..ad37d7c0f4 100644
--- a/cpp/src/qpid/client/Response.h
+++ b/cpp/src/qpid/client/Response.h
@@ -38,12 +38,12 @@ public:
template <class T> T& as()
{
- framing::AMQMethodBody::shared_ptr response(future->getResponse());
+ framing::AMQMethodBody* response(future->getResponse());
return dynamic_cast<T&>(*response);
}
template <class T> bool isA()
{
- framing::AMQMethodBody::shared_ptr response(future->getResponse());
+ framing::AMQMethodBody* response(future->getResponse());
return response && response->isA<T>();
}
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 391dcd909d..f7ed7416cd 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -44,7 +44,7 @@ void SessionCore::flush()
l3.sendFlush();
}
-Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse)
+Response SessionCore::send(const AMQMethodBody& method, bool expectResponse)
{
boost::shared_ptr<FutureResponse> f(futures.createResponse());
if (expectResponse) {
@@ -59,7 +59,7 @@ Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse
return Response(f);
}
-Response SessionCore::send(AMQMethodBody::shared_ptr method, const MethodContent& content, bool expectResponse)
+Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse)
{
//TODO: lots of duplication between these two send methods; refactor
boost::shared_ptr<FutureResponse> f(futures.createResponse());
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index 15cd36114f..bcbaf0028d 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -47,8 +47,8 @@ public:
typedef boost::shared_ptr<SessionCore> shared_ptr;
SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
- Response send(framing::AMQMethodBody::shared_ptr method, bool expectResponse = false);
- Response send(framing::AMQMethodBody::shared_ptr method, const framing::MethodContent& content, bool expectResponse = false);
+ Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
+ Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
ReceivedContent::shared_ptr get();
uint16_t getId() const { return id; }
void setSync(bool);
diff --git a/cpp/src/qpid/cluster/ClassifierHandler.cpp b/cpp/src/qpid/cluster/ClassifierHandler.cpp
index 1cce126800..9410a3cc38 100644
--- a/cpp/src/qpid/cluster/ClassifierHandler.cpp
+++ b/cpp/src/qpid/cluster/ClassifierHandler.cpp
@@ -36,7 +36,7 @@ typedef uint32_t FullMethodId; // Combind class & method ID.
FullMethodId fullId(ClassId c, MethodId m) { return c<<16+m; }
-FullMethodId fullId(const shared_ptr<AMQMethodBody>& body) {
+FullMethodId fullId(const AMQMethodBody*& body) {
return fullId(body->amqpClassId(), body->amqpMethodId());
}
@@ -59,8 +59,8 @@ void ClassifierHandler::handle(AMQFrame& frame) {
// TODO aconway 2007-07-03: Flatten the frame hierarchy so we
// can do a single lookup to dispatch a frame.
Chain chosen;
- shared_ptr<AMQMethodBody> method =
- dynamic_pointer_cast<AMQMethodBody>(frame.getBody());
+ AMQMethodBody* method = dynamic_cast<AMQMethodBody*>(frame.getBody());
+
// FIXME aconway 2007-07-05: Need to stop bypassed frames
// from overtaking mcast frames.
//
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e5af237bd2..d9f1679fdf 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -95,7 +95,7 @@ void Cluster::handle(AMQFrame& frame) {
void Cluster::notify() {
AMQFrame frame(ProtocolVersion(), 0,
- new ClusterNotifyBody(ProtocolVersion(), url));
+ ClusterNotifyBody(ProtocolVersion(), url));
handle(frame);
}
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
index a4efa939fc..b8290ad953 100644
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -36,7 +36,7 @@ using namespace sys;
using namespace broker;
/** Handler to send frames direct to local broker (bypass correlation etc.) */
- struct BrokerHandler : public FrameHandler, private ChannelAdapter, private DeliveryAdapter {
+struct BrokerHandler : public FrameHandler, private ChannelAdapter, private DeliveryAdapter {
Connection connection;
Channel channel;
BrokerAdapter adapter;
@@ -61,13 +61,13 @@ using namespace broker;
}
// Dummy methods.
- virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>){}
- virtual void handleContent(boost::shared_ptr<AMQContentBody>){}
- virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>){}
+ virtual void handleHeader(AMQHeaderBody*){}
+ virtual void handleContent(AMQContentBody*){}
+ virtual void handleHeartbeat(AMQHeartbeatBody*){}
virtual bool isOpen() const{ return true; }
- virtual void handleMethod(shared_ptr<AMQMethodBody>){}
+ virtual void handleMethod(AMQMethodBody*){}
// No-op send.
- virtual void send(shared_ptr<AMQBody>) {}
+ virtual void send(const AMQBody&) {}
//delivery adapter methods, also no-ops:
virtual DeliveryId deliver(Message::shared_ptr&, DeliveryToken::shared_ptr) { return 0; }
diff --git a/cpp/src/qpid/framing/AMQBody.h b/cpp/src/qpid/framing/AMQBody.h
index eaa568c06a..0b2718bef3 100644
--- a/cpp/src/qpid/framing/AMQBody.h
+++ b/cpp/src/qpid/framing/AMQBody.h
@@ -1,3 +1,6 @@
+#ifndef QPID_FRAMING_AMQBODY_H
+#define QPID_FRAMING_AMQBODY_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,42 +21,58 @@
* under the License.
*
*/
-#include <boost/shared_ptr.hpp>
-#include "amqp_types.h"
-#include "Buffer.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/shared_ptr.h"
-#ifndef _AMQBody_
-#define _AMQBody_
+#include <ostream>
namespace qpid {
- namespace framing {
+namespace framing {
+
+class Buffer;
+
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class AMQHeartbeatBody;
+
+struct AMQBodyConstVisitor {
+ virtual ~AMQBodyConstVisitor() {}
+ virtual void visit(const AMQHeaderBody&) = 0;
+ virtual void visit(const AMQContentBody&) = 0;
+ virtual void visit(const AMQHeartbeatBody&) = 0;
+ virtual void visit(const AMQMethodBody&) = 0;
+};
+
+class AMQBody
+{
+ public:
+ typedef shared_ptr<AMQBody> shared_ptr;
+
+ virtual ~AMQBody();
+
+ virtual uint8_t type() const = 0;
- class AMQBody
- {
- public:
- typedef boost::shared_ptr<AMQBody> shared_ptr;
+ virtual void encode(Buffer& buffer) const = 0;
+ virtual void decode(Buffer& buffer, uint32_t=0) = 0;
+ virtual uint32_t size() const = 0;
- virtual ~AMQBody();
- virtual uint32_t size() const = 0;
- virtual uint8_t type() const = 0;
- virtual void encode(Buffer& buffer) const = 0;
- virtual void decode(Buffer& buffer, uint32_t size) = 0;
+ virtual void print(std::ostream& out) const = 0;
+ virtual void accept(AMQBodyConstVisitor&) const = 0;
- virtual void print(std::ostream& out) const = 0;
- };
+ virtual AMQMethodBody* getMethod() { return 0; }
+ virtual const AMQMethodBody* getMethod() const { return 0; }
+};
- std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
+std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
- enum BodyTypes {
- METHOD_BODY = 1,
- HEADER_BODY = 2,
- CONTENT_BODY = 3,
- HEARTBEAT_BODY = 8,
- REQUEST_BODY = 9,
- RESPONSE_BODY = 10
- };
- }
-}
+enum BodyTypes {
+ METHOD_BODY = 1,
+ HEADER_BODY = 2,
+ CONTENT_BODY = 3,
+ HEARTBEAT_BODY = 8
+};
+}} // namespace qpid::framing
-#endif
+#endif /*!QPID_FRAMING_AMQBODY_H*/
diff --git a/cpp/src/qpid/framing/AMQContentBody.cpp b/cpp/src/qpid/framing/AMQContentBody.cpp
index 19d2e5714a..c472af555d 100644
--- a/cpp/src/qpid/framing/AMQContentBody.cpp
+++ b/cpp/src/qpid/framing/AMQContentBody.cpp
@@ -12,7 +12,7 @@
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+n * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
diff --git a/cpp/src/qpid/framing/AMQContentBody.h b/cpp/src/qpid/framing/AMQContentBody.h
index 6d4c1ef4b6..a476796015 100644
--- a/cpp/src/qpid/framing/AMQContentBody.h
+++ b/cpp/src/qpid/framing/AMQContentBody.h
@@ -28,13 +28,11 @@
namespace qpid {
namespace framing {
-class AMQContentBody : public AMQBody
+class AMQContentBody : public AMQBody
{
string data;
public:
- typedef boost::shared_ptr<AMQContentBody> shared_ptr;
-
AMQContentBody();
AMQContentBody(const string& data);
inline virtual ~AMQContentBody(){}
@@ -44,6 +42,7 @@ public:
void encode(Buffer& buffer) const;
void decode(Buffer& buffer, uint32_t size);
void print(std::ostream& out) const;
+ void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
};
}
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index f79eae3524..780af71be4 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -1,4 +1,3 @@
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,46 +18,70 @@
* under the License.
*
*/
-#include <boost/format.hpp>
-
#include "AMQFrame.h"
+
#include "qpid/QpidError.h"
-#include "AMQMethodBody.h"
+#include "qpid/framing/variant.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include <boost/format.hpp>
+
+#include <iostream>
namespace qpid {
namespace framing {
+namespace {
+struct GetBodyVisitor : public NoBlankVisitor<AMQBody*> {
+ QPID_USING_NOBLANK(AMQBody*);
+ AMQBody* operator()(MethodHolder& t) const { return t.get(); }
+ template <class T> AMQBody* operator()(T& t) const { return &t; }
+};
+
+struct EncodeVisitor : public NoBlankVisitor<void> {
+ Buffer& buffer;
+ EncodeVisitor(Buffer& b) : buffer(b) {}
+
+ QPID_USING_NOBLANK(void);
+ template <class T> void operator()(const T& t) const { return t.encode(buffer); }
+};
+
+struct SizeVisitor : public NoBlankVisitor<uint32_t> {
+ QPID_USING_NOBLANK(uint32_t);
+ template <class T> uint32_t operator()(const T& t) const { return t.size(); }
+};
+
+struct DecodeVisitor : public NoBlankVisitor<void> {
+ Buffer& buffer;
+ uint32_t size;
+ DecodeVisitor(Buffer& b, uint32_t s) : buffer(b), size(s) {}
+ QPID_USING_NOBLANK(void);
+ void operator()(MethodHolder& t) const { return t.decode(buffer); }
+ template <class T> void operator()(T& t) const { return t.decode(buffer, size); }
+};
-AMQP_MethodVersionMap AMQFrame::versionMap;
-
-AMQFrame::AMQFrame(ProtocolVersion _version)
- : channel(0), type(0), version(_version)
- {
- assert(version != ProtocolVersion(0,0));
- }
-
-AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) : channel(_channel), body(_body),version(_version) {}
+}
-AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::shared_ptr& _body) :
- channel(_channel), body(_body), version(_version)
-{}
+AMQBody* AMQFrame::getBody() {
+ return boost::apply_visitor(GetBodyVisitor(), body);
+}
-AMQFrame::~AMQFrame() {}
+const AMQBody* AMQFrame::getBody() const {
+ return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
+}
void AMQFrame::encode(Buffer& buffer)
{
- buffer.putOctet(body->type());
+ buffer.putOctet(getBody()->type());
buffer.putShort(channel);
- buffer.putLong(body->size());
- body->encode(buffer);
+ buffer.putLong(boost::apply_visitor(SizeVisitor(), body));
+ boost::apply_visitor(EncodeVisitor(buffer), body);
buffer.putOctet(0xCE);
}
uint32_t AMQFrame::size() const{
- assert(body.get());
- return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size()
- + 1/*0xCE*/;
+ return 1/*type*/ + 2/*channel*/ + 4/*body size*/ +
+ boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/;
}
bool AMQFrame::decode(Buffer& buffer)
@@ -66,56 +89,42 @@ bool AMQFrame::decode(Buffer& buffer)
if(buffer.available() < 7)
return false;
buffer.record();
- uint32_t frameSize = decodeHead(buffer);
- if(buffer.available() < frameSize + 1){
+
+ uint8_t type = buffer.getOctet();
+ channel = buffer.getShort();
+ uint32_t size = buffer.getLong();
+
+ if(buffer.available() < size+1){
buffer.restore();
return false;
}
- decodeBody(buffer, frameSize);
+ decodeBody(buffer, size, type);
uint8_t end = buffer.getOctet();
if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
return true;
}
-uint32_t AMQFrame::decodeHead(Buffer& buffer){
- type = buffer.getOctet();
- channel = buffer.getShort();
- return buffer.getLong();
-}
-
-void AMQFrame::decodeBody(Buffer& buffer, uint32_t size)
+void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type)
{
switch(type)
{
- case METHOD_BODY:
- body = AMQMethodBody::create(versionMap, version, buffer);
- break;
- case HEADER_BODY:
- body = AMQBody::shared_ptr(new AMQHeaderBody());
- break;
- case CONTENT_BODY:
- body = AMQBody::shared_ptr(new AMQContentBody());
- break;
- case HEARTBEAT_BODY:
- body = AMQBody::shared_ptr(new AMQHeartbeatBody());
- break;
+ case METHOD_BODY: body = MethodHolder(); break;
+ case HEADER_BODY: body = AMQHeaderBody(); break;
+ case CONTENT_BODY: body = AMQContentBody(); break;
+ case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break;
+
default:
THROW_QPID_ERROR(
FRAMING_ERROR,
boost::format("Unknown frame type %d") % type);
}
- body->decode(buffer, size);
+ boost::apply_visitor(DecodeVisitor(buffer,size), body);
}
-std::ostream& operator<<(std::ostream& out, const AMQFrame& t)
+std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
{
- out << "Frame[channel=" << t.channel << "; ";
- if (t.body.get() == 0)
- out << "empty";
- else
- out << *t.body;
- out << "]";
- return out;
+ return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody()
+ << "]";
}
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index 16c1427802..9e825a9936 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -21,55 +21,76 @@
* under the License.
*
*/
-#include <boost/cast.hpp>
-
-#include "amqp_types.h"
-#include "AMQBody.h"
#include "AMQDataBlock.h"
-#include "AMQMethodBody.h"
#include "AMQHeaderBody.h"
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
-#include "qpid/framing/AMQP_MethodVersionMap.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/shared_ptr.h"
+#include "MethodHolder.h"
+#include "ProtocolVersion.h"
+
+#include <boost/cast.hpp>
+#include <boost/variant.hpp>
namespace qpid {
namespace framing {
-
class AMQFrame : public AMQDataBlock
{
public:
- AMQFrame(ProtocolVersion _version = highestProtocolVersion);
- AMQFrame(ProtocolVersion _version, uint16_t channel, AMQBody* body);
- AMQFrame(ProtocolVersion _version, uint16_t channel, const AMQBody::shared_ptr& body);
- virtual ~AMQFrame();
- virtual void encode(Buffer& buffer);
- virtual bool decode(Buffer& buffer);
- virtual uint32_t size() const;
- uint16_t getChannel() const { return channel; }
-
- shared_ptr<AMQBody> getBody() { return body; }
- void setBody(const shared_ptr<AMQBody>& b) { body = b; }
-
- /** Convenience template to cast the body to an expected type */
- template <class T> boost::shared_ptr<T> castBody() {
- assert(dynamic_cast<T*>(getBody().get()));
- boost::static_pointer_cast<T>(getBody());
+ AMQFrame(ProtocolVersion=ProtocolVersion()) {}
+
+ /** Construct a frame with a copy of b */
+ AMQFrame(ProtocolVersion, ChannelId c, const AMQBody* b) : channel(c) {
+ setBody(*b);
+ }
+
+ AMQFrame(ProtocolVersion, ChannelId c, const AMQBody& b) : channel(c) {
+ setBody(b);
}
+
+ ChannelId getChannel() const { return channel; }
+ void setChannel(ChannelId c) { channel = c; }
- uint32_t decodeHead(Buffer& buffer);
- void decodeBody(Buffer& buffer, uint32_t size);
+ AMQBody* getBody();
+ const AMQBody* getBody() const;
- uint16_t channel;
- uint8_t type;
- AMQBody::shared_ptr body;
- ProtocolVersion version;
+ /** Copy a body instance to the frame */
+ void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); }
+
+ /** Convenience template to cast the body to an expected type. */
+ template <class T> T* castBody() {
+ boost::polymorphic_downcast<T*>(getBody());
+ }
+
+ bool empty() { return boost::get<boost::blank>(&body); }
+
+ void encode(Buffer& buffer);
+ bool decode(Buffer& buffer);
+ uint32_t size() const;
private:
- static AMQP_MethodVersionMap versionMap;
+ struct CopyVisitor : public AMQBodyConstVisitor {
+ AMQFrame& frame;
+ CopyVisitor(AMQFrame& f) : frame(f) {}
+ void visit(const AMQHeaderBody& x) { frame.body=x; }
+ void visit(const AMQContentBody& x) { frame.body=x; }
+ void visit(const AMQHeartbeatBody& x) { frame.body=x; }
+ void visit(const AMQMethodBody& x) { frame.body=MethodHolder(x); }
+ };
+ friend struct CopyVisitor;
+
+ typedef boost::variant<boost::blank,
+ AMQHeaderBody,
+ AMQContentBody,
+ AMQHeartbeatBody,
+ MethodHolder> Variant;
+
+ void visit(AMQHeaderBody& x) { body=x; }
+
+ void decodeBody(Buffer& buffer, uint32_t size, uint8_t type);
+
+ uint16_t channel;
+ Variant body;
};
std::ostream& operator<<(std::ostream&, const AMQFrame&);
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.cpp b/cpp/src/qpid/framing/AMQHeaderBody.cpp
index 4d3611eb40..6a3c8f27d1 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.cpp
+++ b/cpp/src/qpid/framing/AMQHeaderBody.cpp
@@ -22,54 +22,34 @@
#include "qpid/QpidError.h"
#include "BasicHeaderProperties.h"
-qpid::framing::AMQHeaderBody::AMQHeaderBody(int classId) : weight(0), contentSize(0){
- createProperties(classId);
-}
+qpid::framing::AMQHeaderBody::AMQHeaderBody(int) : weight(0), contentSize(0) {}
-qpid::framing::AMQHeaderBody::AMQHeaderBody() : properties(0), weight(0), contentSize(0){
-}
+qpid::framing::AMQHeaderBody::AMQHeaderBody() : weight(0), contentSize(0){}
-qpid::framing::AMQHeaderBody::~AMQHeaderBody(){
- delete properties;
-}
+qpid::framing::AMQHeaderBody::~AMQHeaderBody(){}
uint32_t qpid::framing::AMQHeaderBody::size() const{
- return 12 + properties->size();
+ return 12 + properties.size();
}
void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const {
- buffer.putShort(properties->classId());
+ buffer.putShort(properties.classId());
buffer.putShort(weight);
buffer.putLongLong(contentSize);
- properties->encode(buffer);
+ properties.encode(buffer);
}
void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){
- uint16_t classId = buffer.getShort();
+ buffer.getShort(); // Ignore classId
weight = buffer.getShort();
contentSize = buffer.getLongLong();
- createProperties(classId);
- properties->decode(buffer, bufSize - 12);
-}
-
-void qpid::framing::AMQHeaderBody::createProperties(int classId){
- switch(classId){
- case BASIC:
- properties = new qpid::framing::BasicHeaderProperties();
- break;
- default:
- THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class");
- }
+ properties.decode(buffer, bufSize - 12);
}
void qpid::framing::AMQHeaderBody::print(std::ostream& out) const
{
out << "header (" << size() << " bytes)" << " content_size=" << getContentSize();
- const BasicHeaderProperties* props =
- dynamic_cast<const BasicHeaderProperties*>(getProperties());
- if (props) {
- out << ", message_id=" << props->getMessageId();
- out << ", delivery_mode=" << (int) props->getDeliveryMode();
- out << ", headers=" << const_cast<BasicHeaderProperties*>(props)->getHeaders();
- }
+ out << ", message_id=" << properties.getMessageId();
+ out << ", delivery_mode=" << (int) properties.getDeliveryMode();
+ out << ", headers=" << properties.getHeaders();
}
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h
index 691ceeff73..894936060c 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -21,7 +21,7 @@
#include "amqp_types.h"
#include "AMQBody.h"
#include "Buffer.h"
-#include "HeaderProperties.h"
+#include "BasicHeaderProperties.h"
#ifndef _AMQHeaderBody_
#define _AMQHeaderBody_
@@ -31,19 +31,15 @@ namespace framing {
class AMQHeaderBody : public AMQBody
{
- HeaderProperties* properties;
+ BasicHeaderProperties properties;
uint16_t weight;
uint64_t contentSize;
-
- void createProperties(int classId);
-public:
- typedef boost::shared_ptr<AMQHeaderBody> shared_ptr;
-
+ public:
AMQHeaderBody(int classId);
AMQHeaderBody();
inline uint8_t type() const { return HEADER_BODY; }
- HeaderProperties* getProperties(){ return properties; }
- const HeaderProperties* getProperties() const { return properties; }
+ BasicHeaderProperties* getProperties(){ return &properties; }
+ const BasicHeaderProperties* getProperties() const { return &properties; }
inline uint64_t getContentSize() const { return contentSize; }
inline void setContentSize(uint64_t _size) { contentSize = _size; }
virtual ~AMQHeaderBody();
@@ -51,6 +47,8 @@ public:
virtual void encode(Buffer& buffer) const;
virtual void decode(Buffer& buffer, uint32_t size);
virtual void print(std::ostream& out) const;
+
+ void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
};
}
diff --git a/cpp/src/qpid/framing/AMQHeartbeatBody.h b/cpp/src/qpid/framing/AMQHeartbeatBody.h
index 4c046b81a7..a2701c3398 100644
--- a/cpp/src/qpid/framing/AMQHeartbeatBody.h
+++ b/cpp/src/qpid/framing/AMQHeartbeatBody.h
@@ -31,14 +31,13 @@ namespace framing {
class AMQHeartbeatBody : public AMQBody
{
public:
- typedef boost::shared_ptr<AMQHeartbeatBody> shared_ptr;
-
virtual ~AMQHeartbeatBody();
inline uint32_t size() const { return 0; }
inline uint8_t type() const { return HEARTBEAT_BODY; }
inline void encode(Buffer& ) const {}
inline void decode(Buffer& , uint32_t /*size*/) {}
virtual void print(std::ostream& out) const;
+ void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
};
}
diff --git a/cpp/src/qpid/framing/AMQMethodBody.cpp b/cpp/src/qpid/framing/AMQMethodBody.cpp
index 0a2720e69a..924d906d43 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.cpp
+++ b/cpp/src/qpid/framing/AMQMethodBody.cpp
@@ -18,51 +18,11 @@
* under the License.
*
*/
-#include "AMQFrame.h"
#include "AMQMethodBody.h"
-#include "qpid/QpidError.h"
-#include "qpid/framing/AMQP_MethodVersionMap.h"
namespace qpid {
namespace framing {
-void AMQMethodBody::encodeId(Buffer& buffer) const{
- buffer.putShort(amqpClassId());
- buffer.putShort(amqpMethodId());
-}
-
-void AMQMethodBody::invoke(AMQP_ServerOperations&){
- assert(0);
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server.");
-}
-
-bool AMQMethodBody::invoke(Invocable*) {
- return false;
-}
-
-AMQMethodBody::shared_ptr AMQMethodBody::create(
- AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
- Buffer& buffer)
-{
- ClassMethodId id;
- id.decode(buffer);
- return AMQMethodBody::shared_ptr(
- versionMap.createMethodBody(
- id.classId, id.methodId, version.getMajor(), version.getMinor()));
-}
-
-void AMQMethodBody::ClassMethodId::decode(Buffer& buffer) {
- classId = buffer.getShort();
- methodId = buffer.getShort();
-}
-
-void AMQMethodBody::decode(Buffer& buffer, uint32_t /*size*/) {
- decodeContent(buffer);
-}
-
-void AMQMethodBody::encode(Buffer& buffer) const {
- encodeId(buffer);
- encodeContent(buffer);
-}
+AMQMethodBody::~AMQMethodBody() {}
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h
index 73c5eb78a6..9c776e143b 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.h
+++ b/cpp/src/qpid/framing/AMQMethodBody.h
@@ -21,65 +21,48 @@
* under the License.
*
*/
-#include <iostream>
#include "amqp_types.h"
#include "AMQBody.h"
-#include "Buffer.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/shared_ptr.h"
+
+#include <ostream>
+
+#include <assert.h>
namespace qpid {
namespace framing {
-class AMQP_MethodVersionMap;
+class Buffer;
+class AMQP_ServerOperations;
+class Invocable;
+class MethodBodyConstVisitor;
-class AMQMethodBody : public AMQBody
-{
+class AMQMethodBody : public AMQBody {
public:
- typedef boost::shared_ptr<AMQMethodBody> shared_ptr;
-
- static shared_ptr create(
- AMQP_MethodVersionMap& map, ProtocolVersion version, Buffer& buf);
-
- ProtocolVersion version;
- uint8_t type() const { return METHOD_BODY; }
- AMQMethodBody(uint8_t major, uint8_t minor) : version(major, minor) {}
- AMQMethodBody(ProtocolVersion ver) : version(ver) {}
- virtual ~AMQMethodBody() {}
- void decode(Buffer&, uint32_t);
- virtual void encode(Buffer& buffer) const;
+ AMQMethodBody() {}
+ AMQMethodBody(uint8_t, uint8_t) {}
+
+ virtual ~AMQMethodBody();
+ virtual void accept(MethodBodyConstVisitor&) const = 0;
+
virtual MethodId amqpMethodId() const = 0;
virtual ClassId amqpClassId() const = 0;
- virtual void invoke(AMQP_ServerOperations&);
- virtual bool invoke(Invocable* target);
+ virtual void invoke(AMQP_ServerOperations&) { assert(0); }
+ virtual bool invoke(Invocable*) { return false; }
- template <class T> bool isA() {
+ template <class T> bool isA() const {
return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID;
}
- /** Return request ID or response correlationID */
- virtual RequestId getRequestId() const { return 0; }
-
- virtual bool isRequest() const { return false; }
- virtual bool isResponse() const { return false; }
-
- static uint32_t baseSize() { return 4; }
- protected:
-
- struct ClassMethodId {
- uint16_t classId;
- uint16_t methodId;
- void decode(Buffer& b);
- };
-
- void encodeId(Buffer& buffer) const;
- virtual void encodeContent(Buffer& buffer) const = 0;
- virtual void decodeContent(Buffer& buffer) = 0;
-
- virtual void printPrefix(std::ostream&) const {}
+ virtual uint32_t size() const = 0;
+ virtual uint8_t type() const { return METHOD_BODY; }
- friend class MethodHolder;
+ AMQMethodBody* getMethod() { return this; }
+ const AMQMethodBody* getMethod() const { return this; }
+ void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
};
diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h
index a6347b37fd..a8ef401b50 100644
--- a/cpp/src/qpid/framing/BasicHeaderProperties.h
+++ b/cpp/src/qpid/framing/BasicHeaderProperties.h
@@ -57,7 +57,7 @@ class BasicHeaderProperties : public HeaderProperties
virtual void encode(Buffer& buffer) const;
virtual void decode(Buffer& buffer, uint32_t size);
- virtual uint8_t classId() { return BASIC; }
+ virtual uint8_t classId() const { return BASIC; }
string getContentType() const { return contentType; }
string getContentEncoding() const { return contentEncoding; }
diff --git a/cpp/src/qpid/framing/Blob.cpp b/cpp/src/qpid/framing/Blob.cpp
new file mode 100644
index 0000000000..0aaeb4138e
--- /dev/null
+++ b/cpp/src/qpid/framing/Blob.cpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Blob.h"
+
+
+namespace qpid {
+namespace framing {
+
+void BlobHelper<void>::destroy(void*) {}
+void BlobHelper<void>::copy(void*, const void*) {}
+
+}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/Blob.h b/cpp/src/qpid/framing/Blob.h
index 1754e851e5..f89bad55ea 100644
--- a/cpp/src/qpid/framing/Blob.h
+++ b/cpp/src/qpid/framing/Blob.h
@@ -33,6 +33,8 @@
namespace boost {
/**
+ * 0-arg typed_in_place_factory constructor and in_place() override.
+ *
* Boost doesn't provide the 0 arg version since it assumes
* in_place_factory will be used when there is no default ctor.
*/
@@ -48,20 +50,29 @@ typed_in_place_factory0<T> in_place() { return typed_in_place_factory0<T>(); }
} // namespace boost
+
namespace qpid {
namespace framing {
using boost::in_place;
-template <class T>
-void destroyT(void* ptr) { static_cast<T*>(ptr)->~T(); }
-inline void nullDestroy(void*) {}
+template <class T> struct BlobHelper {
+ static void destroy(void* ptr) { static_cast<T*>(ptr)->~T(); }
+ static void copy(void* dest, const void* src) {
+ new (dest) T(*static_cast<const T*>(src));
+ }
+};
+
+template <> struct BlobHelper<void> {
+ static void destroy(void*);
+ static void copy(void* to, const void* from);
+};
/**
* A "blob" is a chunk of memory which can contain a single object at
* a time arbitrary type, provided sizeof(T)<=blob.size(). Blob
* ensures proper construction and destruction of its contents,
- * nothing else.
+ * and proper copying between Blobs, but nothing else.
*
* In particular the user must ensure the blob is big enough for its
* contents and must know the type of object in the blob to cast get().
@@ -75,7 +86,13 @@ class Blob
{
boost::aligned_storage<Size> store;
void (*destroy)(void*);
+ void (*copy)(void*, const void*);
+ template <class T> void setType() {
+ destroy=&BlobHelper<T>::destroy;
+ copy=&BlobHelper<T>::copy;
+ }
+
template<class TypedInPlaceFactory>
void construct (const TypedInPlaceFactory& factory,
const boost::typed_in_place_factory_base* )
@@ -84,16 +101,28 @@ class Blob
assert(sizeof(T) <= Size);
clear(); // Destroy old object.
factory.apply(store.address());
- destroy=&destroyT<T>;
+ setType<T>();
}
public:
/** Construct an empty blob. */
- Blob() : destroy(&nullDestroy) {}
+ Blob() { setType<void>(); }
+
+ /** Copy a blob. */
+ Blob(const Blob& b) { *this = b; }
+
+ /** Assign a blob */
+ Blob& operator=(const Blob& b) {
+ setType<void>(); // Exception safety.
+ b.copy(this->get(), b.get());
+ copy = b.copy;
+ destroy = b.destroy;
+ return *this;
+ }
/** @see construct() */
template<class Expr>
- Blob( const Expr & expr ) : destroy(nullDestroy) { construct(expr,&expr); }
+ Blob( const Expr & expr ) { setType<void>(); construct(expr,&expr); }
~Blob() { clear(); }
@@ -106,7 +135,7 @@ class Blob
/** Copy construct an instance of T into the Blob. */
template<class T>
- Blob& operator=(const T& x) { construct(in_place<T>(x)); }
+ Blob& operator=(const T& x) { construct(in_place<T>(x)); return *this; }
/** Get pointer to blob contents. Caller must know how to cast it. */
void* get() { return store.address(); }
@@ -116,17 +145,15 @@ class Blob
/** Destroy the object in the blob making it empty. */
void clear() {
- void (*saveDestroy)(void*) = destroy; // Exception safety
- destroy = &nullDestroy;
- saveDestroy(store.address());
+ void (*oldDestroy)(void*) = destroy;
+ setType<void>();
+ oldDestroy(store.address());
}
- /** True if there is no object allocated in the blob */
- bool empty() { return destroy==nullDestroy; }
-
static size_t size() { return Size; }
};
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp
index 7a7bf22f15..53a01141c1 100644
--- a/cpp/src/qpid/framing/BodyHandler.cpp
+++ b/cpp/src/qpid/framing/BodyHandler.cpp
@@ -25,25 +25,28 @@
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
+#include <boost/cast.hpp>
+
using namespace qpid::framing;
using namespace boost;
BodyHandler::~BodyHandler() {}
-void BodyHandler::handleBody(shared_ptr<AMQBody> body) {
+// TODO aconway 2007-08-13: Replace with visitor.
+void BodyHandler::handleBody(AMQBody* body) {
switch(body->type())
{
case METHOD_BODY:
- handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+ handleMethod(polymorphic_downcast<AMQMethodBody*>(body));
break;
case HEADER_BODY:
- handleHeader(shared_polymorphic_cast<AMQHeaderBody>(body));
+ handleHeader(polymorphic_downcast<AMQHeaderBody*>(body));
break;
case CONTENT_BODY:
- handleContent(shared_polymorphic_cast<AMQContentBody>(body));
+ handleContent(polymorphic_downcast<AMQContentBody*>(body));
break;
case HEARTBEAT_BODY:
- handleHeartbeat(shared_polymorphic_cast<AMQHeartbeatBody>(body));
+ handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body));
break;
default:
QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type());
diff --git a/cpp/src/qpid/framing/BodyHandler.h b/cpp/src/qpid/framing/BodyHandler.h
index 07d1658afa..9ded737195 100644
--- a/cpp/src/qpid/framing/BodyHandler.h
+++ b/cpp/src/qpid/framing/BodyHandler.h
@@ -32,6 +32,8 @@ class AMQHeaderBody;
class AMQContentBody;
class AMQHeartbeatBody;
+// TODO aconway 2007-08-10: rework using Visitor pattern?
+
/**
* Interface to handle incoming frame bodies.
* Derived classes provide logic for each frame type.
@@ -39,13 +41,13 @@ class AMQHeartbeatBody;
class BodyHandler {
public:
virtual ~BodyHandler();
- virtual void handleBody(boost::shared_ptr<AMQBody> body);
+ virtual void handleBody(AMQBody* body);
protected:
- virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0;
- virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0;
- virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0;
- virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0;
+ virtual void handleMethod(AMQMethodBody*) = 0;
+ virtual void handleHeader(AMQHeaderBody*) = 0;
+ virtual void handleContent(AMQContentBody*) = 0;
+ virtual void handleHeartbeat(AMQHeartbeatBody*) = 0;
};
}}
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp
index d61126bc7f..25ff46acdd 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -23,6 +23,9 @@
#include "FrameHandler.h"
#include "qpid/Exception.h"
+#include "AMQMethodBody.h"
+#include "qpid/framing/ConnectionOpenBody.h"
+
using boost::format;
namespace qpid {
@@ -45,7 +48,7 @@ void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v)
handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out));
}
-void ChannelAdapter::send(shared_ptr<AMQBody> body)
+void ChannelAdapter::send(const AMQBody& body)
{
assertChannelOpen();
AMQFrame frame(getVersion(), getId(), body);
diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h
index 9e418013eb..729f5e7b47 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/cpp/src/qpid/framing/ChannelAdapter.h
@@ -25,11 +25,11 @@
*
*/
-#include "qpid/shared_ptr.h"
#include "BodyHandler.h"
#include "ProtocolVersion.h"
#include "amqp_types.h"
#include "FrameHandler.h"
+#include "OutputHandler.h"
namespace qpid {
namespace framing {
@@ -66,7 +66,7 @@ class ChannelAdapter : protected BodyHandler {
ChannelId getId() const { return id; }
ProtocolVersion getVersion() const { return version; }
- virtual void send(shared_ptr<AMQBody> body);
+ virtual void send(const AMQBody& body);
virtual bool isOpen() const = 0;
@@ -75,7 +75,7 @@ class ChannelAdapter : protected BodyHandler {
void assertChannelOpen() const;
void assertChannelNotOpen() const;
- virtual void handleMethod(shared_ptr<AMQMethodBody>) = 0;
+ virtual void handleMethod(AMQMethodBody*) = 0;
private:
class ChannelAdapterHandler;
diff --git a/cpp/src/qpid/framing/Frame.cpp b/cpp/src/qpid/framing/Frame.cpp
deleted file mode 100644
index 1ba8112faa..0000000000
--- a/cpp/src/qpid/framing/Frame.cpp
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <boost/format.hpp>
-
-#include "Frame.h"
-#include "qpid/QpidError.h"
-
-
-namespace qpid {
-namespace framing {
-
-namespace {
-struct GetBodyVisitor : public NoBlankVisitor<AMQBody*> {
- QPID_USING_NOBLANK(AMQBody*);
- AMQBody* operator()(MethodHolder& h) const { return h.getMethod(); }
- template <class T> AMQBody* operator()(T& t) const { return &t; }
-};
-}
-
-AMQBody* Frame::getBody() {
- return boost::apply_visitor(GetBodyVisitor(), body);
-}
-
-const AMQBody* Frame::getBody() const {
- return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
-}
-
-void Frame::encode(Buffer& buffer)
-{
- buffer.putOctet(getBody()->type());
- buffer.putShort(channel);
- buffer.putLong(getBody()->size());
- getBody()->encode(buffer);
- buffer.putOctet(0xCE);
-}
-
-uint32_t Frame::size() const{
- return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + getBody()->size()
- + 1/*0xCE*/;
-}
-
-bool Frame::decode(Buffer& buffer)
-{
- if(buffer.available() < 7)
- return false;
- buffer.record();
- uint32_t frameSize = decodeHead(buffer);
- if(buffer.available() < frameSize + 1){
- buffer.restore();
- return false;
- }
- decodeBody(buffer, frameSize);
- uint8_t end = buffer.getOctet();
- if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
- return true;
-}
-
-uint32_t Frame::decodeHead(Buffer& buffer){
- type = buffer.getOctet();
- channel = buffer.getShort();
- return buffer.getLong();
-}
-
-void Frame::decodeBody(Buffer& buffer, uint32_t size)
-{
- switch(type)
- {
- case METHOD_BODY:
- case REQUEST_BODY:
- case RESPONSE_BODY: {
- ClassId c=buffer.getShort();
- MethodId m=buffer.getShort();
- body = MethodHolder(c,m);
- break;
- }
- case HEADER_BODY:
- body = AMQHeaderBody();
- break;
- case CONTENT_BODY:
- body = AMQContentBody();
- break;
- case HEARTBEAT_BODY:
- body = AMQHeartbeatBody();
- break;
- default:
- THROW_QPID_ERROR(
- FRAMING_ERROR,
- boost::format("Unknown frame type %d") % type);
- }
- getBody()->decode(buffer, size);
-}
-
-std::ostream& operator<<(std::ostream& out, const Frame& f)
-{
- return out << "Frame[channel=" << f.getChannel() << "; " << f.body << "]";
-}
-
-
-}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/Frame.h b/cpp/src/qpid/framing/Frame.h
deleted file mode 100644
index 02ab15a828..0000000000
--- a/cpp/src/qpid/framing/Frame.h
+++ /dev/null
@@ -1,72 +0,0 @@
-#ifndef QPID_FRAMING_FRAME_H
-#define QPID_FRAMING_FRAME_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "AMQDataBlock.h"
-#include "AMQHeaderBody.h"
-#include "AMQContentBody.h"
-#include "AMQHeartbeatBody.h"
-#include "MethodHolder.h"
-
-namespace qpid {
-namespace framing {
-
-class Frame : public AMQDataBlock {
- public:
- typedef boost::variant<boost::blank,
- AMQHeaderBody,
- AMQContentBody,
- AMQHeartbeatBody,
- MethodHolder> Variant;
-
- Frame(ChannelId channel_=0, const Variant& body_=Variant())
- : body(body_), channel(channel_) {}
-
- void encode(Buffer& buffer);
- bool decode(Buffer& buffer);
- uint32_t size() const;
-
- uint16_t getChannel() const { return channel; }
-
- AMQBody* getBody();
- const AMQBody* getBody() const;
-
- template <class T> T* castBody() {
- return boost::polymorphic_downcast<T*>(getBody());
- }
-
- Variant body;
-
- private:
- uint32_t decodeHead(Buffer& buffer);
- void decodeBody(Buffer& buffer, uint32_t size);
-
- uint8_t type;
- uint16_t channel;
-};
-
-std::ostream& operator<<(std::ostream&, const Frame&);
-
-}} // namespace qpid::framing
-
-
-#endif /*!QPID_FRAMING_FRAME_H*/
diff --git a/cpp/src/qpid/framing/HeaderProperties.h b/cpp/src/qpid/framing/HeaderProperties.h
index ae8b796aa9..0c805922e8 100644
--- a/cpp/src/qpid/framing/HeaderProperties.h
+++ b/cpp/src/qpid/framing/HeaderProperties.h
@@ -34,7 +34,7 @@ namespace framing {
public:
inline virtual ~HeaderProperties(){}
- virtual uint8_t classId() = 0;
+ virtual uint8_t classId() const = 0;
virtual uint32_t size() const = 0;
virtual void encode(Buffer& buffer) const = 0;
virtual void decode(Buffer& buffer, uint32_t size) = 0;
diff --git a/cpp/src/qpid/framing/MethodHolder.cpp b/cpp/src/qpid/framing/MethodHolder.cpp
index 43997e6d55..de8f0da6d4 100644
--- a/cpp/src/qpid/framing/MethodHolder.cpp
+++ b/cpp/src/qpid/framing/MethodHolder.cpp
@@ -23,8 +23,8 @@
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/Buffer.h"
-// Note: MethodHolder::construct is in a separate generated file
-// MethodHolder_construct.cpp
+// Note: MethodHolder::construct is and operator= are code-generated
+// in file MethodHolder_construct.cpp.
using namespace boost;
@@ -35,16 +35,18 @@ void MethodHolder::encode(Buffer& b) const {
const AMQMethodBody* body = get();
b.putShort(body->amqpClassId());
b.putShort(body->amqpMethodId());
- body->encodeContent(b);
+ body->encode(b);
}
void MethodHolder::decode(Buffer& b) {
- construct(std::make_pair(b.getShort(), b.getShort()));
- get()->decodeContent(b);
+ ClassId c=b.getShort();
+ MethodId m=b.getShort();
+ construct(c,m);
+ get()->decode(b);
}
uint32_t MethodHolder::size() const {
- return sizeof(Id)+get()->size();
+ return sizeof(ClassId)+sizeof(MethodId)+get()->size();
}
std::ostream& operator<<(std::ostream& out, const MethodHolder& h) {
diff --git a/cpp/src/qpid/framing/MethodHolder.h b/cpp/src/qpid/framing/MethodHolder.h
index fa14db2f79..59dbb1a708 100644
--- a/cpp/src/qpid/framing/MethodHolder.h
+++ b/cpp/src/qpid/framing/MethodHolder.h
@@ -22,64 +22,74 @@
*
*/
+#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/amqp_types.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/Blob.h"
#include "qpid/framing/MethodHolderMaxSize.h" // Generated file.
+#include <boost/type_traits/is_base_and_derived.hpp>
+#include <boost/utility/enable_if.hpp>
+
#include <utility>
namespace qpid {
namespace framing {
class AMQMethodBody;
+class AMQBody;
class Buffer;
+class MethodHolder;
+std::ostream& operator<<(std::ostream& out, const MethodHolder& h);
+
/**
* Holder for arbitrary method body.
*/
+// TODO aconway 2007-08-14: Fix up naming, this class should really be
+// called AMQMethodBody and use a different name for the root of
+// the concrete method body tree, which should not inherit AMQBody.
+//
class MethodHolder
{
- public:
- typedef std::pair<ClassId, MethodId> Id;
-
- template <class T>static Id idOf() {
- return std::make_pair(T::CLASS_ID, T::METHOD_ID); }
+ template <class T> struct EnableIfMethod:
+ public boost::enable_if<boost::is_base_and_derived<AMQMethodBody,T>,T>
+ {};
+ template <class T> EnableIfMethod<T>& assertMethod(T& t) { return t; }
+
+ public:
MethodHolder() {}
- MethodHolder(const Id& id) { construct(id); }
- MethodHolder(ClassId& c, MethodId& m) { construct(std::make_pair(c,m)); }
+ MethodHolder(ClassId& c, MethodId& m) { construct(c,m); }
- template <class M>
- MethodHolder(const M& m) : blob(m), id(idOf<M>()) {}
+ /** Construct with a copy of a method body. */
+ MethodHolder(const AMQMethodBody& m) { *this = m; }
- template <class M>
- MethodHolder& operator=(const M& m) { blob=m; id=idOf<M>(); return *this; }
+ /** Copy method body into holder. */
+ MethodHolder& operator=(const AMQMethodBody&);
- /** Construct the method body corresponding to Id */
- void construct(const Id&);
+ /** Construct the method body corresponding to class/method id */
+ void construct(ClassId c, MethodId m);
+ uint8_t type() const { return 1; }
void encode(Buffer&) const;
void decode(Buffer&);
uint32_t size() const;
AMQMethodBody* get() {
- return static_cast<AMQMethodBody*>(blob.get());
+ return reinterpret_cast<AMQMethodBody*>(blob.get());
}
const AMQMethodBody* get() const {
- return static_cast<const AMQMethodBody*>(blob.get());
+ return reinterpret_cast<const AMQMethodBody*>(blob.get());
}
- AMQMethodBody* operator* () { return get(); }
- const AMQMethodBody* operator*() const { return get(); }
- AMQMethodBody* operator-> () { return get(); }
- const AMQMethodBody* operator->() const { return get(); }
-
private:
Blob<MAX_METHODBODY_SIZE> blob;
- Id id;
+ class CopyVisitor;
+ friend struct CopyVisitor;
};
-std::ostream& operator<<(std::ostream& out, const MethodHolder& h);
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h
index 888c6a7382..eec28333bc 100644
--- a/cpp/src/qpid/framing/amqp_framing.h
+++ b/cpp/src/qpid/framing/amqp_framing.h
@@ -26,7 +26,6 @@
#include "AMQHeaderBody.h"
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
-#include "qpid/framing/AMQP_MethodVersionMap.h"
#include "InputHandler.h"
#include "OutputHandler.h"
#include "InitiationHandler.h"
diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h
index 5ea08a69af..bfd5b2206f 100644
--- a/cpp/src/qpid/framing/amqp_types.h
+++ b/cpp/src/qpid/framing/amqp_types.h
@@ -44,8 +44,6 @@ namespace framing {
using std::string;
typedef uint8_t FrameType;
typedef uint16_t ChannelId;
-typedef uint64_t RequestId;
-typedef uint64_t ResponseId;
typedef uint32_t BatchOffset;
typedef uint16_t ClassId;
typedef uint16_t MethodId;
diff --git a/cpp/src/qpid/shared_ptr.h b/cpp/src/qpid/shared_ptr.h
index eb5f3f906a..0c933ea6a6 100644
--- a/cpp/src/qpid/shared_ptr.h
+++ b/cpp/src/qpid/shared_ptr.h
@@ -42,6 +42,8 @@ shared_ptr<T> make_shared_ptr(T* ptr, D deleter) {
return shared_ptr<T>(ptr, deleter);
}
+inline void nullDeleter(void const *) {}
+
} // namespace qpid
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index 6ee7a84beb..062564d8ed 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -134,7 +134,9 @@ Socket::recv(void* data, size_t size) const
int Socket::listen(int port, int backlog) const
{
- const int& socket = impl->fd;
+ const int& socket = impl->fd;
+ int yes=1;
+ QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons(port);
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index eb67601875..6c9b910637 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -31,6 +31,7 @@
#include "MockChannel.h"
#include "qpid/broker/Connection.h"
#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/ConnectionStartBody.h"
#include <vector>
using namespace boost;
@@ -233,18 +234,18 @@ class BrokerChannelTest : public CppUnit::TestCase
Queue::shared_ptr queue(new Queue("my_queue"));
exchange->bind(queue, "", 0);
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ AMQHeaderBody header(BASIC);
uint64_t contentSize(0);
for (int i = 0; i < 3; i++) {
contentSize += data[i].size();
}
- header->setContentSize(contentSize);
+ header.setContentSize(contentSize);
channel.handlePublish(msg);
- channel.handleHeader(header);
+ channel.handleHeader(&header);
for (int i = 0; i < 3; i++) {
- AMQContentBody::shared_ptr body(new AMQContentBody(data[i]));
- channel.handleContent(body);
+ AMQContentBody body(data[i]);
+ channel.handleContent(&body);
}
Message::shared_ptr msg2 = queue->dequeue();
CPPUNIT_ASSERT_EQUAL(msg, msg2.get());
@@ -312,8 +313,7 @@ class BrokerChannelTest : public CppUnit::TestCase
//there will always be a connection-start frame
CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
- CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
- handler.frames[0].getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody()));
const string data("abcdefghijklmn");
@@ -340,17 +340,17 @@ class BrokerChannelTest : public CppUnit::TestCase
{
BasicMessage* msg = new BasicMessage(
0, exchange, routingKey, false, false);
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(contentSize);
- msg->setHeader(header);
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(contentSize);
+ msg->setHeader(&header);
msg->getHeaderProperties()->setMessageId(messageId);
return msg;
}
void addContent(Message::shared_ptr msg, const string& data)
{
- AMQContentBody::shared_ptr body(new AMQContentBody(data));
- msg->addContent(body);
+ AMQContentBody body(data);
+ msg->addContent(&body);
}
};
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp
index 811fc0133e..2d23b87627 100644
--- a/cpp/src/tests/Cluster.cpp
+++ b/cpp/src/tests/Cluster.cpp
@@ -33,7 +33,7 @@ static const ProtocolVersion VER;
/** Verify membership in a cluster with one member. */
BOOST_AUTO_TEST_CASE(testClusterOne) {
TestCluster cluster("clusterOne", "amqp:one:1");
- AMQFrame send(VER, 1, new SessionPingBody(VER));
+ AMQFrame send(VER, 1, SessionPingBody(VER));
cluster.handle(send);
AMQFrame received;
BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -59,7 +59,7 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) {
BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
// Exchange frames with child.
- AMQFrame send(VER, 1, new SessionPingBody(VER));
+ AMQFrame send(VER, 1, SessionPingBody(VER));
cluster.handle(send);
AMQFrame received;
BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -90,8 +90,8 @@ struct CountHandler : public FrameHandler {
/** Test the ClassifierHandler */
BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) {
- AMQFrame queueDecl(VER, 0, new QueueDeclareBody(VER));
- AMQFrame messageTrans(VER, 0, new MessageTransferBody(VER));
+ AMQFrame queueDecl(VER, 0, QueueDeclareBody(VER));
+ AMQFrame messageTrans(VER, 0, MessageTransferBody(VER));
shared_ptr<CountHandler> wiring(new CountHandler());
shared_ptr<CountHandler> other(new CountHandler());
diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h
index 510a788f7c..366ea92a8b 100644
--- a/cpp/src/tests/Cluster.h
+++ b/cpp/src/tests/Cluster.h
@@ -55,8 +55,6 @@ class TestHandler : public Handler<T&>, public ConcurrentQueue<T>
typedef TestHandler<AMQFrame> TestFrameHandler;
-void nullDeleter(void*) {}
-
struct TestCluster : public Cluster
{
TestCluster(string name, string url)
diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp
index 7bea9d0490..62ccb9bd72 100644
--- a/cpp/src/tests/Cluster_child.cpp
+++ b/cpp/src/tests/Cluster_child.cpp
@@ -40,7 +40,7 @@ void clusterTwo() {
BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *frame.getBody());
BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
- AMQFrame send(VER, 1, new SessionPongBody(VER));
+ AMQFrame send(VER, 1, SessionPongBody(VER));
cluster.handle(send);
BOOST_REQUIRE(cluster.received.waitPop(frame));
BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *frame.getBody());
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index b0aeb9db6f..a0dd8d37f6 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -18,24 +18,27 @@
* under the License.
*
*/
+#include "InProcessBroker.h"
+#include "qpid/QpidError.h"
+#include "qpid/client/ClientExchange.h"
+#include "qpid/client/ClientQueue.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Connector.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/BasicGetOkBody.h"
#include "qpid/framing/ConnectionRedirectBody.h"
#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/amqp_framing.h"
-#include <iostream>
#include "qpid_test_plugin.h"
+
+#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
+#include <iostream>
+
+#include <memory>
#include <sstream>
#include <typeinfo>
-#include "qpid/QpidError.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "InProcessBroker.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Connector.h"
-#include "qpid/client/ClientExchange.h"
-#include "qpid/client/ClientQueue.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include <memory>
-#include <boost/lexical_cast.hpp>
-#include <boost/bind.hpp>
using namespace qpid;
using namespace qpid::framing;
@@ -62,13 +65,11 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_TEST(testInlineContent);
CPPUNIT_TEST(testContentReference);
CPPUNIT_TEST(testContentValidation);
- CPPUNIT_TEST(testRequestResponseRoundtrip);
CPPUNIT_TEST_SUITE_END();
private:
Buffer buffer;
ProtocolVersion version;
- AMQP_MethodVersionMap versionMap;
public:
@@ -77,10 +78,10 @@ class FramingTest : public CppUnit::TestCase
void testBasicQosBody()
{
BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true);
- in.encodeContent(buffer);
+ in.encode(buffer);
buffer.flip();
BasicQosBody out(version);
- out.decodeContent(buffer);
+ out.decode(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -88,10 +89,10 @@ class FramingTest : public CppUnit::TestCase
{
std::string s = "security credential";
ConnectionSecureBody in(version, s);
- in.encodeContent(buffer);
+ in.encode(buffer);
buffer.flip();
ConnectionSecureBody out(version);
- out.decodeContent(buffer);
+ out.decode(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -100,10 +101,10 @@ class FramingTest : public CppUnit::TestCase
std::string a = "hostA";
std::string b = "hostB";
ConnectionRedirectBody in(version, a, b);
- in.encodeContent(buffer);
+ in.encode(buffer);
buffer.flip();
ConnectionRedirectBody out(version);
- out.decodeContent(buffer);
+ out.decode(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -111,10 +112,10 @@ class FramingTest : public CppUnit::TestCase
{
std::string s = "text";
AccessRequestBody in(version, s, true, false, true, false, true);
- in.encodeContent(buffer);
+ in.encode(buffer);
buffer.flip();
AccessRequestBody out(version);
- out.decodeContent(buffer);
+ out.decode(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -124,10 +125,10 @@ class FramingTest : public CppUnit::TestCase
std::string t = "tag";
BasicConsumeBody in(version, 0, q, t, false, true, false, false,
FieldTable());
- in.encodeContent(buffer);
+ in.encode(buffer);
buffer.flip();
BasicConsumeBody out(version);
- out.decodeContent(buffer);
+ out.decode(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -137,7 +138,7 @@ class FramingTest : public CppUnit::TestCase
std::string a = "hostA";
std::string b = "hostB";
AMQFrame in(version, 999,
- new ConnectionRedirectBody(version, a, b));
+ ConnectionRedirectBody(version, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -148,7 +149,7 @@ class FramingTest : public CppUnit::TestCase
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(version, 999, new BasicConsumeOkBody(version, s));
+ AMQFrame in(version, 999, BasicConsumeOkBody(version, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -211,46 +212,6 @@ class FramingTest : public CppUnit::TestCase
}
- // expect may contain null chars so use string(ptr,size) constructor
- // Use sizeof(expect)-1 to strip the trailing null.
-#define ASSERT_FRAME(expect, frame) \
- CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame))
-
- void testRequestResponseRoundtrip() {
- boost::shared_ptr<broker::InProcessBroker> ibroker(new broker::InProcessBroker(version));
- client::Connection clientConnection(boost::static_pointer_cast<client::Connector>(ibroker));
- clientConnection.open("");
- client::Channel c;
- clientConnection.openChannel(c);
-
- client::Exchange exchange(
- "MyExchange", client::Exchange::TOPIC_EXCHANGE);
- client::Queue queue("MyQueue", true);
- c.declareExchange(exchange);
- c.declareQueue(queue);
- c.bind(exchange, queue, "MyTopic", framing::FieldTable());
- c.close();
- clientConnection.close();
- broker::InProcessBroker::Conversation::const_iterator i = ibroker->conversation.begin();
- ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionOpenOk: knownHosts=]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; ChannelOpen: outOfBand=]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=1; rangedExecutionSet={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=3; rangedExecutionSet={}]", *i++);
- }
};
diff --git a/cpp/src/tests/InMemoryContentTest.cpp b/cpp/src/tests/InMemoryContentTest.cpp
index cb729f9930..bc95548d45 100644
--- a/cpp/src/tests/InMemoryContentTest.cpp
+++ b/cpp/src/tests/InMemoryContentTest.cpp
@@ -65,9 +65,8 @@ public:
CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
for (unsigned int i = 0; i < outCount; i++) {
- AMQContentBody::shared_ptr chunk(
- dynamic_pointer_cast<AMQContentBody>(
- channel.out.frames[i].getBody()));
+ AMQContentBody* chunk = dynamic_cast<AMQContentBody*>(
+ channel.out.frames[i].getBody());
CPPUNIT_ASSERT(chunk);
CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
CPPUNIT_ASSERT_EQUAL(
@@ -78,8 +77,8 @@ public:
void addframes(InMemoryContent& content, size_t frameCount, string* frameData)
{
for (unsigned int i = 0; i < frameCount; i++) {
- AMQContentBody::shared_ptr frame(new AMQContentBody(frameData[i]));
- content.add(frame);
+ AMQContentBody frame(frameData[i]);
+ content.add(&frame);
}
}
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
index 9f30ee584d..0e5f3895f9 100644
--- a/cpp/src/tests/InProcessBroker.h
+++ b/cpp/src/tests/InProcessBroker.h
@@ -54,7 +54,7 @@ class InProcessBroker : public client::Connector {
template <class MethodType>
MethodType* asMethod() {
- return dynamic_cast<MethodType*>(frame.getBody().get());
+ return dynamic_cast<MethodType*>(frame.getBody());
}
framing::AMQFrame frame;
Sender sender;
diff --git a/cpp/src/tests/LazyLoadedContentTest.cpp b/cpp/src/tests/LazyLoadedContentTest.cpp
index 7bac3613ad..df46f6b48e 100644
--- a/cpp/src/tests/LazyLoadedContentTest.cpp
+++ b/cpp/src/tests/LazyLoadedContentTest.cpp
@@ -97,7 +97,8 @@ public:
CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
for (unsigned int i = 0; i < outCount; i++) {
- AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[i].getBody()));
+ AMQContentBody* chunk(dynamic_cast<AMQContentBody*>(
+ channel.out.frames[i].getBody()));
CPPUNIT_ASSERT(chunk);
CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
CPPUNIT_ASSERT_EQUAL(
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 67b173af33..c9a08e6075 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -43,7 +43,6 @@ check_PROGRAMS+=Shlib
Shlib_SOURCES=Shlib.cpp
Shlib_LDADD=-lboost_unit_test_framework $(lib_common)
-
# TODO aconway 2007-08-07: Why aren't these tests run automatically?
check_PROGRAMS+=ConcurrentQueue
@@ -54,10 +53,6 @@ check_PROGRAMS+=Serializer
Serializer_SOURCES=Serializer.cpp
Serializer_LDADD=-lboost_unit_test_framework $(lib_common)
-check_PROGRAMS+=Visitor
-Visitor_SOURCES=Visitor.cpp
-Visitor_LDADD=-lboost_unit_test_framework
-
include cluster.mk
#
diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp
index 3d2ee1aaea..a12fc603ce 100644
--- a/cpp/src/tests/MessageBuilderTest.cpp
+++ b/cpp/src/tests/MessageBuilderTest.cpp
@@ -119,12 +119,12 @@ class MessageBuilderTest : public CppUnit::TestCase
Message::shared_ptr message(
new BasicMessage(
0, "test", "my_routing_key", false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(0);
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(0);
builder.initialise(message);
CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(header);
+ builder.setHeader(&header);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
}
@@ -137,15 +137,15 @@ class MessageBuilderTest : public CppUnit::TestCase
Message::shared_ptr message(
new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(7);
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(7);
+ AMQContentBody part1(data1);
builder.initialise(message);
CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(header);
+ builder.setHeader(&header);
CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(part1);
+ builder.addContent(&part1);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
}
@@ -159,18 +159,18 @@ class MessageBuilderTest : public CppUnit::TestCase
Message::shared_ptr message(
new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
- AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(14);
+ AMQContentBody part1(data1);
+ AMQContentBody part2(data2);
builder.initialise(message);
CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(header);
+ builder.setHeader(&header);
CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(part1);
+ builder.addContent(&part1);
CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(part2);
+ builder.addContent(&part2);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
}
@@ -189,19 +189,19 @@ class MessageBuilderTest : public CppUnit::TestCase
Message::shared_ptr message(
new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(14);
+ BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header.getProperties());
properties->setMessageId("MyMessage");
properties->getHeaders().setString("abc", "xyz");
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
- AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
+ AMQContentBody part1(data1);
+ AMQContentBody part2(data2);
builder.initialise(message);
- builder.setHeader(header);
- builder.addContent(part1);
- builder.addContent(part2);
+ builder.setHeader(&header);
+ builder.addContent(&part1);
+ builder.addContent(&part2);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp
index 1e976312be..1fbb18b7d3 100644
--- a/cpp/src/tests/MessageTest.cpp
+++ b/cpp/src/tests/MessageTest.cpp
@@ -47,13 +47,13 @@ class MessageTest : public CppUnit::TestCase
BasicMessage::shared_ptr msg(
new BasicMessage(0, exchange, routingKey, false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
- AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
- msg->setHeader(header);
- msg->addContent(part1);
- msg->addContent(part2);
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(14);
+ AMQContentBody part1(data1);
+ AMQContentBody part2(data2);
+ msg->setHeader(&header);
+ msg->addContent(&part1);
+ msg->addContent(&part2);
msg->getHeaderProperties()->setMessageId(messageId);
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
@@ -75,7 +75,8 @@ class MessageTest : public CppUnit::TestCase
MockChannel channel(1);
msg->deliver(channel, "ignore", 0, 100);
CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size());
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[2].getBody()));
+ AMQContentBody* contentBody(
+ dynamic_cast<AMQContentBody*>(channel.out.frames[2].getBody()));
CPPUNIT_ASSERT(contentBody);
CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData());
}
diff --git a/cpp/src/tests/MockChannel.h b/cpp/src/tests/MockChannel.h
index fb2de98d2a..b9a7c0a2a2 100644
--- a/cpp/src/tests/MockChannel.h
+++ b/cpp/src/tests/MockChannel.h
@@ -51,13 +51,10 @@ struct MockChannel : public qpid::framing::ChannelAdapter
bool isOpen() const { return true; }
- void handleHeader(
- boost::shared_ptr<qpid::framing::AMQHeaderBody> b) { send(b); }
- void handleContent(
- boost::shared_ptr<qpid::framing::AMQContentBody> b) { send(b); }
- void handleHeartbeat(
- boost::shared_ptr<qpid::framing::AMQHeartbeatBody> b) { send(b); }
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method) { send(method); };
+ void handleHeader(qpid::framing::AMQHeaderBody* b) { send(*b); }
+ void handleContent(qpid::framing::AMQContentBody* b) { send(*b); }
+ void handleHeartbeat(qpid::framing::AMQHeartbeatBody* b) { send(*b); }
+ void handleMethod(qpid::framing::AMQMethodBody* b) { send(*b); };
};
diff --git a/cpp/src/tests/ReferenceTest.cpp b/cpp/src/tests/ReferenceTest.cpp
index b3dd44bf7d..411462564a 100644
--- a/cpp/src/tests/ReferenceTest.cpp
+++ b/cpp/src/tests/ReferenceTest.cpp
@@ -67,17 +67,17 @@ class ReferenceTest : public CppUnit::TestCase
Reference::shared_ptr r1(registry.open("bar"));
- MessageTransferBody::shared_ptr t1(new MessageTransferBody(v));
+ MessageTransferBody t1(v);
// TODO aconway 2007-04-03: hack around lack of generated setters. Clean this up.
- const_cast<framing::Content&>(t1->getBody()) = framing::Content(REFERENCE,"bar");
- MessageMessage::shared_ptr m1(new MessageMessage(0, t1, r1));
+ const_cast<framing::Content&>(t1.getBody()) = framing::Content(REFERENCE,"bar");
+ MessageMessage::shared_ptr m1(new MessageMessage(0, &t1, r1));
- MessageTransferBody::shared_ptr t2(new MessageTransferBody(v));
- const_cast<framing::Content&>(t2->getBody()) = framing::Content(REFERENCE,"bar");
- MessageMessage::shared_ptr m2(new MessageMessage(0, t2, r1));
+ MessageTransferBody t2(v);
+ const_cast<framing::Content&>(t2.getBody()) = framing::Content(REFERENCE,"bar");
+ MessageMessage::shared_ptr m2(new MessageMessage(0, &t2, r1));
- MessageAppendBody::shared_ptr a1(new MessageAppendBody(v));
- MessageAppendBody::shared_ptr a2(new MessageAppendBody(v));
+ MessageAppendBody a1(v);
+ MessageAppendBody a2(v);
r1->addMessage(m1);
r1->addMessage(m2);
@@ -86,12 +86,6 @@ class ReferenceTest : public CppUnit::TestCase
r1->append(a2);
CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size());
r1->close();
-
- CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[0], a1);
- CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[1], a2);
-
- CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[0], a1);
- CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[1], a2);
}
};
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index a5d9eb69a5..24e8aac701 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -70,7 +70,8 @@ public:
for(int i = 0; i < 10; i++){
Message::shared_ptr msg(
new BasicMessage(0, "exchange", "routing_key", false, false));
- msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
+ AMQHeaderBody body(BASIC);
+ msg->setHeader(&body);
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
messages.push_back(msg);
deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp
index 05c03754f9..3391be5128 100644
--- a/cpp/src/tests/TxPublishTest.cpp
+++ b/cpp/src/tests/TxPublishTest.cpp
@@ -72,7 +72,8 @@ public:
msg(new BasicMessage(0, "exchange", "routing_key", false, false)),
op(msg)
{
- msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
+ AMQHeaderBody body(BASIC);
+ msg->setHeader(&body);
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
op.deliverTo(queue1);
op.deliverTo(queue2);
diff --git a/cpp/src/tests/Visitor.cpp b/cpp/src/tests/Visitor.cpp
deleted file mode 100644
index 0cb3cf15bb..0000000000
--- a/cpp/src/tests/Visitor.cpp
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/Visitor.h"
-
-#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*>
-#include <boost/test/auto_unit_test.hpp>
-#include <boost/tuple/tuple.hpp>
-
-using namespace std;
-using namespace qpid::framing;
-
-struct DummyA;
-struct DummyB;
-struct DummyC;
-
-QPID_VISITOR(DummyVisitor, (DummyA)(DummyB)(DummyC));
-
-struct DummyFrame : public VisitableRoot<DummyVisitor> {};
-
-struct DummyA : public Visitable<DummyA, DummyFrame> {};
-struct DummyB : public Visitable<DummyB, DummyFrame> {};
-struct DummyC : public Visitable<DummyC, DummyFrame> {};
-
-struct TestDummyVisitor : public DummyVisitor {
- boost::tuple<DummyA*, DummyB*, DummyC*> dummies;
- void visit(DummyA& a) { dummies.get<0>() = &a; }
- void visit(DummyB& b) { dummies.get<1>() = &b; }
- void visit(DummyC& c) { dummies.get<2>() = &c; }
-};
-
-BOOST_AUTO_TEST_CASE(Visitor_accept) {
- TestDummyVisitor v;
- DummyA a;
- DummyB b;
- DummyC c;
- a.accept(v);
- BOOST_CHECK_EQUAL(&a, v.dummies.get<0>());
- b.accept(v);
- BOOST_CHECK_EQUAL(&b, v.dummies.get<1>());
- c.accept(v);
- BOOST_CHECK_EQUAL(&c, v.dummies.get<2>());
-}