summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-16 20:12:33 +0000
committerAlan Conway <aconway@apache.org>2007-08-16 20:12:33 +0000
commit49c7a491c98c26fe7d4f017a7ba655dfc029278c (patch)
tree304d51ba039a5391b4ebde08caab3da978b465fb /cpp
parentdc13ca80ff893f74ab57fee6543de6543aa366bc (diff)
downloadqpid-python-49c7a491c98c26fe7d4f017a7ba655dfc029278c.tar.gz
AMQBodies are no longer allocated on the heap and passed with shared_ptr.
AMQFrame contains a boost::variant of AMQHeaderBody,AMQContentBody, AMQHeatbeatBody, and MethodHolder. A variant is basically a type-safe union, it can allocate any of the types in-place. MethodHolder contains a Blob, a less sophisticated kind of variant, which can contain any of the concrete method body types. Using variants for all the method types causes outrageous compile times and bloated library symbol names. Blob lacks some of the finer features of variant and needs help from generated code. For now both are hidden to the rest of the code base behind AMQFrame and MethodBody classes so if/when we decide to settle on just one "variant" type solution we can do so. This commit touches nearly 100 files, mostly converting method signatures with shared_ptr<FooBody> to FooBody* or FooBody&, and converting stored shared_ptr<AMQBody> to AMQFrame and share_ptr<AMQMethodBody> to MethodHolder. There is one outstanding client memory leak, which I will fix in my next commit. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java13
-rw-r--r--cpp/gentools/src/org/apache/qpid/gentools/Main.java4
-rw-r--r--cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl57
-rw-r--r--cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl24
-rwxr-xr-xcpp/rubygen/amqpgen.rb9
-rwxr-xr-xcpp/rubygen/cppgen.rb9
-rwxr-xr-xcpp/rubygen/templates/MethodBodyConstVisitor.rb27
-rwxr-xr-xcpp/rubygen/templates/MethodHolder.rb37
-rwxr-xr-xcpp/rubygen/templates/Proxy.rb2
-rw-r--r--cpp/rubygen/templates/Session.rb4
-rwxr-xr-xcpp/rubygen/templates/all_method_bodies.rb21
-rw-r--r--cpp/src/Makefile.am8
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp6
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h6
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp59
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.h8
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h13
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp232
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.h15
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h8
-rw-r--r--cpp/src/qpid/broker/Content.h2
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.cpp26
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.h7
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.cpp6
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.h2
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp4
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.h4
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp5
-rw-r--r--cpp/src/qpid/broker/Reference.cpp6
-rw-r--r--cpp/src/qpid/broker/Reference.h7
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp24
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h20
-rw-r--r--cpp/src/qpid/client/ChannelHandler.cpp44
-rw-r--r--cpp/src/qpid/client/ChannelHandler.h8
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp10
-rw-r--r--cpp/src/qpid/client/Connection.h1
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp65
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h8
-rw-r--r--cpp/src/qpid/client/Correlator.cpp2
-rw-r--r--cpp/src/qpid/client/Correlator.h4
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp40
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h4
-rw-r--r--cpp/src/qpid/client/FutureResponse.cpp8
-rw-r--r--cpp/src/qpid/client/FutureResponse.h8
-rw-r--r--cpp/src/qpid/client/ReceivedContent.cpp21
-rw-r--r--cpp/src/qpid/client/ReceivedContent.h28
-rw-r--r--cpp/src/qpid/client/Response.h4
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp4
-rw-r--r--cpp/src/qpid/client/SessionCore.h4
-rw-r--r--cpp/src/qpid/cluster/ClassifierHandler.cpp6
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp12
-rw-r--r--cpp/src/qpid/framing/AMQBody.h77
-rw-r--r--cpp/src/qpid/framing/AMQContentBody.cpp2
-rw-r--r--cpp/src/qpid/framing/AMQContentBody.h5
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp117
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h87
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.cpp42
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.h16
-rw-r--r--cpp/src/qpid/framing/AMQHeartbeatBody.h3
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.cpp42
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.h67
-rw-r--r--cpp/src/qpid/framing/BasicHeaderProperties.h2
-rw-r--r--cpp/src/qpid/framing/Blob.cpp30
-rw-r--r--cpp/src/qpid/framing/Blob.h55
-rw-r--r--cpp/src/qpid/framing/BodyHandler.cpp13
-rw-r--r--cpp/src/qpid/framing/BodyHandler.h12
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.cpp5
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.h6
-rw-r--r--cpp/src/qpid/framing/Frame.cpp117
-rw-r--r--cpp/src/qpid/framing/Frame.h72
-rw-r--r--cpp/src/qpid/framing/HeaderProperties.h2
-rw-r--r--cpp/src/qpid/framing/MethodHolder.cpp14
-rw-r--r--cpp/src/qpid/framing/MethodHolder.h54
-rw-r--r--cpp/src/qpid/framing/amqp_framing.h1
-rw-r--r--cpp/src/qpid/framing/amqp_types.h2
-rw-r--r--cpp/src/qpid/shared_ptr.h2
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp4
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp24
-rw-r--r--cpp/src/tests/Cluster.cpp8
-rw-r--r--cpp/src/tests/Cluster.h2
-rw-r--r--cpp/src/tests/Cluster_child.cpp2
-rw-r--r--cpp/src/tests/FramingTest.cpp93
-rw-r--r--cpp/src/tests/InMemoryContentTest.cpp9
-rw-r--r--cpp/src/tests/InProcessBroker.h2
-rw-r--r--cpp/src/tests/LazyLoadedContentTest.cpp3
-rw-r--r--cpp/src/tests/Makefile.am5
-rw-r--r--cpp/src/tests/MessageBuilderTest.cpp46
-rw-r--r--cpp/src/tests/MessageTest.cpp17
-rw-r--r--cpp/src/tests/MockChannel.h11
-rw-r--r--cpp/src/tests/ReferenceTest.cpp22
-rw-r--r--cpp/src/tests/TxAckTest.cpp3
-rw-r--r--cpp/src/tests/TxPublishTest.cpp3
-rw-r--r--cpp/src/tests/Visitor.cpp58
95 files changed, 934 insertions, 1185 deletions
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
index 80ce266f39..114e3308d3 100644
--- a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
+++ b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
@@ -1164,7 +1164,7 @@ public class CppGenerator extends Generator
{
String[] fieldDomainPair = ordinalFieldMap.get(thisOrdinal);
sb.append(indent + "" + setRef(fieldDomainPair[FIELD_CODE_TYPE]) + " get" +
- Utils.firstUpper(fieldDomainPair[FIELD_NAME]) + "() { return " +
+ Utils.firstUpper(fieldDomainPair[FIELD_NAME]) + "() const { return " +
fieldDomainPair[FIELD_NAME] + "; }" + cr);
}
return sb.toString();
@@ -1443,12 +1443,13 @@ public class CppGenerator extends Generator
StringBuffer sb = new StringBuffer();
if (method.fieldMap.size() > 0)
{
- sb.append(indent + thisClass.name + Utils.firstUpper(method.name) + "Body(ProtocolVersion version," + cr);
+ sb.append(indent + thisClass.name + Utils.firstUpper(method.name) + "Body(ProtocolVersion," + cr);
sb.append(generateFieldList(method.fieldMap, version, true, false, 8));
- sb.append(indent + tab + ") : " + baseClass(method, version) + "(version");
- sb.append(")");
- if (method.fieldMap.size() > 0)
- sb.append(", \n" + generateFieldList(method.fieldMap, version, false, true, 8));
+ sb.append(indent + tab + ")");
+ if (method.fieldMap.size() > 0) {
+ sb.append(" : ");
+ sb.append(generateFieldList(method.fieldMap, version, false, true, 8));
+ }
sb.append(indent + "{ }\n");
}
return sb.toString();
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/Main.java b/cpp/gentools/src/org/apache/qpid/gentools/Main.java
index 74e1ce1ab9..a25ddd103c 100644
--- a/cpp/gentools/src/org/apache/qpid/gentools/Main.java
+++ b/cpp/gentools/src/org/apache/qpid/gentools/Main.java
@@ -234,9 +234,7 @@ public class Main
new File(tmplDir + Utils.fileSeparator + "AMQP_ServerOperations.h.tmpl"),
new File(tmplDir + Utils.fileSeparator + "AMQP_ClientOperations.h.tmpl"),
new File(tmplDir + Utils.fileSeparator + "AMQP_Constants.h.tmpl"),
- new File(tmplDir + Utils.fileSeparator + "AMQP_MethodVersionMap.h.tmpl"),
- new File(tmplDir + Utils.fileSeparator + "AMQP_MethodVersionMap.cpp.tmpl"),
- new File(tmplDir + Utils.fileSeparator + "AMQP_HighestVersion.h.tmpl")
+ new File(tmplDir + Utils.fileSeparator + "AMQP_HighestVersion.h.tmpl")
};
methodTemplateFiles = new File[]
{
diff --git a/cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl b/cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl
deleted file mode 100644
index 83c3065a68..0000000000
--- a/cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl
+++ /dev/null
@@ -1,57 +0,0 @@
-&{AMQP_MethodVersionMap.h}
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by ${GENERATOR} - do not modify.
- * Supported AMQP versions:
-%{VLIST} * ${major}-${minor}
- */
-
-#ifndef qpid_framing_AMQP_MethodVersionMap__
-#define qpid_framing_AMQP_MethodVersionMap__
-
-#include <map>
-#include "qpid/framing/AMQMethodBody.h"
-
-%{MLIST} ${mc_method_body_include}
-
-namespace qpid
-{
-namespace framing
-{
-
-template <class T> AMQMethodBody* createMethodBodyFn(u_int8_t major, u_int8_t minor) { return new T(ProtocolVersion(major, minor)); }
-typedef AMQMethodBody* (*fnPtr)(u_int8_t, u_int8_t);
-
-class AMQP_MethodVersionMap: public std::map<u_int64_t, fnPtr>
-{
-protected:
- u_int64_t createMapKey(u_int16_t classId, u_int16_t methodId, u_int8_t major, u_int8_t minor);
-public:
- AMQP_MethodVersionMap();
- AMQMethodBody* createMethodBody(u_int16_t classId, u_int16_t methodId, u_int8_t major, u_int8_t minor);
-};
-
-} /* namespace framing */
-} /* namespace qpid */
-
-#endif
diff --git a/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl b/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
index 605b09ac94..5ffd2f2b4d 100644
--- a/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
+++ b/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
@@ -38,6 +38,9 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FramingContent.h"
#include "qpid/framing/SequenceNumberSet.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/MethodBodyConstVisitor.h"
+
namespace qpid
{
@@ -45,7 +48,7 @@ namespace framing
{
${version_namespace_start}
-class ${CLASS}${METHOD}Body : public ${mb_base_class}
+class ${CLASS}${METHOD}Body : public AMQMethodBody
{
// Method field declarations
@@ -57,13 +60,11 @@ public:
static const ClassId CLASS_ID= ${CLASS_ID_INIT};
static const MethodId METHOD_ID = ${METHOD_ID_INIT};
- typedef boost::shared_ptr<${CLASS}${METHOD}Body> shared_ptr;
-
- // Constructors and destructors
+ // Constructors and destructors
${mb_constructor_with_initializers}
- ${CLASS}${METHOD}Body(ProtocolVersion version=ProtocolVersion() ): ${mb_base_class}(version) {}
+ ${CLASS}${METHOD}Body(ProtocolVersion=ProtocolVersion()) {}
virtual ~${CLASS}${METHOD}Body() {}
// Attribute get methods
@@ -71,10 +72,11 @@ ${mb_constructor_with_initializers}
%{FLIST} ${mb_field_get_method}
// Helper methods
-
- inline void print(std::ostream& out) const
+ using AMQMethodBody::accept;
+ void accept(MethodBodyConstVisitor& v) const { v.visit(*this); }
+
+ void print(std::ostream& out) const
{
- printPrefix(out);
out << "${CLASS}${METHOD}: ";
%{FLIST} ${mb_field_print}
}
@@ -84,17 +86,17 @@ ${mb_constructor_with_initializers}
u_int32_t size() const
{
- u_int32_t sz = baseSize();
+ u_int32_t sz = 0;
%{FLIST} ${mb_body_size}
return sz;
}
- void encodeContent(Buffer& ${mb_buffer_param}) const
+ void encode(Buffer& ${mb_buffer_param}) const
{
%{FLIST} ${mb_encode}
}
- inline void decodeContent(Buffer& ${mb_buffer_param})
+ inline void decode(Buffer& ${mb_buffer_param}, uint32_t=0)
{
%{FLIST} ${mb_decode}
}
diff --git a/cpp/rubygen/amqpgen.rb b/cpp/rubygen/amqpgen.rb
index a144825f08..333c7dd654 100755
--- a/cpp/rubygen/amqpgen.rb
+++ b/cpp/rubygen/amqpgen.rb
@@ -115,11 +115,7 @@ class AmqpClass < AmqpElement
# chassis should be "client" or "server"
def amqp_methods_on(chassis)
@cache_amqp_methods_on ||= { }
-
- els = elements.collect("method/chassis[@name='#{chassis}']/..") { |m|
- AmqpMethod.new(m,self)
- }.sort_by_name
- @cache_amqp_methods_on[chassis] ||= els
+ @cache_amqp_methods_on[chassis] ||= elements.collect("method/chassis[@name='#{chassis}']/..") { |m| AmqpMethod.new(m,self) }.sort_by_name
end
end
@@ -153,7 +149,8 @@ class AmqpRoot < AmqpElement
end
def amqp_classes()
- @cache_amqp_classes ||= elements.collect("class") { |c| AmqpClass.new(c,self) }.sort_by_name
+ @cache_amqp_classes ||= elements.collect("class") { |c|
+ AmqpClass.new(c,self) }.sort_by_name
end
# Return all methods on all classes.
diff --git a/cpp/rubygen/cppgen.rb b/cpp/rubygen/cppgen.rb
index 724634e514..a3314c7e11 100755
--- a/cpp/rubygen/cppgen.rb
+++ b/cpp/rubygen/cppgen.rb
@@ -63,6 +63,7 @@ end
class AmqpField
def cppname() @cache_cppname ||= name.lcaps.cppsafe; end
def cpptype() @cache_cpptype ||= amqp_root.param_type(field_type); end
+ def cppret_type() @cache_cpptype ||= amqp_root.return_type(field_type); end
def type_name () @type_name ||= cpptype+" "+cppname; end
end
@@ -159,8 +160,12 @@ class CppGen < Generator
def struct_class(type, name, bases, &block)
genl
gen "#{type} #{name}"
- gen ": #{bases.join(', ')}" unless bases.empty?
- scope(" {","};") { yield }
+ if (!bases.empty?)
+ genl ":"
+ indent { gen "#{bases.join(",\n")}" }
+ end
+ genl
+ scope("{","};") { yield }
end
def struct(name, *bases, &block) struct_class("struct", name, bases, &block); end
diff --git a/cpp/rubygen/templates/MethodBodyConstVisitor.rb b/cpp/rubygen/templates/MethodBodyConstVisitor.rb
new file mode 100755
index 0000000000..6fd7fe8ead
--- /dev/null
+++ b/cpp/rubygen/templates/MethodBodyConstVisitor.rb
@@ -0,0 +1,27 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class MethodBodyConstVisitorGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @namespace="qpid::framing"
+ @classname="MethodBodyConstVisitor"
+ @filename="qpid/framing/MethodBodyConstVisitor"
+ end
+
+ def generate()
+ h_file("#{@filename}") {
+ namespace(@namespace) {
+ @amqp.amqp_methods.each { |m| genl "class #{m.body_name};" }
+ cpp_class("MethodBodyConstVisitor") {
+ genl "public:"
+ genl "virtual ~MethodBodyConstVisitor() {}"
+ @amqp.amqp_methods.each { |m| genl "virtual void visit(const #{m.body_name}&) = 0;" }
+ }}}
+ end
+end
+
+MethodBodyConstVisitorGen.new(Outdir, Amqp).generate();
+
diff --git a/cpp/rubygen/templates/MethodHolder.rb b/cpp/rubygen/templates/MethodHolder.rb
index 65fa824fd4..39a570c982 100755
--- a/cpp/rubygen/templates/MethodHolder.rb
+++ b/cpp/rubygen/templates/MethodHolder.rb
@@ -40,23 +40,42 @@ EOS
def gen_construct
cpp_file(@filename+"_construct") {
include @filename
+ include "qpid/framing/MethodBodyConstVisitor.h"
@amqp.amqp_methods.each { |m| include "qpid/framing/#{m.body_name}" }
genl
- namespace(@namespace) {
- scope("void #{@classname}::construct(const Id& newId) {") {
- scope("switch (newId.first) {") {
+ include "qpid/Exception.h"
+ genl
+ namespace(@namespace) {
+ # construct function
+ scope("void #{@classname}::construct(ClassId c, MethodId m) {") {
+ scope("switch (c) {") {
@amqp.amqp_classes.each { |c|
- scope("case #{c.index}: switch(newId.second) {") {
+ scope("case #{c.index}: switch(m) {") {
c.amqp_methods.each { |m|
genl "case #{m.index}: blob.construct(in_place<#{m.body_name}>()); break;"
- }}
+ }
+ genl "default: throw Exception(QPID_MSG(\"Invalid method id \" << m << \" for class #{c.name} \"));"
+ }
genl "break;"
- }}
- genl "id=newId;";
- }}}
+ }
+ genl "default: throw Exception(QPID_MSG(\"Invalid class id \" << c));"
+ }
+ }
+ # CopyVisitor
+ struct("#{@classname}::CopyVisitor", "public MethodBodyConstVisitor") { genl "MethodHolder& holder;"
+ genl "CopyVisitor(MethodHolder& h) : holder(h) {}"
+ @amqp.amqp_methods.each { |m|
+ genl "void visit(const #{m.body_name}& x) { holder.blob=x; }"
+ }
+ }
+ genl
+ # operator=
+ scope("#{@classname}& MethodHolder::operator=(const AMQMethodBody& m) {") {
+ genl "CopyVisitor cv(*this); m.accept(cv); return *this;"
+ }
+ }}
end
-
def generate
gen_max_size
gen_construct
diff --git a/cpp/rubygen/templates/Proxy.rb b/cpp/rubygen/templates/Proxy.rb
index f36d6bcd99..a6c0763a8a 100755
--- a/cpp/rubygen/templates/Proxy.rb
+++ b/cpp/rubygen/templates/Proxy.rb
@@ -38,7 +38,7 @@ EOS
genl "void #{@classname}::#{cname}::#{m.cppname}(#{m.signature.join(", ")})"
scope {
params=(["channel.getVersion()"]+m.param_names).join(", ")
- genl "channel.send(make_shared_ptr(new #{m.body_name}(#{params})));"
+ genl "channel.send(#{m.body_name}(#{params}));"
}}
end
diff --git a/cpp/rubygen/templates/Session.rb b/cpp/rubygen/templates/Session.rb
index eaa4347974..4265731d32 100644
--- a/cpp/rubygen/templates/Session.rb
+++ b/cpp/rubygen/templates/Session.rb
@@ -37,7 +37,7 @@ class SessionGen < CppGen
indent { gen params.join(",\n") }
gen "){\n\n"
indent (2) {
- gen "return impl->send(AMQMethodBody::shared_ptr(new #{m.body_name}("
+ gen "return impl->send(#{m.body_name}("
params = ["version"] + m.param_names
gen params.join(", ")
other_params=[]
@@ -49,7 +49,7 @@ class SessionGen < CppGen
else
other_params << "true"
end
- gen ")), #{other_params.join(", ")});\n"
+ gen "), #{other_params.join(", ")});\n"
}
gen "}\n\n"
end
diff --git a/cpp/rubygen/templates/all_method_bodies.rb b/cpp/rubygen/templates/all_method_bodies.rb
new file mode 100755
index 0000000000..38fbc31593
--- /dev/null
+++ b/cpp/rubygen/templates/all_method_bodies.rb
@@ -0,0 +1,21 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class AllMethodBodiesGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @namespace="qpid::framing"
+ @filename="qpid/framing/all_method_bodies"
+ end
+
+ def generate()
+ h_file(@filename) {
+ @amqp.amqp_methods.each { |m| include "qpid/framing/"+m.body_name }
+ }
+ end
+end
+
+AllMethodBodiesGen.new(Outdir, Amqp).generate();
+
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 0f91faba63..977db7ea36 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -46,8 +46,10 @@ rgen_tdir=$(rgen_dir)/templates
rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate
rgen_templates=$(rgen_tdir)/MethodHolder.rb \
+ $(rgen_tdir)/MethodBodyConstVisitor.rb \
$(rgen_tdir)/frame_body_lists.rb \
$(rgen_tdir)/Session.rb \
+ $(rgen_tdir)/all_method_bodies.rb \
$(rgen_tdir)/Proxy.rb
rgen_generator=$(rgen_dir)/generate \
@@ -135,11 +137,11 @@ libqpidcommon_la_LIBADD = \
libqpidcommon_la_SOURCES = \
$(platform_src) \
qpid/framing/AMQBody.cpp \
+ qpid/framing/AMQMethodBody.cpp \
qpid/framing/AMQContentBody.cpp \
qpid/framing/AMQFrame.cpp \
qpid/framing/AMQHeaderBody.cpp \
qpid/framing/AMQHeartbeatBody.cpp \
- qpid/framing/AMQMethodBody.cpp \
qpid/framing/FrameHandler.h \
qpid/framing/BasicHeaderProperties.cpp \
qpid/framing/BodyHandler.cpp \
@@ -162,10 +164,12 @@ libqpidcommon_la_SOURCES = \
qpid/framing/Blob.h \
qpid/framing/AMQP_ClientProxy.cpp \
qpid/framing/AMQP_ServerProxy.cpp \
+ qpid/framing/variant.h \
gen/qpid/framing/AMQP_HighestVersion.h \
- gen/qpid/framing/AMQP_MethodVersionMap.cpp \
+ qpid/framing/Blob.cpp \
qpid/framing/MethodHolder.h qpid/framing/MethodHolder.cpp \
qpid/framing/MethodHolder_construct.cpp \
+ qpid/framing/MethodHolderMaxSize.h \
qpid/Exception.cpp \
qpid/Plugin.h \
qpid/Plugin.cpp \
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 6e577ab354..e135e960c4 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -307,19 +307,19 @@ void Channel::handlePublish(Message* _message)
messageBuilder.initialise(message);
}
-void Channel::handleHeader(AMQHeaderBody::shared_ptr header)
+void Channel::handleHeader(AMQHeaderBody* header)
{
messageBuilder.setHeader(header);
//at this point, decide based on the size of the message whether we want
//to stage it by saving content directly to disk as it arrives
}
-void Channel::handleContent(AMQContentBody::shared_ptr content)
+void Channel::handleContent(AMQContentBody* content)
{
messageBuilder.addContent(content);
}
-void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
+void Channel::handleHeartbeat(AMQHeartbeatBody*) {
// TODO aconway 2007-01-17: Implement heartbeating.
}
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index 1f4f6f35e7..021110cf8c 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -170,9 +170,9 @@ class Channel : public CompletionHandler
void flow(bool active);
void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
void handlePublish(Message* msg);
- void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
- void handleContent(boost::shared_ptr<framing::AMQContentBody>);
- void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+ void handleHeader(framing::AMQHeaderBody*);
+ void handleContent(framing::AMQContentBody*);
+ void handleHeartbeat(framing::AMQHeartbeatBody*);
void handleInlineTransfer(Message::shared_ptr msg);
};
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
index 244bee4a92..bddd5802cf 100644
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -72,22 +72,25 @@ BasicMessage::BasicMessage(
const string& _exchange, const string& _routingKey,
bool _mandatory, bool _immediate
) :
- Message(_publisher, _exchange, _routingKey, _mandatory,
- _immediate, framing::AMQMethodBody::shared_ptr(new BasicPublishBody(ProtocolVersion(0,9)))),
+ Message(_publisher, _exchange, _routingKey, _mandatory, _immediate),
size(0)
{}
// For tests only.
-BasicMessage::BasicMessage() : size(0)
-{}
+BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {}
BasicMessage::~BasicMessage(){}
-void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
+void BasicMessage::setHeader(AMQHeaderBody* _header){
+ if (_header) {
+ this->header = *_header;
+ isHeaderSet = true;
+ }
+ else
+ isHeaderSet = false;
}
-void BasicMessage::addContent(AMQContentBody::shared_ptr data){
+void BasicMessage::addContent(AMQContentBody* data){
if (!content.get()) {
content = std::auto_ptr<Content>(new InMemoryContent());
}
@@ -96,7 +99,7 @@ void BasicMessage::addContent(AMQContentBody::shared_ptr data){
}
bool BasicMessage::isComplete(){
- return header.get() && (header->getContentSize() == contentSize());
+ return isHeaderSet && (header.getContentSize() == contentSize());
}
DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue)
@@ -113,10 +116,9 @@ void BasicMessage::deliver(ChannelAdapter& channel,
const string& consumerTag, DeliveryId id,
uint32_t framesize)
{
- channel.send(make_shared_ptr(
- new BasicDeliverBody(
+ channel.send(BasicDeliverBody(
channel.getVersion(), consumerTag, id.getValue(),
- getRedelivered(), getExchange(), getRoutingKey())));
+ getRedelivered(), getExchange(), getRoutingKey()));
sendContent(channel, framesize);
}
@@ -125,11 +127,11 @@ void BasicMessage::sendGetOk(ChannelAdapter& channel,
DeliveryId id,
uint32_t framesize)
{
- channel.send(make_shared_ptr(
- new BasicGetOkBody(
+ channel.send(
+ BasicGetOkBody(
channel.getVersion(),
id.getValue(), getRedelivered(), getExchange(),
- getRoutingKey(), messageCount)));
+ getRoutingKey(), messageCount));
sendContent(channel, framesize);
}
@@ -156,12 +158,11 @@ void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize)
channel.send(header);
Mutex::ScopedLock locker(contentLock);
if (content.get())
- content->send(channel, framesize);
+ content->send(channel, framesize);
}
BasicHeaderProperties* BasicMessage::getHeaderProperties(){
- return boost::polymorphic_downcast<BasicHeaderProperties*>(
- header->getProperties());
+ return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0;
}
const FieldTable& BasicMessage::getApplicationHeaders(){
@@ -170,7 +171,7 @@ const FieldTable& BasicMessage::getApplicationHeaders(){
bool BasicMessage::isPersistent()
{
- if(!header) return false;
+ if(!isHeaderSet) return false;
BasicHeaderProperties* props = getHeaderProperties();
return props && props->getDeliveryMode() == PERSISTENT;
}
@@ -194,9 +195,9 @@ void BasicMessage::decodeHeader(Buffer& buffer)
setRouting(exchange, routingKey);
uint32_t headerSize = buffer.getLong();
- AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
- headerBody->decode(buffer, headerSize);
- setHeader(headerBody);
+ AMQHeaderBody headerBody;
+ headerBody.decode(buffer, headerSize);
+ setHeader(&headerBody);
}
void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
@@ -214,9 +215,9 @@ void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
uint64_t total = 0;
while (total < expectedContentSize()) {
uint64_t remaining = expected - total;
- AMQContentBody::shared_ptr contentBody(new AMQContentBody());
- contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
- addContent(contentBody);
+ AMQContentBody contentBody;
+ contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize);
+ addContent(&contentBody);
total += chunkSize;
}
}
@@ -232,8 +233,8 @@ void BasicMessage::encodeHeader(Buffer& buffer) const
RecoveryManagerImpl::encodeMessageType(*this, buffer);
buffer.putShortString(getExchange());
buffer.putShortString(getRoutingKey());
- buffer.putLong(header->size());
- header->encode(buffer);
+ buffer.putLong(header.size());
+ header.encode(buffer);
}
void BasicMessage::encodeContent(Buffer& buffer) const
@@ -258,12 +259,12 @@ uint32_t BasicMessage::encodedHeaderSize() const
return RecoveryManagerImpl::encodedMessageTypeSize()
+getExchange().size() + 1
+ getRoutingKey().size() + 1
- + header->size() + 4;//4 extra bytes for size
+ + header.size() + 4;//4 extra bytes for size
}
uint64_t BasicMessage::expectedContentSize()
{
- return header.get() ? header->getContentSize() : 0;
+ return isHeaderSet ? header.getContentSize() : 0;
}
void BasicMessage::releaseContent(MessageStore* store)
@@ -294,5 +295,5 @@ void BasicMessage::setContent(std::auto_ptr<Content>& _content)
uint32_t BasicMessage::getRequiredCredit() const
{
- return header->size() + contentSize();
+ return header.size() + contentSize();
}
diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h
index af8e4e62e9..0f46ff2e83 100644
--- a/cpp/src/qpid/broker/BrokerMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessage.h
@@ -27,6 +27,7 @@
#include "BrokerMessageBase.h"
#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/AMQHeaderBody.h"
#include "ConnectionToken.h"
#include "Content.h"
#include "qpid/sys/Mutex.h"
@@ -51,7 +52,8 @@ using framing::string;
* request.
*/
class BasicMessage : public Message {
- boost::shared_ptr<framing::AMQHeaderBody> header;
+ framing::AMQHeaderBody header;
+ bool isHeaderSet;
std::auto_ptr<Content> content;
mutable sys::Mutex contentLock;
uint64_t size;
@@ -66,8 +68,8 @@ class BasicMessage : public Message {
bool mandatory, bool immediate);
BasicMessage();
~BasicMessage();
- void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header);
- void addContent(framing::AMQContentBody::shared_ptr data);
+ void setHeader(framing::AMQHeaderBody* header);
+ void addContent(framing::AMQContentBody* data);
bool isComplete();
static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue);
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
index 94035905ce..bac5dc6386 100644
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -54,22 +54,18 @@ class MessageStore;
class Message : public PersistableMessage{
public:
typedef boost::shared_ptr<Message> shared_ptr;
- typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr;
-
Message(const ConnectionToken* publisher_,
const std::string& _exchange,
const std::string& _routingKey,
- bool _mandatory, bool _immediate,
- AMQMethodBodyPtr respondTo_) :
+ bool _mandatory, bool _immediate) :
publisher(publisher_),
exchange(_exchange),
routingKey(_routingKey),
mandatory(_mandatory),
immediate(_immediate),
persistenceId(0),
- redelivered(false),
- respondTo(respondTo_)
+ redelivered(false)
{}
Message() :
@@ -145,8 +141,8 @@ class Message : public PersistableMessage{
* it uses).
*/
virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {};
- virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {};
+ virtual void setHeader(framing::AMQHeaderBody*) {};
+ virtual void addContent(framing::AMQContentBody*) {};
/**
* Releases the in-memory content data held by this
* message. Must pass in a store from which the data can
@@ -164,7 +160,6 @@ class Message : public PersistableMessage{
const bool immediate;
mutable uint64_t persistenceId;
bool redelivered;
- AMQMethodBodyPtr respondTo;
};
}}
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index 0da5f3d8f5..1184885aeb 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -48,35 +48,34 @@ struct MessageDeliveryToken : public DeliveryToken
};
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_
+ ConnectionToken* publisher, const MessageTransferBody* transfer_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
- transfer_->getImmediate(),
- transfer_),
- transfer(transfer_)
+ transfer_->getImmediate()),
+ transfer(*transfer_)
{
- assert(transfer->getBody().isInline());
+ assert(transfer.getBody().isInline());
}
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_, ReferencePtr reference_
+ ConnectionToken* publisher, const MessageTransferBody* transfer_,
+ ReferencePtr reference_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
- transfer_->getImmediate(),
- transfer_),
- transfer(transfer_),
+ transfer_->getImmediate()),
+ transfer(*transfer_),
reference(reference_)
{
- assert(!transfer->getBody().isInline());
+ assert(!transfer.getBody().isInline());
assert(reference_);
}
/**
* Currently used by message store impls to recover messages
*/
-MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {}
+MessageMessage::MessageMessage() {}
// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
void MessageMessage::transferMessage(
@@ -84,27 +83,28 @@ void MessageMessage::transferMessage(
const std::string& consumerTag,
uint32_t framesize)
{
- const framing::Content& body = transfer->getBody();
+ const framing::Content& body = transfer.getBody();
// Send any reference data
ReferencePtr ref= getReference();
if (ref){
// Open
- channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId())));
+ channel.send(MessageOpenBody(channel.getVersion(), ref->getId()));
// Appends
for(Reference::Appends::const_iterator a = ref->getAppends().begin();
a != ref->getAppends().end();
++a) {
- uint32_t sizeleft = (*a)->size();
- const string& content = (*a)->getBytes();
+ uint32_t sizeleft = a->size();
+ const string& content = a->getBytes();
// Calculate overhead bytes
// Assume that the overhead is constant as the reference name doesn't change
uint32_t overhead = sizeleft - content.size();
string::size_type contentStart = 0;
while (sizeleft) {
string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
- channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(),
- string(content, contentStart, contentSize))));
+
+ channel.send(MessageAppendBody(channel.getVersion(), ref->getId(),
+ string(content, contentStart, contentSize)));
sizeleft -= contentSize;
contentStart += contentSize;
}
@@ -112,76 +112,73 @@ void MessageMessage::transferMessage(
}
// The transfer
- if ( transfer->size()<=framesize ) {
- channel.send(make_shared_ptr(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body)));
+ if ( transfer.size()<=framesize ) {
+ channel.send(MessageTransferBody(ProtocolVersion(),
+ transfer.getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ body));
} else {
// Thing to do here is to construct a simple reference message then deliver that instead
// fragmentation will be taken care of in the delivery if necessary;
string content = body.getValue();
string refname = "dummy";
- TransferPtr newTransfer(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- framing::Content(REFERENCE, refname)));
+ MessageTransferBody newTransfer(channel.getVersion(),
+ transfer.getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ framing::Content(REFERENCE, refname));
ReferencePtr newRef(new Reference(refname));
- Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
- newRef->append(newAppend);
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), newTransfer, newRef);
+ newRef->append(MessageAppendBody(channel.getVersion(), refname, content));
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef);
newMsg.transferMessage(channel, consumerTag, framesize);
return;
}
// Close any reference data
if (ref)
- channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId())));
+ channel.send(MessageCloseBody(ProtocolVersion(), ref->getId()));
}
@@ -202,8 +199,8 @@ bool MessageMessage::isComplete()
uint64_t MessageMessage::contentSize() const
{
- if (transfer->getBody().isInline())
- return transfer->getBody().getValue().size();
+ if (transfer.getBody().isInline())
+ return transfer.getBody().getValue().size();
else {
assert(getReference());
return getReference()->getSize();
@@ -217,11 +214,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
const FieldTable& MessageMessage::getApplicationHeaders()
{
- return transfer->getApplicationHeaders();
+ return transfer.getApplicationHeaders();
}
bool MessageMessage::isPersistent()
{
- return transfer->getDeliveryMode() == PERSISTENT;
+ return transfer.getDeliveryMode() == PERSISTENT;
}
uint32_t MessageMessage::encodedSize() const
@@ -231,7 +228,7 @@ uint32_t MessageMessage::encodedSize() const
uint32_t MessageMessage::encodedHeaderSize() const
{
- return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize();
+ return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size();
}
uint32_t MessageMessage::encodedContentSize() const
@@ -252,18 +249,17 @@ void MessageMessage::encode(Buffer& buffer) const
void MessageMessage::encodeHeader(Buffer& buffer) const
{
RecoveryManagerImpl::encodeMessageType(*this, buffer);
- if (transfer->getBody().isInline()) {
- transfer->encodeContent(buffer);
+ if (transfer.getBody().isInline()) {
+ transfer.encode(buffer);
} else {
assert(getReference());
string data;
const Reference::Appends& appends = getReference()->getAppends();
for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) {
- data += (*a)->getBytes();
+ data += a->getBytes();
}
framing::Content body(INLINE, data);
- std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body));
- copy->encodeContent(buffer);
+ copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer);
}
}
@@ -271,8 +267,7 @@ void MessageMessage::decodeHeader(Buffer& buffer)
{
//don't care about the type here, but want encode/decode to be symmetric
RecoveryManagerImpl::decodeMessageType(buffer);
-
- transfer->decodeContent(buffer);
+ transfer.decode(buffer);
}
void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
@@ -280,37 +275,36 @@ void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
}
-MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
- const string& destination,
- const framing::Content& body) const
+MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version,
+ const string& destination,
+ const framing::Content& body) const
{
- return new MessageTransferBody(version,
- transfer->getTicket(),
- destination,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body);
-
+ return MessageTransferBody(version,
+ transfer.getTicket(),
+ destination,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ body);
}
MessageMessage::ReferencePtr MessageMessage::getReference() const {
@@ -321,7 +315,7 @@ uint32_t MessageMessage::getRequiredCredit() const
{
//TODO: change when encoding changes. Should be the payload of any
//header & body frames.
- return transfer->size();
+ return transfer.size();
}
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h
index 6b1bd9ab5d..6bfd0e045d 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.h
@@ -29,10 +29,6 @@
namespace qpid {
-namespace framing {
-class MessageTransferBody;
-}
-
namespace broker {
class ConnectionToken;
class Reference;
@@ -40,16 +36,15 @@ class Reference;
class MessageMessage: public Message{
public:
typedef boost::shared_ptr<MessageMessage> shared_ptr;
- typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
typedef boost::shared_ptr<Reference> ReferencePtr;
- MessageMessage(ConnectionToken* publisher, TransferPtr transfer);
- MessageMessage(ConnectionToken* publisher, TransferPtr transfer, ReferencePtr reference);
+ MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer);
+ MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer, ReferencePtr reference);
MessageMessage();
// Default destructor okay
- TransferPtr getTransfer() const { return transfer; }
+ framing::MessageTransferBody* getTransfer() const { return const_cast<framing::MessageTransferBody*>(&transfer); }
ReferencePtr getReference() const ;
void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
@@ -80,12 +75,12 @@ class MessageMessage: public Message{
const std::string& consumerTag,
uint32_t framesize);
- framing::MessageTransferBody* copyTransfer(
+ framing::MessageTransferBody copyTransfer(
const framing::ProtocolVersion& version,
const std::string& destination,
const framing::Content& body) const;
- const TransferPtr transfer;
+ framing::MessageTransferBody transfer;
const boost::shared_ptr<Reference> reference;
};
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp
index a67a5557c6..175f57df7d 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.cpp
+++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -1,3 +1,4 @@
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -42,8 +43,7 @@ void ConnectionAdapter::close(ReplyCode code, const string& text, ClassId classI
handler->client.close(code, text, classId, methodId);
}
-void ConnectionAdapter::handleMethod(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void ConnectionAdapter::handleMethod(framing::AMQMethodBody* method)
{
try{
method->invoke(*this);
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index 1ce850a659..9aa3d130e8 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -47,12 +47,12 @@ public:
void handle(framing::AMQFrame& frame);
//ChannelAdapter virtual methods:
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
+ void handleMethod(framing::AMQMethodBody* method);
bool isOpen() const { return true; } //channel 0 is always open
//never needed:
- void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>) {}
- void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>) {}
- void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>) {}
+ void handleHeader(framing::AMQHeaderBody*) {}
+ void handleContent(framing::AMQContentBody*) {}
+ void handleHeartbeat(framing::AMQHeartbeatBody*) {}
//AMQP_ServerOperations:
ConnectionHandler* getConnectionHandler();
diff --git a/cpp/src/qpid/broker/Content.h b/cpp/src/qpid/broker/Content.h
index df9e7a1132..97dce0d3f7 100644
--- a/cpp/src/qpid/broker/Content.h
+++ b/cpp/src/qpid/broker/Content.h
@@ -42,7 +42,7 @@ class Content{
virtual ~Content(){}
/** Add a block of data to the content */
- virtual void add(framing::AMQContentBody::shared_ptr data) = 0;
+ virtual void add(framing::AMQContentBody* data) = 0;
/** Total size of content in bytes */
virtual uint32_t size() = 0;
diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp
index a6ce820f7e..d69dcfafe7 100644
--- a/cpp/src/qpid/broker/InMemoryContent.cpp
+++ b/cpp/src/qpid/broker/InMemoryContent.cpp
@@ -26,16 +26,16 @@ using namespace qpid::broker;
using namespace qpid::framing;
using boost::static_pointer_cast;
-void InMemoryContent::add(AMQContentBody::shared_ptr data)
+void InMemoryContent::add(AMQContentBody* data)
{
- content.push_back(data);
+ content.push_back(*data);
}
uint32_t InMemoryContent::size()
{
int sum(0);
for (content_iterator i = content.begin(); i != content.end(); i++) {
- sum += (*i)->size();
+ sum += i->size();
}
return sum;
}
@@ -43,22 +43,20 @@ uint32_t InMemoryContent::size()
void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
- if ((*i)->size() > framesize) {
+ if (i->size() > framesize) {
uint32_t offset = 0;
- for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
- string data = (*i)->getData().substr(offset, framesize);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ for (int chunk = i->size() / framesize; chunk > 0; chunk--) {
+ string data = i->getData().substr(offset, framesize);
+ channel.send(AMQContentBody(data));
offset += framesize;
}
- uint32_t remainder = (*i)->size() % framesize;
+ uint32_t remainder = i->size() % framesize;
if (remainder) {
- string data = (*i)->getData().substr(offset, remainder);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ string data = i->getData().substr(offset, remainder);
+ channel.send(AMQContentBody(data));
}
} else {
- AMQBody::shared_ptr contentBody =
- static_pointer_cast<AMQBody, AMQContentBody>(*i);
- channel.send(contentBody);
+ channel.send(*i);
}
}
}
@@ -66,7 +64,7 @@ void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
void InMemoryContent::encode(Buffer& buffer)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
- (*i)->encode(buffer);
+ i->encode(buffer);
}
}
diff --git a/cpp/src/qpid/broker/InMemoryContent.h b/cpp/src/qpid/broker/InMemoryContent.h
index 425f0e4e26..a6fca7ca98 100644
--- a/cpp/src/qpid/broker/InMemoryContent.h
+++ b/cpp/src/qpid/broker/InMemoryContent.h
@@ -22,21 +22,22 @@
#define _InMemoryContent_
#include "Content.h"
+#include "qpid/framing/AMQContentBody.h"
#include <vector>
namespace qpid {
namespace broker {
class InMemoryContent : public Content{
- typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+ typedef std::vector<framing::AMQContentBody> content_list;
typedef content_list::iterator content_iterator;
content_list content;
public:
- void add(qpid::framing::AMQContentBody::shared_ptr data);
+ void add(framing::AMQContentBody* data);
uint32_t size();
void send(framing::ChannelAdapter&, uint32_t framesize);
- void encode(qpid::framing::Buffer& buffer);
+ void encode(framing::Buffer& buffer);
};
}
}
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp
index 80d06ebf2b..b8b5b37f45 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.cpp
+++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp
@@ -33,7 +33,7 @@ LazyLoadedContent::~LazyLoadedContent()
LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) :
store(_store), msg(_msg), expectedSize(_expectedSize) {}
-void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
+void LazyLoadedContent::add(AMQContentBody* data)
{
store->appendContent(*msg, data->getData());
}
@@ -52,12 +52,12 @@ void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize)
string data;
store->loadContent(*msg, data, offset,
remaining > framesize ? framesize : remaining);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ channel.send(AMQContentBody(data));
}
} else {
string data;
store->loadContent(*msg, data, 0, expectedSize);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ channel.send(AMQContentBody(data));
}
}
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h
index 9dff6158a5..79a33ed7a9 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.h
+++ b/cpp/src/qpid/broker/LazyLoadedContent.h
@@ -36,7 +36,7 @@ namespace qpid {
MessageStore* const store, Message* const msg,
uint64_t expectedSize);
~LazyLoadedContent();
- void add(qpid::framing::AMQContentBody::shared_ptr data);
+ void add(qpid::framing::AMQContentBody* data);
uint32_t size();
void send(
framing::ChannelAdapter&,
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 6c33b38e72..f19927b708 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -50,7 +50,7 @@ void MessageBuilder::initialise(Message::shared_ptr& msg){
message = msg;
}
-void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
+void MessageBuilder::setHeader(AMQHeaderBody* header){
if(!message.get()){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
}
@@ -65,7 +65,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
route();
}
-void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){
+void MessageBuilder::addContent(AMQContentBody* content){
if(!message.get()){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h
index 7c4a529a64..18e85d7383 100644
--- a/cpp/src/qpid/broker/MessageBuilder.h
+++ b/cpp/src/qpid/broker/MessageBuilder.h
@@ -39,8 +39,8 @@ namespace qpid {
MessageStore* const store = 0,
uint64_t stagingThreshold = 0);
void initialise(Message::shared_ptr& msg);
- void setHeader(framing::AMQHeaderBody::shared_ptr& header);
- void addContent(framing::AMQContentBody::shared_ptr& content);
+ void setHeader(framing::AMQHeaderBody* header);
+ void addContent(framing::AMQContentBody* content);
Message::shared_ptr getMessage() { return message; }
private:
Message::shared_ptr message;
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index b5bea05eac..3f407c11f7 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -28,6 +28,7 @@
#include "BrokerAdapter.h"
#include <boost/format.hpp>
+#include <boost/cast.hpp>
namespace qpid {
namespace broker {
@@ -159,9 +160,7 @@ MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ )
void
MessageHandlerImpl::transfer(const framing::AMQMethodBody& context)
{
- MessageTransferBody::shared_ptr transfer(
- make_shared_ptr(new MessageTransferBody(static_cast<const MessageTransferBody&>(context))));
-
+ const MessageTransferBody* transfer = boost::polymorphic_downcast<const MessageTransferBody*>(&context);
if (transfer->getBody().isInline()) {
MessageMessage::shared_ptr message(new MessageMessage(&connection, transfer));
channel.handleInlineTransfer(message);
diff --git a/cpp/src/qpid/broker/Reference.cpp b/cpp/src/qpid/broker/Reference.cpp
index c2060f8fee..283b231b60 100644
--- a/cpp/src/qpid/broker/Reference.cpp
+++ b/cpp/src/qpid/broker/Reference.cpp
@@ -40,9 +40,9 @@ Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) {
return i->second;
}
-void Reference::append(AppendPtr ptr) {
- appends.push_back(ptr);
- size += ptr->getBytes().length();
+void Reference::append(const framing::MessageAppendBody& app) {
+ appends.push_back(app);
+ size += app.getBytes().length();
}
void Reference::close() {
diff --git a/cpp/src/qpid/broker/Reference.h b/cpp/src/qpid/broker/Reference.h
index 277eb7b917..5a373fbeba 100644
--- a/cpp/src/qpid/broker/Reference.h
+++ b/cpp/src/qpid/broker/Reference.h
@@ -19,6 +19,8 @@
*
*/
+#include "qpid/framing/MessageAppendBody.h"
+
#include <string>
#include <vector>
#include <map>
@@ -56,8 +58,7 @@ class Reference
typedef boost::shared_ptr<Reference> shared_ptr;
typedef boost::shared_ptr<MessageMessage> MessagePtr;
typedef std::vector<MessagePtr> Messages;
- typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
- typedef std::vector<AppendPtr> Appends;
+ typedef std::vector<framing::MessageAppendBody> Appends;
Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
: id(id_), size(0), registry(reg) {}
@@ -69,7 +70,7 @@ class Reference
void addMessage(MessagePtr message) { messages.push_back(message); }
/** Append more data to the reference */
- void append(AppendPtr ptr);
+ void append(const framing::MessageAppendBody&);
/** Close the reference, complete each associated message */
void close();
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 6ef2162a4a..b7aa2aad25 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -22,6 +22,8 @@
#include "SemanticHandler.h"
#include "BrokerAdapter.h"
#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/ExecutionCompleteBody.h"
+#include "qpid/framing/ChannelCloseOkBody.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -60,7 +62,7 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
}
//ChannelAdapter virtual methods:
-void SemanticHandler::handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
{
try {
if (!method->invoke(this)) {
@@ -108,11 +110,11 @@ void SemanticHandler::flush()
incoming.lwm = incoming.hwm;
if (isOpen()) {
Mutex::ScopedLock l(outLock);
- ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
+ ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
}
}
-void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void SemanticHandler::handleL4(framing::AMQMethodBody* method)
{
try{
if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
@@ -139,17 +141,17 @@ bool SemanticHandler::isOpen() const
return channel.isOpen();
}
-void SemanticHandler::handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody> body)
+void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body)
{
channel.handleHeader(body);
}
-void SemanticHandler::handleContent(boost::shared_ptr<qpid::framing::AMQContentBody> body)
+void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body)
{
channel.handleContent(body);
}
-void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody> body)
+void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body)
{
channel.handleHeartbeat(body);
}
@@ -169,16 +171,12 @@ void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_
msg->deliver(*this, tag, token, connection.getFrameMax());
}
-void SemanticHandler::send(shared_ptr<AMQBody> body)
+void SemanticHandler::send(const AMQBody& body)
{
Mutex::ScopedLock l(outLock);
- uint8_t type(body->type());
- if (type == METHOD_BODY) {
+ if (body.getMethod() && body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) {
//temporary hack until channel management is moved to its own handler:
- if (dynamic_pointer_cast<AMQMethodBody>(body)->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++outgoing.hwm;
- //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
- }
+ ++outgoing.hwm;
}
ChannelAdapter::send(body);
}
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 016c94738d..6748da8500 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -31,6 +31,14 @@
#include "qpid/framing/SequenceNumber.h"
namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class AMQHeaderBody;
+}
+
namespace broker {
class BrokerAdapter;
@@ -48,16 +56,16 @@ class SemanticHandler : private framing::ChannelAdapter,
framing::Window outgoing;
sys::Mutex outLock;
- void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
+ void handleL4(framing::AMQMethodBody* method);
//ChannelAdapter virtual methods:
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
+ void handleMethod(framing::AMQMethodBody* method);
bool isOpen() const;
- void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
- void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
- void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+ void handleHeader(framing::AMQHeaderBody*);
+ void handleContent(framing::AMQContentBody*);
+ void handleHeartbeat(framing::AMQHeartbeatBody*);
- void send(shared_ptr<framing::AMQBody> body);
+ void send(const framing::AMQBody& body);
//delivery adapter methods:
diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp
index a6aea438f0..b3d720baf0 100644
--- a/cpp/src/qpid/client/ChannelHandler.cpp
+++ b/cpp/src/qpid/client/ChannelHandler.cpp
@@ -21,6 +21,7 @@
#include "ChannelHandler.h"
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -30,40 +31,39 @@ ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {}
void ChannelHandler::incoming(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (getState() == OPEN) {
- if (isA<ChannelCloseBody>(body)) {
- ChannelCloseBody::shared_ptr method(shared_polymorphic_cast<ChannelCloseBody>(body));
+ ChannelCloseBody* closeBody=
+ dynamic_cast<ChannelCloseBody*>(body->getMethod());
+ if (closeBody) {
setState(CLOSED);
if (onClose) {
- onClose(method->getReplyCode(), method->getReplyText());
+ onClose(closeBody->getReplyCode(), closeBody->getReplyText());
}
} else {
try {
in(frame);
}catch(ChannelException& e){
- if (body->type() == METHOD_BODY) {
- AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
- close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
- } else {
+ AMQMethodBody* method=body->getMethod();
+ if (method)
+ close(e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ else
close(e.code, e.toString(), 0, 0);
- }
}
}
} else {
- if (body->type() == METHOD_BODY) {
- handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
- } else {
+ if (body->getMethod())
+ handleMethod(body->getMethod());
+ else
throw new ConnectionException(504, "Channel not open.");
- }
-
}
}
void ChannelHandler::outgoing(AMQFrame& frame)
{
if (getState() == OPEN) {
- frame.channel = id;
+ frame.setChannel(id);
out(frame);
} else {
throw Exception("Channel not open");
@@ -75,7 +75,7 @@ void ChannelHandler::open(uint16_t _id)
id = _id;
setState(OPENING);
- AMQFrame f(version, id, make_shared_ptr(new ChannelOpenBody(version)));
+ AMQFrame f(version, id, ChannelOpenBody(version));
out(f);
std::set<int> states;
@@ -90,7 +90,7 @@ void ChannelHandler::open(uint16_t _id)
void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
{
setState(CLOSING);
- AMQFrame f(version, id, make_shared_ptr(new ChannelCloseBody(version, code, message, classId, methodId)));
+ AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId));
out(f);
}
@@ -100,24 +100,24 @@ void ChannelHandler::close()
waitFor(CLOSED);
}
-void ChannelHandler::handleMethod(AMQMethodBody::shared_ptr method)
+void ChannelHandler::handleMethod(AMQMethodBody* method)
{
switch (getState()) {
- case OPENING:
+ case OPENING:
if (method->isA<ChannelOpenOkBody>()) {
setState(OPEN);
} else {
throw ConnectionException(504, "Channel not opened.");
}
break;
- case CLOSING:
+ case CLOSING:
if (method->isA<ChannelCloseOkBody>()) {
setState(CLOSED);
} //else just ignore it
break;
- case CLOSED:
+ case CLOSED:
throw ConnectionException(504, "Channel not opened.");
- default:
+ default:
throw Exception("Unexpected state encountered in ChannelHandler!");
}
}
diff --git a/cpp/src/qpid/client/ChannelHandler.h b/cpp/src/qpid/client/ChannelHandler.h
index eaa7e7cc72..556e13a4f8 100644
--- a/cpp/src/qpid/client/ChannelHandler.h
+++ b/cpp/src/qpid/client/ChannelHandler.h
@@ -34,13 +34,7 @@ class ChannelHandler : private StateManager, public ChainableFrameHandler
framing::ProtocolVersion version;
uint16_t id;
- void handleMethod(framing::AMQMethodBody::shared_ptr method);
-
- template <class T> bool isA(framing::AMQBody::shared_ptr body) {
- return body->type() == framing::METHOD_BODY &&
- boost::shared_polymorphic_cast<framing::AMQMethodBody>(body)->isA<T>();
- }
-
+ void handleMethod(framing::AMQMethodBody* method);
void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId);
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 8ffddd0dbf..aa73e83328 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -25,11 +25,11 @@
#include "ClientMessage.h"
#include "qpid/QpidError.h"
#include "Connection.h"
-#include "ConnectionHandler.h"
#include "FutureResponse.h"
#include "MessageListener.h"
#include <boost/format.hpp>
#include <boost/bind.hpp>
+#include "qpid/framing/all_method_bodies.h"
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -37,7 +37,6 @@
//
using namespace std;
using namespace boost;
-using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -49,13 +48,11 @@ const std::string empty;
class ScopedSync
{
Session& session;
-public:
+ public:
ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
~ScopedSync() { session.setSynchronous(false); }
};
-}}
-
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
prefetch(_prefetch), transactional(_transactional), running(false)
{
@@ -250,3 +247,6 @@ void Channel::run() {
}
} catch (const QueueClosed&) {}
}
+
+}}
+
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index e2e83c8caf..e41ab363b5 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -27,6 +27,7 @@
#include "ClientChannel.h"
#include "ConnectionImpl.h"
#include "Session.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
namespace qpid {
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index f47506d977..66db9384e2 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -22,6 +22,8 @@
#include "ConnectionHandler.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -53,16 +55,16 @@ void ConnectionHandler::incoming(AMQFrame& frame)
throw Exception("Connection is closed.");
}
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (frame.getChannel() == 0) {
- if (body->type() == METHOD_BODY) {
- handle(shared_polymorphic_cast<AMQMethodBody>(body));
+ if (body->getMethod()) {
+ handle(body->getMethod());
} else {
error(503, "Cannot send content on channel zero.");
}
} else {
switch(getState()) {
- case OPEN:
+ case OPEN:
try {
in(frame);
}catch(ConnectionException& e){
@@ -71,10 +73,10 @@ void ConnectionHandler::incoming(AMQFrame& frame)
error(541/*internal error*/, e.what(), body);
}
break;
- case CLOSING:
+ case CLOSING:
QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored.");
break;
- default:
+ default:
//must be in connection initialisation:
fail("Cannot receive frames on non-zero channel until connection is established.");
}
@@ -101,32 +103,29 @@ void ConnectionHandler::waitForOpen()
void ConnectionHandler::close()
{
setState(CLOSING);
- send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0)));
-
+ send(ConnectionCloseBody(version, 200, OK, 0, 0));
waitFor(CLOSED);
}
-void ConnectionHandler::send(framing::AMQBody::shared_ptr body)
+void ConnectionHandler::send(const framing::AMQBody& body)
{
- AMQFrame f;
- f.setBody(body);
+ AMQFrame f(ProtocolVersion(), 0, body);
out(f);
}
void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
{
setState(CLOSING);
- send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId)));
+ send(ConnectionCloseBody(version, code, message, classId, methodId));
}
-void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body)
+void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body)
{
- if (body->type() == METHOD_BODY) {
- AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+ AMQMethodBody* method = body->getMethod();
+ if (method)
error(code, message, method->amqpClassId(), method->amqpMethodId());
- } else {
+ else
error(code, message);
- }
}
@@ -136,54 +135,54 @@ void ConnectionHandler::fail(const std::string& message)
setState(FAILED);
}
-void ConnectionHandler::handle(AMQMethodBody::shared_ptr method)
+void ConnectionHandler::handle(AMQMethodBody* method)
{
switch (getState()) {
- case NOT_STARTED:
+ case NOT_STARTED:
if (method->isA<ConnectionStartBody>()) {
setState(NEGOTIATING);
string response = ((char)0) + uid + ((char)0) + pwd;
- send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale)));
+ send(ConnectionStartOkBody(version, properties, mechanism, response, locale));
} else {
fail("Bad method sequence, expected connection-start.");
}
break;
- case NEGOTIATING:
+ case NEGOTIATING:
if (method->isA<ConnectionTuneBody>()) {
- ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method));
+ ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method);
heartbeat = proposal->getHeartbeat();
maxChannels = proposal->getChannelMax();
- send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)));
+ send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat));
setState(OPENING);
- send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist)));
- //TODO: support for further security challenges
- //} else if (method->isA<ConnectionSecureBody>()) {
+ send(ConnectionOpenBody(version, vhost, capabilities, insist));
+ //TODO: support for further security challenges
+ //} else if (method->isA<ConnectionSecureBody>()) {
} else {
fail("Unexpected method sequence, expected connection-tune.");
}
break;
- case OPENING:
+ case OPENING:
if (method->isA<ConnectionOpenOkBody>()) {
setState(OPEN);
- //TODO: support for redirection
- //} else if (method->isA<ConnectionRedirectBody>()) {
+ //TODO: support for redirection
+ //} else if (method->isA<ConnectionRedirectBody>()) {
} else {
fail("Unexpected method sequence, expected connection-open-ok.");
}
break;
- case OPEN:
+ case OPEN:
if (method->isA<ConnectionCloseBody>()) {
- send(make_shared_ptr(new ConnectionCloseOkBody(version)));
+ send(ConnectionCloseOkBody(version));
setState(CLOSED);
if (onError) {
- ConnectionCloseBody::shared_ptr c(shared_polymorphic_cast<ConnectionCloseBody>(method));
+ ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method);
onError(c->getReplyCode(), c->getReplyText());
}
} else {
error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
}
break;
- case CLOSING:
+ case CLOSING:
if (method->isA<ConnectionCloseOkBody>()) {
if (onClose) {
onClose();
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index 464d0ca26d..d05ae1428b 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -25,6 +25,8 @@
#include "StateManager.h"
#include "ChainableFrameHandler.h"
#include "qpid/framing/InputHandler.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/AMQMethodBody.h"
namespace qpid {
namespace client {
@@ -53,10 +55,10 @@ class ConnectionHandler : private StateManager,
enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
std::set<int> ESTABLISHED;
- void handle(framing::AMQMethodBody::shared_ptr method);
- void send(framing::AMQBody::shared_ptr body);
+ void handle(framing::AMQMethodBody* method);
+ void send(const framing::AMQBody& body);
void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0);
- void error(uint16_t code, const std::string& message, framing::AMQBody::shared_ptr body);
+ void error(uint16_t code, const std::string& message, framing::AMQBody* body);
void fail(const std::string& message);
public:
diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp
index edb16bbcee..9ef6857957 100644
--- a/cpp/src/qpid/client/Correlator.cpp
+++ b/cpp/src/qpid/client/Correlator.cpp
@@ -25,7 +25,7 @@ using qpid::client::Correlator;
using namespace qpid::framing;
using namespace boost;
-void Correlator::receive(AMQMethodBody::shared_ptr response)
+void Correlator::receive(AMQMethodBody* response)
{
if (listeners.empty()) {
throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name
diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h
index 339c9bd0c4..d93e7b66cd 100644
--- a/cpp/src/qpid/client/Correlator.h
+++ b/cpp/src/qpid/client/Correlator.h
@@ -36,9 +36,9 @@ namespace client {
class Correlator
{
public:
- typedef boost::function<void(framing::AMQMethodBody::shared_ptr)> Listener;
+ typedef boost::function<void(framing::AMQMethodBody*)> Listener;
- void receive(framing::AMQMethodBody::shared_ptr);
+ void receive(framing::AMQMethodBody*);
void listen(Listener l);
private:
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index abfce4f9d1..6ee6429b6b 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -23,31 +23,35 @@
#include "qpid/Exception.h"
#include "qpid/framing/BasicDeliverBody.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
using namespace boost;
-bool isMessageMethod(AMQMethodBody::shared_ptr method)
+bool isMessageMethod(AMQMethodBody* method)
{
return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>();
}
-bool isMessageMethod(AMQBody::shared_ptr body)
+bool isMessageMethod(AMQBody* body)
{
- return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+ AMQMethodBody* method=body->getMethod();
+ return method && isMessageMethod(method);
}
bool isContentFrame(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
uint8_t type = body->type();
return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
}
-bool invoke(AMQBody::shared_ptr body, Invocable* target)
+bool invoke(AMQBody* body, Invocable* target)
{
- return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target);
+ AMQMethodBody* method=body->getMethod();
+ return method && method->invoke(target);
}
ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) :
@@ -56,7 +60,7 @@ ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) :
//incoming:
void ExecutionHandler::handle(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (!invoke(body, this)) {
if (isContentFrame(frame)) {
if (!arriving) {
@@ -69,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame)
}
} else {
++incoming.hwm;
- correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body));
+ correlation.receive(body->getMethod());
}
}
}
@@ -95,16 +99,15 @@ void ExecutionHandler::flush()
{
//send completion
incoming.lwm = incoming.hwm;
- //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
}
void ExecutionHandler::sendFlush()
{
- AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version)));
+ AMQFrame frame(version, 0, ExecutionFlushBody());
out(frame);
}
-void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g)
+void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g)
{
//allocate id:
++outgoing.hwm;
@@ -116,18 +119,19 @@ void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::List
correlation.listen(g);
}
- AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command);
+ AMQFrame frame(version, 0/*id will be filled in be channel handler*/,
+ command);
out(frame);
}
-void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data,
+void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data,
CompletionTracker::Listener f, Correlator::Listener g)
{
send(command, f, g);
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers);
- header->setContentSize(data.size());
+ AMQHeaderBody header(BASIC);
+ BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers);
+ header.setContentSize(data.size());
AMQFrame h(version, 0, header);
out(h);
@@ -136,7 +140,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
- AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data)));
+ AMQFrame frame(version, 0, AMQContentBody(data));
out(frame);
}else{
u_int32_t offset = 0;
@@ -144,7 +148,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag)));
+ AMQFrame frame(version, 0, AMQContentBody(frag));
out(frame);
offset += length;
remaining = data_length - offset;
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index f62598ef95..21613df779 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -56,10 +56,10 @@ public:
void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
void handle(framing::AMQFrame& frame);
- void send(framing::AMQBody::shared_ptr command,
+ void send(const framing::AMQBody& command,
CompletionTracker::Listener f = CompletionTracker::Listener(),
Correlator::Listener g = Correlator::Listener());
- void sendContent(framing::AMQBody::shared_ptr command,
+ void sendContent(const framing::AMQBody& command,
const framing::BasicHeaderProperties& headers, const std::string& data,
CompletionTracker::Listener f = CompletionTracker::Listener(),
Correlator::Listener g = Correlator::Listener());
diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp
index 6b1246a449..e63dc9c192 100644
--- a/cpp/src/qpid/client/FutureResponse.cpp
+++ b/cpp/src/qpid/client/FutureResponse.cpp
@@ -26,16 +26,16 @@ using namespace qpid::framing;
using namespace qpid::sys;
-AMQMethodBody::shared_ptr FutureResponse::getResponse()
+AMQMethodBody* FutureResponse::getResponse()
{
waitForCompletion();
- return response;
+ return response.get();
}
-void FutureResponse::received(AMQMethodBody::shared_ptr r)
+void FutureResponse::received(AMQMethodBody* r)
{
Monitor::ScopedLock l(lock);
- response = r;
+ response = *r;
complete = true;
lock.notifyAll();
}
diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h
index ccc6fb5894..75b1f72c04 100644
--- a/cpp/src/qpid/client/FutureResponse.h
+++ b/cpp/src/qpid/client/FutureResponse.h
@@ -23,6 +23,7 @@
#define _FutureResponse_
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/MethodHolder.h"
#include "FutureCompletion.h"
namespace qpid {
@@ -30,11 +31,10 @@ namespace client {
class FutureResponse : public FutureCompletion
{
- framing::AMQMethodBody::shared_ptr response;
-
+ framing::MethodHolder response;
public:
- framing::AMQMethodBody::shared_ptr getResponse();
- void received(framing::AMQMethodBody::shared_ptr response);
+ framing::AMQMethodBody* getResponse();
+ void received(framing::AMQMethodBody* response);
};
}}
diff --git a/cpp/src/qpid/client/ReceivedContent.cpp b/cpp/src/qpid/client/ReceivedContent.cpp
index 9cfee21c3c..5a1f48901a 100644
--- a/cpp/src/qpid/client/ReceivedContent.cpp
+++ b/cpp/src/qpid/client/ReceivedContent.cpp
@@ -20,6 +20,7 @@
*/
#include "ReceivedContent.h"
+#include "qpid/framing/all_method_bodies.h"
using qpid::client::ReceivedContent;
using namespace qpid::framing;
@@ -27,9 +28,9 @@ using namespace boost;
ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {}
-void ReceivedContent::append(AMQBody::shared_ptr part)
+void ReceivedContent::append(AMQBody* part)
{
- parts.push_back(part);
+ parts.push_back(AMQFrame(ProtocolVersion(), 0, part));
}
bool ReceivedContent::isComplete() const
@@ -37,7 +38,7 @@ bool ReceivedContent::isComplete() const
if (parts.empty()) {
return false;
} else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- AMQHeaderBody::shared_ptr headers(getHeaders());
+ const AMQHeaderBody* headers(getHeaders());
return headers && headers->getContentSize() == getContentSize();
} else if (isA<MessageTransferBody>()) {
//no longer support references, headers and data are still method fields
@@ -48,14 +49,14 @@ bool ReceivedContent::isComplete() const
}
-AMQMethodBody::shared_ptr ReceivedContent::getMethod() const
+const AMQMethodBody* ReceivedContent::getMethod() const
{
- return parts.empty() ? AMQMethodBody::shared_ptr() : dynamic_pointer_cast<AMQMethodBody>(parts[0]);
+ return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
}
-AMQHeaderBody::shared_ptr ReceivedContent::getHeaders() const
+const AMQHeaderBody* ReceivedContent::getHeaders() const
{
- return parts.size() < 2 ? AMQHeaderBody::shared_ptr() : dynamic_pointer_cast<AMQHeaderBody>(parts[1]);
+ return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
}
uint64_t ReceivedContent::getContentSize() const
@@ -63,7 +64,7 @@ uint64_t ReceivedContent::getContentSize() const
if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
uint64_t size(0);
for (uint i = 2; i < parts.size(); i++) {
- size += parts[i]->size();
+ size += parts[i].getBody()->size();
}
return size;
} else if (isA<MessageTransferBody>()) {
@@ -78,7 +79,7 @@ std::string ReceivedContent::getContent() const
if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
string data;
for (uint i = 2; i < parts.size(); i++) {
- data += dynamic_pointer_cast<AMQContentBody>(parts[i])->getData();
+ data += static_cast<const AMQContentBody*>(parts[i].getBody())->getData();
}
return data;
} else if (isA<MessageTransferBody>()) {
@@ -93,7 +94,7 @@ void ReceivedContent::populate(Message& msg)
if (!isComplete()) throw Exception("Incomplete message");
if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(getHeaders()->getProperties());
+ const BasicHeaderProperties* properties = dynamic_cast<const BasicHeaderProperties*>(getHeaders()->getProperties());
BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties);
msg.setData(getContent());
} else if (isA<MessageTransferBody>()) {
diff --git a/cpp/src/qpid/client/ReceivedContent.h b/cpp/src/qpid/client/ReceivedContent.h
index 1886034f9b..4f84039c10 100644
--- a/cpp/src/qpid/client/ReceivedContent.h
+++ b/cpp/src/qpid/client/ReceivedContent.h
@@ -20,8 +20,8 @@
*/
#include <string>
#include <vector>
-#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/SequenceNumber.h"
#include "ClientMessage.h"
@@ -38,37 +38,29 @@ namespace client {
class ReceivedContent
{
const framing::SequenceNumber id;
- std::vector<framing::AMQBody::shared_ptr> parts;
+ std::vector<framing::AMQFrame> parts;
public:
typedef boost::shared_ptr<ReceivedContent> shared_ptr;
ReceivedContent(const framing::SequenceNumber& id);
- void append(framing::AMQBody::shared_ptr part);
+ void append(framing::AMQBody* part);
bool isComplete() const;
uint64_t getContentSize() const;
std::string getContent() const;
- framing::AMQMethodBody::shared_ptr getMethod() const;
- framing::AMQHeaderBody::shared_ptr getHeaders() const;
+ const framing::AMQMethodBody* getMethod() const;
+ const framing::AMQHeaderBody* getHeaders() const;
template <class T> bool isA() const {
- framing::AMQMethodBody::shared_ptr method = getMethod();
- if (!method) {
- return false;
- } else {
- return method->isA<T>();
- }
+ const framing::AMQMethodBody* method=getMethod();
+ return method && method->isA<T>();
}
- template <class T> boost::shared_ptr<T> as() const {
- framing::AMQMethodBody::shared_ptr method = getMethod();
- if (method && method->isA<T>()) {
- return boost::dynamic_pointer_cast<T>(method);
- } else {
- return boost::shared_ptr<T>();
- }
+ template <class T> const T* as() const {
+ const framing::AMQMethodBody* method=getMethod();
+ return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
}
const framing::SequenceNumber& getId() const { return id; }
diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h
index 745d4648ad..ad37d7c0f4 100644
--- a/cpp/src/qpid/client/Response.h
+++ b/cpp/src/qpid/client/Response.h
@@ -38,12 +38,12 @@ public:
template <class T> T& as()
{
- framing::AMQMethodBody::shared_ptr response(future->getResponse());
+ framing::AMQMethodBody* response(future->getResponse());
return dynamic_cast<T&>(*response);
}
template <class T> bool isA()
{
- framing::AMQMethodBody::shared_ptr response(future->getResponse());
+ framing::AMQMethodBody* response(future->getResponse());
return response && response->isA<T>();
}
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 391dcd909d..f7ed7416cd 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -44,7 +44,7 @@ void SessionCore::flush()
l3.sendFlush();
}
-Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse)
+Response SessionCore::send(const AMQMethodBody& method, bool expectResponse)
{
boost::shared_ptr<FutureResponse> f(futures.createResponse());
if (expectResponse) {
@@ -59,7 +59,7 @@ Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse
return Response(f);
}
-Response SessionCore::send(AMQMethodBody::shared_ptr method, const MethodContent& content, bool expectResponse)
+Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse)
{
//TODO: lots of duplication between these two send methods; refactor
boost::shared_ptr<FutureResponse> f(futures.createResponse());
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index 15cd36114f..bcbaf0028d 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -47,8 +47,8 @@ public:
typedef boost::shared_ptr<SessionCore> shared_ptr;
SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
- Response send(framing::AMQMethodBody::shared_ptr method, bool expectResponse = false);
- Response send(framing::AMQMethodBody::shared_ptr method, const framing::MethodContent& content, bool expectResponse = false);
+ Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
+ Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
ReceivedContent::shared_ptr get();
uint16_t getId() const { return id; }
void setSync(bool);
diff --git a/cpp/src/qpid/cluster/ClassifierHandler.cpp b/cpp/src/qpid/cluster/ClassifierHandler.cpp
index 1cce126800..9410a3cc38 100644
--- a/cpp/src/qpid/cluster/ClassifierHandler.cpp
+++ b/cpp/src/qpid/cluster/ClassifierHandler.cpp
@@ -36,7 +36,7 @@ typedef uint32_t FullMethodId; // Combind class & method ID.
FullMethodId fullId(ClassId c, MethodId m) { return c<<16+m; }
-FullMethodId fullId(const shared_ptr<AMQMethodBody>& body) {
+FullMethodId fullId(const AMQMethodBody*& body) {
return fullId(body->amqpClassId(), body->amqpMethodId());
}
@@ -59,8 +59,8 @@ void ClassifierHandler::handle(AMQFrame& frame) {
// TODO aconway 2007-07-03: Flatten the frame hierarchy so we
// can do a single lookup to dispatch a frame.
Chain chosen;
- shared_ptr<AMQMethodBody> method =
- dynamic_pointer_cast<AMQMethodBody>(frame.getBody());
+ AMQMethodBody* method = dynamic_cast<AMQMethodBody*>(frame.getBody());
+
// FIXME aconway 2007-07-05: Need to stop bypassed frames
// from overtaking mcast frames.
//
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e5af237bd2..d9f1679fdf 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -95,7 +95,7 @@ void Cluster::handle(AMQFrame& frame) {
void Cluster::notify() {
AMQFrame frame(ProtocolVersion(), 0,
- new ClusterNotifyBody(ProtocolVersion(), url));
+ ClusterNotifyBody(ProtocolVersion(), url));
handle(frame);
}
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
index a4efa939fc..b8290ad953 100644
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -36,7 +36,7 @@ using namespace sys;
using namespace broker;
/** Handler to send frames direct to local broker (bypass correlation etc.) */
- struct BrokerHandler : public FrameHandler, private ChannelAdapter, private DeliveryAdapter {
+struct BrokerHandler : public FrameHandler, private ChannelAdapter, private DeliveryAdapter {
Connection connection;
Channel channel;
BrokerAdapter adapter;
@@ -61,13 +61,13 @@ using namespace broker;
}
// Dummy methods.
- virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>){}
- virtual void handleContent(boost::shared_ptr<AMQContentBody>){}
- virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>){}
+ virtual void handleHeader(AMQHeaderBody*){}
+ virtual void handleContent(AMQContentBody*){}
+ virtual void handleHeartbeat(AMQHeartbeatBody*){}
virtual bool isOpen() const{ return true; }
- virtual void handleMethod(shared_ptr<AMQMethodBody>){}
+ virtual void handleMethod(AMQMethodBody*){}
// No-op send.
- virtual void send(shared_ptr<AMQBody>) {}
+ virtual void send(const AMQBody&) {}
//delivery adapter methods, also no-ops:
virtual DeliveryId deliver(Message::shared_ptr&, DeliveryToken::shared_ptr) { return 0; }
diff --git a/cpp/src/qpid/framing/AMQBody.h b/cpp/src/qpid/framing/AMQBody.h
index eaa568c06a..0b2718bef3 100644
--- a/cpp/src/qpid/framing/AMQBody.h
+++ b/cpp/src/qpid/framing/AMQBody.h
@@ -1,3 +1,6 @@
+#ifndef QPID_FRAMING_AMQBODY_H
+#define QPID_FRAMING_AMQBODY_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,42 +21,58 @@
* under the License.
*
*/
-#include <boost/shared_ptr.hpp>
-#include "amqp_types.h"
-#include "Buffer.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/shared_ptr.h"
-#ifndef _AMQBody_
-#define _AMQBody_
+#include <ostream>
namespace qpid {
- namespace framing {
+namespace framing {
+
+class Buffer;
+
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class AMQHeartbeatBody;
+
+struct AMQBodyConstVisitor {
+ virtual ~AMQBodyConstVisitor() {}
+ virtual void visit(const AMQHeaderBody&) = 0;
+ virtual void visit(const AMQContentBody&) = 0;
+ virtual void visit(const AMQHeartbeatBody&) = 0;
+ virtual void visit(const AMQMethodBody&) = 0;
+};
+
+class AMQBody
+{
+ public:
+ typedef shared_ptr<AMQBody> shared_ptr;
+
+ virtual ~AMQBody();
+
+ virtual uint8_t type() const = 0;
- class AMQBody
- {
- public:
- typedef boost::shared_ptr<AMQBody> shared_ptr;
+ virtual void encode(Buffer& buffer) const = 0;
+ virtual void decode(Buffer& buffer, uint32_t=0) = 0;
+ virtual uint32_t size() const = 0;
- virtual ~AMQBody();
- virtual uint32_t size() const = 0;
- virtual uint8_t type() const = 0;
- virtual void encode(Buffer& buffer) const = 0;
- virtual void decode(Buffer& buffer, uint32_t size) = 0;
+ virtual void print(std::ostream& out) const = 0;
+ virtual void accept(AMQBodyConstVisitor&) const = 0;
- virtual void print(std::ostream& out) const = 0;
- };
+ virtual AMQMethodBody* getMethod() { return 0; }
+ virtual const AMQMethodBody* getMethod() const { return 0; }
+};
- std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
+std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
- enum BodyTypes {
- METHOD_BODY = 1,
- HEADER_BODY = 2,
- CONTENT_BODY = 3,
- HEARTBEAT_BODY = 8,
- REQUEST_BODY = 9,
- RESPONSE_BODY = 10
- };
- }
-}
+enum BodyTypes {
+ METHOD_BODY = 1,
+ HEADER_BODY = 2,
+ CONTENT_BODY = 3,
+ HEARTBEAT_BODY = 8
+};
+}} // namespace qpid::framing
-#endif
+#endif /*!QPID_FRAMING_AMQBODY_H*/
diff --git a/cpp/src/qpid/framing/AMQContentBody.cpp b/cpp/src/qpid/framing/AMQContentBody.cpp
index 19d2e5714a..c472af555d 100644
--- a/cpp/src/qpid/framing/AMQContentBody.cpp
+++ b/cpp/src/qpid/framing/AMQContentBody.cpp
@@ -12,7 +12,7 @@
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+n * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
diff --git a/cpp/src/qpid/framing/AMQContentBody.h b/cpp/src/qpid/framing/AMQContentBody.h
index 6d4c1ef4b6..a476796015 100644
--- a/cpp/src/qpid/framing/AMQContentBody.h
+++ b/cpp/src/qpid/framing/AMQContentBody.h
@@ -28,13 +28,11 @@
namespace qpid {
namespace framing {
-class AMQContentBody : public AMQBody
+class AMQContentBody : public AMQBody
{
string data;
public:
- typedef boost::shared_ptr<AMQContentBody> shared_ptr;
-
AMQContentBody();
AMQContentBody(const string& data);
inline virtual ~AMQContentBody(){}
@@ -44,6 +42,7 @@ public:
void encode(Buffer& buffer) const;
void decode(Buffer& buffer, uint32_t size);
void print(std::ostream& out) const;
+ void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
};
}
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index f79eae3524..780af71be4 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -1,4 +1,3 @@
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,46 +18,70 @@
* under the License.
*
*/
-#include <boost/format.hpp>
-
#include "AMQFrame.h"
+
#include "qpid/QpidError.h"
-#include "AMQMethodBody.h"
+#include "qpid/framing/variant.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include <boost/format.hpp>
+
+#include <iostream>
namespace qpid {
namespace framing {
+namespace {
+struct GetBodyVisitor : public NoBlankVisitor<AMQBody*> {
+ QPID_USING_NOBLANK(AMQBody*);
+ AMQBody* operator()(MethodHolder& t) const { return t.get(); }
+ template <class T> AMQBody* operator()(T& t) const { return &t; }
+};
+
+struct EncodeVisitor : public NoBlankVisitor<void> {
+ Buffer& buffer;
+ EncodeVisitor(Buffer& b) : buffer(b) {}
+
+ QPID_USING_NOBLANK(void);
+ template <class T> void operator()(const T& t) const { return t.encode(buffer); }
+};
+
+struct SizeVisitor : public NoBlankVisitor<uint32_t> {
+ QPID_USING_NOBLANK(uint32_t);
+ template <class T> uint32_t operator()(const T& t) const { return t.size(); }
+};
+
+struct DecodeVisitor : public NoBlankVisitor<void> {
+ Buffer& buffer;
+ uint32_t size;
+ DecodeVisitor(Buffer& b, uint32_t s) : buffer(b), size(s) {}
+ QPID_USING_NOBLANK(void);
+ void operator()(MethodHolder& t) const { return t.decode(buffer); }
+ template <class T> void operator()(T& t) const { return t.decode(buffer, size); }
+};
-AMQP_MethodVersionMap AMQFrame::versionMap;
-
-AMQFrame::AMQFrame(ProtocolVersion _version)
- : channel(0), type(0), version(_version)
- {
- assert(version != ProtocolVersion(0,0));
- }
-
-AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) : channel(_channel), body(_body),version(_version) {}
+}
-AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::shared_ptr& _body) :
- channel(_channel), body(_body), version(_version)
-{}
+AMQBody* AMQFrame::getBody() {
+ return boost::apply_visitor(GetBodyVisitor(), body);
+}
-AMQFrame::~AMQFrame() {}
+const AMQBody* AMQFrame::getBody() const {
+ return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
+}
void AMQFrame::encode(Buffer& buffer)
{
- buffer.putOctet(body->type());
+ buffer.putOctet(getBody()->type());
buffer.putShort(channel);
- buffer.putLong(body->size());
- body->encode(buffer);
+ buffer.putLong(boost::apply_visitor(SizeVisitor(), body));
+ boost::apply_visitor(EncodeVisitor(buffer), body);
buffer.putOctet(0xCE);
}
uint32_t AMQFrame::size() const{
- assert(body.get());
- return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size()
- + 1/*0xCE*/;
+ return 1/*type*/ + 2/*channel*/ + 4/*body size*/ +
+ boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/;
}
bool AMQFrame::decode(Buffer& buffer)
@@ -66,56 +89,42 @@ bool AMQFrame::decode(Buffer& buffer)
if(buffer.available() < 7)
return false;
buffer.record();
- uint32_t frameSize = decodeHead(buffer);
- if(buffer.available() < frameSize + 1){
+
+ uint8_t type = buffer.getOctet();
+ channel = buffer.getShort();
+ uint32_t size = buffer.getLong();
+
+ if(buffer.available() < size+1){
buffer.restore();
return false;
}
- decodeBody(buffer, frameSize);
+ decodeBody(buffer, size, type);
uint8_t end = buffer.getOctet();
if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
return true;
}
-uint32_t AMQFrame::decodeHead(Buffer& buffer){
- type = buffer.getOctet();
- channel = buffer.getShort();
- return buffer.getLong();
-}
-
-void AMQFrame::decodeBody(Buffer& buffer, uint32_t size)
+void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type)
{
switch(type)
{
- case METHOD_BODY:
- body = AMQMethodBody::create(versionMap, version, buffer);
- break;
- case HEADER_BODY:
- body = AMQBody::shared_ptr(new AMQHeaderBody());
- break;
- case CONTENT_BODY:
- body = AMQBody::shared_ptr(new AMQContentBody());
- break;
- case HEARTBEAT_BODY:
- body = AMQBody::shared_ptr(new AMQHeartbeatBody());
- break;
+ case METHOD_BODY: body = MethodHolder(); break;
+ case HEADER_BODY: body = AMQHeaderBody(); break;
+ case CONTENT_BODY: body = AMQContentBody(); break;
+ case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break;
+
default:
THROW_QPID_ERROR(
FRAMING_ERROR,
boost::format("Unknown frame type %d") % type);
}
- body->decode(buffer, size);
+ boost::apply_visitor(DecodeVisitor(buffer,size), body);
}
-std::ostream& operator<<(std::ostream& out, const AMQFrame& t)
+std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
{
- out << "Frame[channel=" << t.channel << "; ";
- if (t.body.get() == 0)
- out << "empty";
- else
- out << *t.body;
- out << "]";
- return out;
+ return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody()
+ << "]";
}
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index 16c1427802..9e825a9936 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -21,55 +21,76 @@
* under the License.
*
*/
-#include <boost/cast.hpp>
-
-#include "amqp_types.h"
-#include "AMQBody.h"
#include "AMQDataBlock.h"
-#include "AMQMethodBody.h"
#include "AMQHeaderBody.h"
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
-#include "qpid/framing/AMQP_MethodVersionMap.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/shared_ptr.h"
+#include "MethodHolder.h"
+#include "ProtocolVersion.h"
+
+#include <boost/cast.hpp>
+#include <boost/variant.hpp>
namespace qpid {
namespace framing {
-
class AMQFrame : public AMQDataBlock
{
public:
- AMQFrame(ProtocolVersion _version = highestProtocolVersion);
- AMQFrame(ProtocolVersion _version, uint16_t channel, AMQBody* body);
- AMQFrame(ProtocolVersion _version, uint16_t channel, const AMQBody::shared_ptr& body);
- virtual ~AMQFrame();
- virtual void encode(Buffer& buffer);
- virtual bool decode(Buffer& buffer);
- virtual uint32_t size() const;
- uint16_t getChannel() const { return channel; }
-
- shared_ptr<AMQBody> getBody() { return body; }
- void setBody(const shared_ptr<AMQBody>& b) { body = b; }
-
- /** Convenience template to cast the body to an expected type */
- template <class T> boost::shared_ptr<T> castBody() {
- assert(dynamic_cast<T*>(getBody().get()));
- boost::static_pointer_cast<T>(getBody());
+ AMQFrame(ProtocolVersion=ProtocolVersion()) {}
+
+ /** Construct a frame with a copy of b */
+ AMQFrame(ProtocolVersion, ChannelId c, const AMQBody* b) : channel(c) {
+ setBody(*b);
+ }
+
+ AMQFrame(ProtocolVersion, ChannelId c, const AMQBody& b) : channel(c) {
+ setBody(b);
}
+
+ ChannelId getChannel() const { return channel; }
+ void setChannel(ChannelId c) { channel = c; }
- uint32_t decodeHead(Buffer& buffer);
- void decodeBody(Buffer& buffer, uint32_t size);
+ AMQBody* getBody();
+ const AMQBody* getBody() const;
- uint16_t channel;
- uint8_t type;
- AMQBody::shared_ptr body;
- ProtocolVersion version;
+ /** Copy a body instance to the frame */
+ void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); }
+
+ /** Convenience template to cast the body to an expected type. */
+ template <class T> T* castBody() {
+ boost::polymorphic_downcast<T*>(getBody());
+ }
+
+ bool empty() { return boost::get<boost::blank>(&body); }
+
+ void encode(Buffer& buffer);
+ bool decode(Buffer& buffer);
+ uint32_t size() const;
private:
- static AMQP_MethodVersionMap versionMap;
+ struct CopyVisitor : public AMQBodyConstVisitor {
+ AMQFrame& frame;
+ CopyVisitor(AMQFrame& f) : frame(f) {}
+ void visit(const AMQHeaderBody& x) { frame.body=x; }
+ void visit(const AMQContentBody& x) { frame.body=x; }
+ void visit(const AMQHeartbeatBody& x) { frame.body=x; }
+ void visit(const AMQMethodBody& x) { frame.body=MethodHolder(x); }
+ };
+ friend struct CopyVisitor;
+
+ typedef boost::variant<boost::blank,
+ AMQHeaderBody,
+ AMQContentBody,
+ AMQHeartbeatBody,
+ MethodHolder> Variant;
+
+ void visit(AMQHeaderBody& x) { body=x; }
+
+ void decodeBody(Buffer& buffer, uint32_t size, uint8_t type);
+
+ uint16_t channel;
+ Variant body;
};
std::ostream& operator<<(std::ostream&, const AMQFrame&);
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.cpp b/cpp/src/qpid/framing/AMQHeaderBody.cpp
index 4d3611eb40..6a3c8f27d1 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.cpp
+++ b/cpp/src/qpid/framing/AMQHeaderBody.cpp
@@ -22,54 +22,34 @@
#include "qpid/QpidError.h"
#include "BasicHeaderProperties.h"
-qpid::framing::AMQHeaderBody::AMQHeaderBody(int classId) : weight(0), contentSize(0){
- createProperties(classId);
-}
+qpid::framing::AMQHeaderBody::AMQHeaderBody(int) : weight(0), contentSize(0) {}
-qpid::framing::AMQHeaderBody::AMQHeaderBody() : properties(0), weight(0), contentSize(0){
-}
+qpid::framing::AMQHeaderBody::AMQHeaderBody() : weight(0), contentSize(0){}
-qpid::framing::AMQHeaderBody::~AMQHeaderBody(){
- delete properties;
-}
+qpid::framing::AMQHeaderBody::~AMQHeaderBody(){}
uint32_t qpid::framing::AMQHeaderBody::size() const{
- return 12 + properties->size();
+ return 12 + properties.size();
}
void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const {
- buffer.putShort(properties->classId());
+ buffer.putShort(properties.classId());
buffer.putShort(weight);
buffer.putLongLong(contentSize);
- properties->encode(buffer);
+ properties.encode(buffer);
}
void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){
- uint16_t classId = buffer.getShort();
+ buffer.getShort(); // Ignore classId
weight = buffer.getShort();
contentSize = buffer.getLongLong();
- createProperties(classId);
- properties->decode(buffer, bufSize - 12);
-}
-
-void qpid::framing::AMQHeaderBody::createProperties(int classId){
- switch(classId){
- case BASIC:
- properties = new qpid::framing::BasicHeaderProperties();
- break;
- default:
- THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class");
- }
+ properties.decode(buffer, bufSize - 12);
}
void qpid::framing::AMQHeaderBody::print(std::ostream& out) const
{
out << "header (" << size() << " bytes)" << " content_size=" << getContentSize();
- const BasicHeaderProperties* props =
- dynamic_cast<const BasicHeaderProperties*>(getProperties());
- if (props) {
- out << ", message_id=" << props->getMessageId();
- out << ", delivery_mode=" << (int) props->getDeliveryMode();
- out << ", headers=" << const_cast<BasicHeaderProperties*>(props)->getHeaders();
- }
+ out << ", message_id=" << properties.getMessageId();
+ out << ", delivery_mode=" << (int) properties.getDeliveryMode();
+ out << ", headers=" << properties.getHeaders();
}
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h
index 691ceeff73..894936060c 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -21,7 +21,7 @@
#include "amqp_types.h"
#include "AMQBody.h"
#include "Buffer.h"
-#include "HeaderProperties.h"
+#include "BasicHeaderProperties.h"
#ifndef _AMQHeaderBody_
#define _AMQHeaderBody_
@@ -31,19 +31,15 @@ namespace framing {
class AMQHeaderBody : public AMQBody
{
- HeaderProperties* properties;
+ BasicHeaderProperties properties;
uint16_t weight;
uint64_t contentSize;
-
- void createProperties(int classId);
-public:
- typedef boost::shared_ptr<AMQHeaderBody> shared_ptr;
-
+ public:
AMQHeaderBody(int classId);
AMQHeaderBody();
inline uint8_t type() const { return HEADER_BODY; }
- HeaderProperties* getProperties(){ return properties; }
- const HeaderProperties* getProperties() const { return properties; }
+ BasicHeaderProperties* getProperties(){ return &properties; }
+ const BasicHeaderProperties* getProperties() const { return &properties; }
inline uint64_t getContentSize() const { return contentSize; }
inline void setContentSize(uint64_t _size) { contentSize = _size; }
virtual ~AMQHeaderBody();
@@ -51,6 +47,8 @@ public:
virtual void encode(Buffer& buffer) const;
virtual void decode(Buffer& buffer, uint32_t size);
virtual void print(std::ostream& out) const;
+
+ void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
};
}
diff --git a/cpp/src/qpid/framing/AMQHeartbeatBody.h b/cpp/src/qpid/framing/AMQHeartbeatBody.h
index 4c046b81a7..a2701c3398 100644
--- a/cpp/src/qpid/framing/AMQHeartbeatBody.h
+++ b/cpp/src/qpid/framing/AMQHeartbeatBody.h
@@ -31,14 +31,13 @@ namespace framing {
class AMQHeartbeatBody : public AMQBody
{
public:
- typedef boost::shared_ptr<AMQHeartbeatBody> shared_ptr;
-
virtual ~AMQHeartbeatBody();
inline uint32_t size() const { return 0; }
inline uint8_t type() const { return HEARTBEAT_BODY; }
inline void encode(Buffer& ) const {}
inline void decode(Buffer& , uint32_t /*size*/) {}
virtual void print(std::ostream& out) const;
+ void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
};
}
diff --git a/cpp/src/qpid/framing/AMQMethodBody.cpp b/cpp/src/qpid/framing/AMQMethodBody.cpp
index 0a2720e69a..924d906d43 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.cpp
+++ b/cpp/src/qpid/framing/AMQMethodBody.cpp
@@ -18,51 +18,11 @@
* under the License.
*
*/
-#include "AMQFrame.h"
#include "AMQMethodBody.h"
-#include "qpid/QpidError.h"
-#include "qpid/framing/AMQP_MethodVersionMap.h"
namespace qpid {
namespace framing {
-void AMQMethodBody::encodeId(Buffer& buffer) const{
- buffer.putShort(amqpClassId());
- buffer.putShort(amqpMethodId());
-}
-
-void AMQMethodBody::invoke(AMQP_ServerOperations&){
- assert(0);
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server.");
-}
-
-bool AMQMethodBody::invoke(Invocable*) {
- return false;
-}
-
-AMQMethodBody::shared_ptr AMQMethodBody::create(
- AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
- Buffer& buffer)
-{
- ClassMethodId id;
- id.decode(buffer);
- return AMQMethodBody::shared_ptr(
- versionMap.createMethodBody(
- id.classId, id.methodId, version.getMajor(), version.getMinor()));
-}
-
-void AMQMethodBody::ClassMethodId::decode(Buffer& buffer) {
- classId = buffer.getShort();
- methodId = buffer.getShort();
-}
-
-void AMQMethodBody::decode(Buffer& buffer, uint32_t /*size*/) {
- decodeContent(buffer);
-}
-
-void AMQMethodBody::encode(Buffer& buffer) const {
- encodeId(buffer);
- encodeContent(buffer);
-}
+AMQMethodBody::~AMQMethodBody() {}
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h
index 73c5eb78a6..9c776e143b 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.h
+++ b/cpp/src/qpid/framing/AMQMethodBody.h
@@ -21,65 +21,48 @@
* under the License.
*
*/
-#include <iostream>
#include "amqp_types.h"
#include "AMQBody.h"
-#include "Buffer.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/shared_ptr.h"
+
+#include <ostream>
+
+#include <assert.h>
namespace qpid {
namespace framing {
-class AMQP_MethodVersionMap;
+class Buffer;
+class AMQP_ServerOperations;
+class Invocable;
+class MethodBodyConstVisitor;
-class AMQMethodBody : public AMQBody
-{
+class AMQMethodBody : public AMQBody {
public:
- typedef boost::shared_ptr<AMQMethodBody> shared_ptr;
-
- static shared_ptr create(
- AMQP_MethodVersionMap& map, ProtocolVersion version, Buffer& buf);
-
- ProtocolVersion version;
- uint8_t type() const { return METHOD_BODY; }
- AMQMethodBody(uint8_t major, uint8_t minor) : version(major, minor) {}
- AMQMethodBody(ProtocolVersion ver) : version(ver) {}
- virtual ~AMQMethodBody() {}
- void decode(Buffer&, uint32_t);
- virtual void encode(Buffer& buffer) const;
+ AMQMethodBody() {}
+ AMQMethodBody(uint8_t, uint8_t) {}
+
+ virtual ~AMQMethodBody();
+ virtual void accept(MethodBodyConstVisitor&) const = 0;
+
virtual MethodId amqpMethodId() const = 0;
virtual ClassId amqpClassId() const = 0;
- virtual void invoke(AMQP_ServerOperations&);
- virtual bool invoke(Invocable* target);
+ virtual void invoke(AMQP_ServerOperations&) { assert(0); }
+ virtual bool invoke(Invocable*) { return false; }
- template <class T> bool isA() {
+ template <class T> bool isA() const {
return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID;
}
- /** Return request ID or response correlationID */
- virtual RequestId getRequestId() const { return 0; }
-
- virtual bool isRequest() const { return false; }
- virtual bool isResponse() const { return false; }
-
- static uint32_t baseSize() { return 4; }
- protected:
-
- struct ClassMethodId {
- uint16_t classId;
- uint16_t methodId;
- void decode(Buffer& b);
- };
-
- void encodeId(Buffer& buffer) const;
- virtual void encodeContent(Buffer& buffer) const = 0;
- virtual void decodeContent(Buffer& buffer) = 0;
-
- virtual void printPrefix(std::ostream&) const {}
+ virtual uint32_t size() const = 0;
+ virtual uint8_t type() const { return METHOD_BODY; }
- friend class MethodHolder;
+ AMQMethodBody* getMethod() { return this; }
+ const AMQMethodBody* getMethod() const { return this; }
+ void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
};
diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h
index a6347b37fd..a8ef401b50 100644
--- a/cpp/src/qpid/framing/BasicHeaderProperties.h
+++ b/cpp/src/qpid/framing/BasicHeaderProperties.h
@@ -57,7 +57,7 @@ class BasicHeaderProperties : public HeaderProperties
virtual void encode(Buffer& buffer) const;
virtual void decode(Buffer& buffer, uint32_t size);
- virtual uint8_t classId() { return BASIC; }
+ virtual uint8_t classId() const { return BASIC; }
string getContentType() const { return contentType; }
string getContentEncoding() const { return contentEncoding; }
diff --git a/cpp/src/qpid/framing/Blob.cpp b/cpp/src/qpid/framing/Blob.cpp
new file mode 100644
index 0000000000..0aaeb4138e
--- /dev/null
+++ b/cpp/src/qpid/framing/Blob.cpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Blob.h"
+
+
+namespace qpid {
+namespace framing {
+
+void BlobHelper<void>::destroy(void*) {}
+void BlobHelper<void>::copy(void*, const void*) {}
+
+}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/Blob.h b/cpp/src/qpid/framing/Blob.h
index 1754e851e5..f89bad55ea 100644
--- a/cpp/src/qpid/framing/Blob.h
+++ b/cpp/src/qpid/framing/Blob.h
@@ -33,6 +33,8 @@
namespace boost {
/**
+ * 0-arg typed_in_place_factory constructor and in_place() override.
+ *
* Boost doesn't provide the 0 arg version since it assumes
* in_place_factory will be used when there is no default ctor.
*/
@@ -48,20 +50,29 @@ typed_in_place_factory0<T> in_place() { return typed_in_place_factory0<T>(); }
} // namespace boost
+
namespace qpid {
namespace framing {
using boost::in_place;
-template <class T>
-void destroyT(void* ptr) { static_cast<T*>(ptr)->~T(); }
-inline void nullDestroy(void*) {}
+template <class T> struct BlobHelper {
+ static void destroy(void* ptr) { static_cast<T*>(ptr)->~T(); }
+ static void copy(void* dest, const void* src) {
+ new (dest) T(*static_cast<const T*>(src));
+ }
+};
+
+template <> struct BlobHelper<void> {
+ static void destroy(void*);
+ static void copy(void* to, const void* from);
+};
/**
* A "blob" is a chunk of memory which can contain a single object at
* a time arbitrary type, provided sizeof(T)<=blob.size(). Blob
* ensures proper construction and destruction of its contents,
- * nothing else.
+ * and proper copying between Blobs, but nothing else.
*
* In particular the user must ensure the blob is big enough for its
* contents and must know the type of object in the blob to cast get().
@@ -75,7 +86,13 @@ class Blob
{
boost::aligned_storage<Size> store;
void (*destroy)(void*);
+ void (*copy)(void*, const void*);
+ template <class T> void setType() {
+ destroy=&BlobHelper<T>::destroy;
+ copy=&BlobHelper<T>::copy;
+ }
+
template<class TypedInPlaceFactory>
void construct (const TypedInPlaceFactory& factory,
const boost::typed_in_place_factory_base* )
@@ -84,16 +101,28 @@ class Blob
assert(sizeof(T) <= Size);
clear(); // Destroy old object.
factory.apply(store.address());
- destroy=&destroyT<T>;
+ setType<T>();
}
public:
/** Construct an empty blob. */
- Blob() : destroy(&nullDestroy) {}
+ Blob() { setType<void>(); }
+
+ /** Copy a blob. */
+ Blob(const Blob& b) { *this = b; }
+
+ /** Assign a blob */
+ Blob& operator=(const Blob& b) {
+ setType<void>(); // Exception safety.
+ b.copy(this->get(), b.get());
+ copy = b.copy;
+ destroy = b.destroy;
+ return *this;
+ }
/** @see construct() */
template<class Expr>
- Blob( const Expr & expr ) : destroy(nullDestroy) { construct(expr,&expr); }
+ Blob( const Expr & expr ) { setType<void>(); construct(expr,&expr); }
~Blob() { clear(); }
@@ -106,7 +135,7 @@ class Blob
/** Copy construct an instance of T into the Blob. */
template<class T>
- Blob& operator=(const T& x) { construct(in_place<T>(x)); }
+ Blob& operator=(const T& x) { construct(in_place<T>(x)); return *this; }
/** Get pointer to blob contents. Caller must know how to cast it. */
void* get() { return store.address(); }
@@ -116,17 +145,15 @@ class Blob
/** Destroy the object in the blob making it empty. */
void clear() {
- void (*saveDestroy)(void*) = destroy; // Exception safety
- destroy = &nullDestroy;
- saveDestroy(store.address());
+ void (*oldDestroy)(void*) = destroy;
+ setType<void>();
+ oldDestroy(store.address());
}
- /** True if there is no object allocated in the blob */
- bool empty() { return destroy==nullDestroy; }
-
static size_t size() { return Size; }
};
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp
index 7a7bf22f15..53a01141c1 100644
--- a/cpp/src/qpid/framing/BodyHandler.cpp
+++ b/cpp/src/qpid/framing/BodyHandler.cpp
@@ -25,25 +25,28 @@
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
+#include <boost/cast.hpp>
+
using namespace qpid::framing;
using namespace boost;
BodyHandler::~BodyHandler() {}
-void BodyHandler::handleBody(shared_ptr<AMQBody> body) {
+// TODO aconway 2007-08-13: Replace with visitor.
+void BodyHandler::handleBody(AMQBody* body) {
switch(body->type())
{
case METHOD_BODY:
- handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+ handleMethod(polymorphic_downcast<AMQMethodBody*>(body));
break;
case HEADER_BODY:
- handleHeader(shared_polymorphic_cast<AMQHeaderBody>(body));
+ handleHeader(polymorphic_downcast<AMQHeaderBody*>(body));
break;
case CONTENT_BODY:
- handleContent(shared_polymorphic_cast<AMQContentBody>(body));
+ handleContent(polymorphic_downcast<AMQContentBody*>(body));
break;
case HEARTBEAT_BODY:
- handleHeartbeat(shared_polymorphic_cast<AMQHeartbeatBody>(body));
+ handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body));
break;
default:
QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type());
diff --git a/cpp/src/qpid/framing/BodyHandler.h b/cpp/src/qpid/framing/BodyHandler.h
index 07d1658afa..9ded737195 100644
--- a/cpp/src/qpid/framing/BodyHandler.h
+++ b/cpp/src/qpid/framing/BodyHandler.h
@@ -32,6 +32,8 @@ class AMQHeaderBody;
class AMQContentBody;
class AMQHeartbeatBody;
+// TODO aconway 2007-08-10: rework using Visitor pattern?
+
/**
* Interface to handle incoming frame bodies.
* Derived classes provide logic for each frame type.
@@ -39,13 +41,13 @@ class AMQHeartbeatBody;
class BodyHandler {
public:
virtual ~BodyHandler();
- virtual void handleBody(boost::shared_ptr<AMQBody> body);
+ virtual void handleBody(AMQBody* body);
protected:
- virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0;
- virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0;
- virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0;
- virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0;
+ virtual void handleMethod(AMQMethodBody*) = 0;
+ virtual void handleHeader(AMQHeaderBody*) = 0;
+ virtual void handleContent(AMQContentBody*) = 0;
+ virtual void handleHeartbeat(AMQHeartbeatBody*) = 0;
};
}}
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp
index d61126bc7f..25ff46acdd 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -23,6 +23,9 @@
#include "FrameHandler.h"
#include "qpid/Exception.h"
+#include "AMQMethodBody.h"
+#include "qpid/framing/ConnectionOpenBody.h"
+
using boost::format;
namespace qpid {
@@ -45,7 +48,7 @@ void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v)
handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out));
}
-void ChannelAdapter::send(shared_ptr<AMQBody> body)
+void ChannelAdapter::send(const AMQBody& body)
{
assertChannelOpen();
AMQFrame frame(getVersion(), getId(), body);
diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h
index 9e418013eb..729f5e7b47 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/cpp/src/qpid/framing/ChannelAdapter.h
@@ -25,11 +25,11 @@
*
*/
-#include "qpid/shared_ptr.h"
#include "BodyHandler.h"
#include "ProtocolVersion.h"
#include "amqp_types.h"
#include "FrameHandler.h"
+#include "OutputHandler.h"
namespace qpid {
namespace framing {
@@ -66,7 +66,7 @@ class ChannelAdapter : protected BodyHandler {
ChannelId getId() const { return id; }
ProtocolVersion getVersion() const { return version; }
- virtual void send(shared_ptr<AMQBody> body);
+ virtual void send(const AMQBody& body);
virtual bool isOpen() const = 0;
@@ -75,7 +75,7 @@ class ChannelAdapter : protected BodyHandler {
void assertChannelOpen() const;
void assertChannelNotOpen() const;
- virtual void handleMethod(shared_ptr<AMQMethodBody>) = 0;
+ virtual void handleMethod(AMQMethodBody*) = 0;
private:
class ChannelAdapterHandler;
diff --git a/cpp/src/qpid/framing/Frame.cpp b/cpp/src/qpid/framing/Frame.cpp
deleted file mode 100644
index 1ba8112faa..0000000000
--- a/cpp/src/qpid/framing/Frame.cpp
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <boost/format.hpp>
-
-#include "Frame.h"
-#include "qpid/QpidError.h"
-
-
-namespace qpid {
-namespace framing {
-
-namespace {
-struct GetBodyVisitor : public NoBlankVisitor<AMQBody*> {
- QPID_USING_NOBLANK(AMQBody*);
- AMQBody* operator()(MethodHolder& h) const { return h.getMethod(); }
- template <class T> AMQBody* operator()(T& t) const { return &t; }
-};
-}
-
-AMQBody* Frame::getBody() {
- return boost::apply_visitor(GetBodyVisitor(), body);
-}
-
-const AMQBody* Frame::getBody() const {
- return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
-}
-
-void Frame::encode(Buffer& buffer)
-{
- buffer.putOctet(getBody()->type());
- buffer.putShort(channel);
- buffer.putLong(getBody()->size());
- getBody()->encode(buffer);
- buffer.putOctet(0xCE);
-}
-
-uint32_t Frame::size() const{
- return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + getBody()->size()
- + 1/*0xCE*/;
-}
-
-bool Frame::decode(Buffer& buffer)
-{
- if(buffer.available() < 7)
- return false;
- buffer.record();
- uint32_t frameSize = decodeHead(buffer);
- if(buffer.available() < frameSize + 1){
- buffer.restore();
- return false;
- }
- decodeBody(buffer, frameSize);
- uint8_t end = buffer.getOctet();
- if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
- return true;
-}
-
-uint32_t Frame::decodeHead(Buffer& buffer){
- type = buffer.getOctet();
- channel = buffer.getShort();
- return buffer.getLong();
-}
-
-void Frame::decodeBody(Buffer& buffer, uint32_t size)
-{
- switch(type)
- {
- case METHOD_BODY:
- case REQUEST_BODY:
- case RESPONSE_BODY: {
- ClassId c=buffer.getShort();
- MethodId m=buffer.getShort();
- body = MethodHolder(c,m);
- break;
- }
- case HEADER_BODY:
- body = AMQHeaderBody();
- break;
- case CONTENT_BODY:
- body = AMQContentBody();
- break;
- case HEARTBEAT_BODY:
- body = AMQHeartbeatBody();
- break;
- default:
- THROW_QPID_ERROR(
- FRAMING_ERROR,
- boost::format("Unknown frame type %d") % type);
- }
- getBody()->decode(buffer, size);
-}
-
-std::ostream& operator<<(std::ostream& out, const Frame& f)
-{
- return out << "Frame[channel=" << f.getChannel() << "; " << f.body << "]";
-}
-
-
-}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/Frame.h b/cpp/src/qpid/framing/Frame.h
deleted file mode 100644
index 02ab15a828..0000000000
--- a/cpp/src/qpid/framing/Frame.h
+++ /dev/null
@@ -1,72 +0,0 @@
-#ifndef QPID_FRAMING_FRAME_H
-#define QPID_FRAMING_FRAME_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "AMQDataBlock.h"
-#include "AMQHeaderBody.h"
-#include "AMQContentBody.h"
-#include "AMQHeartbeatBody.h"
-#include "MethodHolder.h"
-
-namespace qpid {
-namespace framing {
-
-class Frame : public AMQDataBlock {
- public:
- typedef boost::variant<boost::blank,
- AMQHeaderBody,
- AMQContentBody,
- AMQHeartbeatBody,
- MethodHolder> Variant;
-
- Frame(ChannelId channel_=0, const Variant& body_=Variant())
- : body(body_), channel(channel_) {}
-
- void encode(Buffer& buffer);
- bool decode(Buffer& buffer);
- uint32_t size() const;
-
- uint16_t getChannel() const { return channel; }
-
- AMQBody* getBody();
- const AMQBody* getBody() const;
-
- template <class T> T* castBody() {
- return boost::polymorphic_downcast<T*>(getBody());
- }
-
- Variant body;
-
- private:
- uint32_t decodeHead(Buffer& buffer);
- void decodeBody(Buffer& buffer, uint32_t size);
-
- uint8_t type;
- uint16_t channel;
-};
-
-std::ostream& operator<<(std::ostream&, const Frame&);
-
-}} // namespace qpid::framing
-
-
-#endif /*!QPID_FRAMING_FRAME_H*/
diff --git a/cpp/src/qpid/framing/HeaderProperties.h b/cpp/src/qpid/framing/HeaderProperties.h
index ae8b796aa9..0c805922e8 100644
--- a/cpp/src/qpid/framing/HeaderProperties.h
+++ b/cpp/src/qpid/framing/HeaderProperties.h
@@ -34,7 +34,7 @@ namespace framing {
public:
inline virtual ~HeaderProperties(){}
- virtual uint8_t classId() = 0;
+ virtual uint8_t classId() const = 0;
virtual uint32_t size() const = 0;
virtual void encode(Buffer& buffer) const = 0;
virtual void decode(Buffer& buffer, uint32_t size) = 0;
diff --git a/cpp/src/qpid/framing/MethodHolder.cpp b/cpp/src/qpid/framing/MethodHolder.cpp
index 43997e6d55..de8f0da6d4 100644
--- a/cpp/src/qpid/framing/MethodHolder.cpp
+++ b/cpp/src/qpid/framing/MethodHolder.cpp
@@ -23,8 +23,8 @@
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/Buffer.h"
-// Note: MethodHolder::construct is in a separate generated file
-// MethodHolder_construct.cpp
+// Note: MethodHolder::construct is and operator= are code-generated
+// in file MethodHolder_construct.cpp.
using namespace boost;
@@ -35,16 +35,18 @@ void MethodHolder::encode(Buffer& b) const {
const AMQMethodBody* body = get();
b.putShort(body->amqpClassId());
b.putShort(body->amqpMethodId());
- body->encodeContent(b);
+ body->encode(b);
}
void MethodHolder::decode(Buffer& b) {
- construct(std::make_pair(b.getShort(), b.getShort()));
- get()->decodeContent(b);
+ ClassId c=b.getShort();
+ MethodId m=b.getShort();
+ construct(c,m);
+ get()->decode(b);
}
uint32_t MethodHolder::size() const {
- return sizeof(Id)+get()->size();
+ return sizeof(ClassId)+sizeof(MethodId)+get()->size();
}
std::ostream& operator<<(std::ostream& out, const MethodHolder& h) {
diff --git a/cpp/src/qpid/framing/MethodHolder.h b/cpp/src/qpid/framing/MethodHolder.h
index fa14db2f79..59dbb1a708 100644
--- a/cpp/src/qpid/framing/MethodHolder.h
+++ b/cpp/src/qpid/framing/MethodHolder.h
@@ -22,64 +22,74 @@
*
*/
+#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/amqp_types.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/Blob.h"
#include "qpid/framing/MethodHolderMaxSize.h" // Generated file.
+#include <boost/type_traits/is_base_and_derived.hpp>
+#include <boost/utility/enable_if.hpp>
+
#include <utility>
namespace qpid {
namespace framing {
class AMQMethodBody;
+class AMQBody;
class Buffer;
+class MethodHolder;
+std::ostream& operator<<(std::ostream& out, const MethodHolder& h);
+
/**
* Holder for arbitrary method body.
*/
+// TODO aconway 2007-08-14: Fix up naming, this class should really be
+// called AMQMethodBody and use a different name for the root of
+// the concrete method body tree, which should not inherit AMQBody.
+//
class MethodHolder
{
- public:
- typedef std::pair<ClassId, MethodId> Id;
-
- template <class T>static Id idOf() {
- return std::make_pair(T::CLASS_ID, T::METHOD_ID); }
+ template <class T> struct EnableIfMethod:
+ public boost::enable_if<boost::is_base_and_derived<AMQMethodBody,T>,T>
+ {};
+ template <class T> EnableIfMethod<T>& assertMethod(T& t) { return t; }
+
+ public:
MethodHolder() {}
- MethodHolder(const Id& id) { construct(id); }
- MethodHolder(ClassId& c, MethodId& m) { construct(std::make_pair(c,m)); }
+ MethodHolder(ClassId& c, MethodId& m) { construct(c,m); }
- template <class M>
- MethodHolder(const M& m) : blob(m), id(idOf<M>()) {}
+ /** Construct with a copy of a method body. */
+ MethodHolder(const AMQMethodBody& m) { *this = m; }
- template <class M>
- MethodHolder& operator=(const M& m) { blob=m; id=idOf<M>(); return *this; }
+ /** Copy method body into holder. */
+ MethodHolder& operator=(const AMQMethodBody&);
- /** Construct the method body corresponding to Id */
- void construct(const Id&);
+ /** Construct the method body corresponding to class/method id */
+ void construct(ClassId c, MethodId m);
+ uint8_t type() const { return 1; }
void encode(Buffer&) const;
void decode(Buffer&);
uint32_t size() const;
AMQMethodBody* get() {
- return static_cast<AMQMethodBody*>(blob.get());
+ return reinterpret_cast<AMQMethodBody*>(blob.get());
}
const AMQMethodBody* get() const {
- return static_cast<const AMQMethodBody*>(blob.get());
+ return reinterpret_cast<const AMQMethodBody*>(blob.get());
}
- AMQMethodBody* operator* () { return get(); }
- const AMQMethodBody* operator*() const { return get(); }
- AMQMethodBody* operator-> () { return get(); }
- const AMQMethodBody* operator->() const { return get(); }
-
private:
Blob<MAX_METHODBODY_SIZE> blob;
- Id id;
+ class CopyVisitor;
+ friend struct CopyVisitor;
};
-std::ostream& operator<<(std::ostream& out, const MethodHolder& h);
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h
index 888c6a7382..eec28333bc 100644
--- a/cpp/src/qpid/framing/amqp_framing.h
+++ b/cpp/src/qpid/framing/amqp_framing.h
@@ -26,7 +26,6 @@
#include "AMQHeaderBody.h"
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
-#include "qpid/framing/AMQP_MethodVersionMap.h"
#include "InputHandler.h"
#include "OutputHandler.h"
#include "InitiationHandler.h"
diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h
index 5ea08a69af..bfd5b2206f 100644
--- a/cpp/src/qpid/framing/amqp_types.h
+++ b/cpp/src/qpid/framing/amqp_types.h
@@ -44,8 +44,6 @@ namespace framing {
using std::string;
typedef uint8_t FrameType;
typedef uint16_t ChannelId;
-typedef uint64_t RequestId;
-typedef uint64_t ResponseId;
typedef uint32_t BatchOffset;
typedef uint16_t ClassId;
typedef uint16_t MethodId;
diff --git a/cpp/src/qpid/shared_ptr.h b/cpp/src/qpid/shared_ptr.h
index eb5f3f906a..0c933ea6a6 100644
--- a/cpp/src/qpid/shared_ptr.h
+++ b/cpp/src/qpid/shared_ptr.h
@@ -42,6 +42,8 @@ shared_ptr<T> make_shared_ptr(T* ptr, D deleter) {
return shared_ptr<T>(ptr, deleter);
}
+inline void nullDeleter(void const *) {}
+
} // namespace qpid
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index 6ee7a84beb..062564d8ed 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -134,7 +134,9 @@ Socket::recv(void* data, size_t size) const
int Socket::listen(int port, int backlog) const
{
- const int& socket = impl->fd;
+ const int& socket = impl->fd;
+ int yes=1;
+ QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons(port);
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index eb67601875..6c9b910637 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -31,6 +31,7 @@
#include "MockChannel.h"
#include "qpid/broker/Connection.h"
#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/ConnectionStartBody.h"
#include <vector>
using namespace boost;
@@ -233,18 +234,18 @@ class BrokerChannelTest : public CppUnit::TestCase
Queue::shared_ptr queue(new Queue("my_queue"));
exchange->bind(queue, "", 0);
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ AMQHeaderBody header(BASIC);
uint64_t contentSize(0);
for (int i = 0; i < 3; i++) {
contentSize += data[i].size();
}
- header->setContentSize(contentSize);
+ header.setContentSize(contentSize);
channel.handlePublish(msg);
- channel.handleHeader(header);
+ channel.handleHeader(&header);
for (int i = 0; i < 3; i++) {
- AMQContentBody::shared_ptr body(new AMQContentBody(data[i]));
- channel.handleContent(body);
+ AMQContentBody body(data[i]);
+ channel.handleContent(&body);
}
Message::shared_ptr msg2 = queue->dequeue();
CPPUNIT_ASSERT_EQUAL(msg, msg2.get());
@@ -312,8 +313,7 @@ class BrokerChannelTest : public CppUnit::TestCase
//there will always be a connection-start frame
CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
- CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
- handler.frames[0].getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody()));
const string data("abcdefghijklmn");
@@ -340,17 +340,17 @@ class BrokerChannelTest : public CppUnit::TestCase
{
BasicMessage* msg = new BasicMessage(
0, exchange, routingKey, false, false);
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(contentSize);
- msg->setHeader(header);
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(contentSize);
+ msg->setHeader(&header);
msg->getHeaderProperties()->setMessageId(messageId);
return msg;
}
void addContent(Message::shared_ptr msg, const string& data)
{
- AMQContentBody::shared_ptr body(new AMQContentBody(data));
- msg->addContent(body);
+ AMQContentBody body(data);
+ msg->addContent(&body);
}
};
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp
index 811fc0133e..2d23b87627 100644
--- a/cpp/src/tests/Cluster.cpp
+++ b/cpp/src/tests/Cluster.cpp
@@ -33,7 +33,7 @@ static const ProtocolVersion VER;
/** Verify membership in a cluster with one member. */
BOOST_AUTO_TEST_CASE(testClusterOne) {
TestCluster cluster("clusterOne", "amqp:one:1");
- AMQFrame send(VER, 1, new SessionPingBody(VER));
+ AMQFrame send(VER, 1, SessionPingBody(VER));
cluster.handle(send);
AMQFrame received;
BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -59,7 +59,7 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) {
BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
// Exchange frames with child.
- AMQFrame send(VER, 1, new SessionPingBody(VER));
+ AMQFrame send(VER, 1, SessionPingBody(VER));
cluster.handle(send);
AMQFrame received;
BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -90,8 +90,8 @@ struct CountHandler : public FrameHandler {
/** Test the ClassifierHandler */
BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) {
- AMQFrame queueDecl(VER, 0, new QueueDeclareBody(VER));
- AMQFrame messageTrans(VER, 0, new MessageTransferBody(VER));
+ AMQFrame queueDecl(VER, 0, QueueDeclareBody(VER));
+ AMQFrame messageTrans(VER, 0, MessageTransferBody(VER));
shared_ptr<CountHandler> wiring(new CountHandler());
shared_ptr<CountHandler> other(new CountHandler());
diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h
index 510a788f7c..366ea92a8b 100644
--- a/cpp/src/tests/Cluster.h
+++ b/cpp/src/tests/Cluster.h
@@ -55,8 +55,6 @@ class TestHandler : public Handler<T&>, public ConcurrentQueue<T>
typedef TestHandler<AMQFrame> TestFrameHandler;
-void nullDeleter(void*) {}
-
struct TestCluster : public Cluster
{
TestCluster(string name, string url)
diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp
index 7bea9d0490..62ccb9bd72 100644
--- a/cpp/src/tests/Cluster_child.cpp
+++ b/cpp/src/tests/Cluster_child.cpp
@@ -40,7 +40,7 @@ void clusterTwo() {
BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *frame.getBody());
BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
- AMQFrame send(VER, 1, new SessionPongBody(VER));
+ AMQFrame send(VER, 1, SessionPongBody(VER));
cluster.handle(send);
BOOST_REQUIRE(cluster.received.waitPop(frame));
BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *frame.getBody());
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index b0aeb9db6f..a0dd8d37f6 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -18,24 +18,27 @@
* under the License.
*
*/
+#include "InProcessBroker.h"
+#include "qpid/QpidError.h"
+#include "qpid/client/ClientExchange.h"
+#include "qpid/client/ClientQueue.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Connector.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/BasicGetOkBody.h"
#include "qpid/framing/ConnectionRedirectBody.h"
#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/amqp_framing.h"
-#include <iostream>
#include "qpid_test_plugin.h"
+
+#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
+#include <iostream>
+
+#include <memory>
#include <sstream>
#include <typeinfo>
-#include "qpid/QpidError.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "InProcessBroker.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Connector.h"
-#include "qpid/client/ClientExchange.h"
-#include "qpid/client/ClientQueue.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include <memory>
-#include <boost/lexical_cast.hpp>
-#include <boost/bind.hpp>
using namespace qpid;
using namespace qpid::framing;
@@ -62,13 +65,11 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_TEST(testInlineContent);
CPPUNIT_TEST(testContentReference);
CPPUNIT_TEST(testContentValidation);
- CPPUNIT_TEST(testRequestResponseRoundtrip);
CPPUNIT_TEST_SUITE_END();
private:
Buffer buffer;
ProtocolVersion version;
- AMQP_MethodVersionMap versionMap;
public:
@@ -77,10 +78,10 @@ class FramingTest : public CppUnit::TestCase
void testBasicQosBody()
{
BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true);
- in.encodeContent(buffer);
+ in.encode(buffer);
buffer.flip();
BasicQosBody out(version);
- out.decodeContent(buffer);
+ out.decode(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -88,10 +89,10 @@ class FramingTest : public CppUnit::TestCase
{
std::string s = "security credential";
ConnectionSecureBody in(version, s);
- in.encodeContent(buffer);
+ in.encode(buffer);
buffer.flip();
ConnectionSecureBody out(version);
- out.decodeContent(buffer);
+ out.decode(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -100,10 +101,10 @@ class FramingTest : public CppUnit::TestCase
std::string a = "hostA";
std::string b = "hostB";
ConnectionRedirectBody in(version, a, b);
- in.encodeContent(buffer);
+ in.encode(buffer);
buffer.flip();
ConnectionRedirectBody out(version);
- out.decodeContent(buffer);
+ out.decode(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -111,10 +112,10 @@ class FramingTest : public CppUnit::TestCase
{
std::string s = "text";
AccessRequestBody in(version, s, true, false, true, false, true);
- in.encodeContent(buffer);
+ in.encode(buffer);
buffer.flip();
AccessRequestBody out(version);
- out.decodeContent(buffer);
+ out.decode(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -124,10 +125,10 @@ class FramingTest : public CppUnit::TestCase
std::string t = "tag";
BasicConsumeBody in(version, 0, q, t, false, true, false, false,
FieldTable());
- in.encodeContent(buffer);
+ in.encode(buffer);
buffer.flip();
BasicConsumeBody out(version);
- out.decodeContent(buffer);
+ out.decode(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -137,7 +138,7 @@ class FramingTest : public CppUnit::TestCase
std::string a = "hostA";
std::string b = "hostB";
AMQFrame in(version, 999,
- new ConnectionRedirectBody(version, a, b));
+ ConnectionRedirectBody(version, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -148,7 +149,7 @@ class FramingTest : public CppUnit::TestCase
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(version, 999, new BasicConsumeOkBody(version, s));
+ AMQFrame in(version, 999, BasicConsumeOkBody(version, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -211,46 +212,6 @@ class FramingTest : public CppUnit::TestCase
}
- // expect may contain null chars so use string(ptr,size) constructor
- // Use sizeof(expect)-1 to strip the trailing null.
-#define ASSERT_FRAME(expect, frame) \
- CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame))
-
- void testRequestResponseRoundtrip() {
- boost::shared_ptr<broker::InProcessBroker> ibroker(new broker::InProcessBroker(version));
- client::Connection clientConnection(boost::static_pointer_cast<client::Connector>(ibroker));
- clientConnection.open("");
- client::Channel c;
- clientConnection.openChannel(c);
-
- client::Exchange exchange(
- "MyExchange", client::Exchange::TOPIC_EXCHANGE);
- client::Queue queue("MyQueue", true);
- c.declareExchange(exchange);
- c.declareQueue(queue);
- c.bind(exchange, queue, "MyTopic", framing::FieldTable());
- c.close();
- clientConnection.close();
- broker::InProcessBroker::Conversation::const_iterator i = ibroker->conversation.begin();
- ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionOpenOk: knownHosts=]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; ChannelOpen: outOfBand=]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=1; rangedExecutionSet={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=3; rangedExecutionSet={}]", *i++);
- }
};
diff --git a/cpp/src/tests/InMemoryContentTest.cpp b/cpp/src/tests/InMemoryContentTest.cpp
index cb729f9930..bc95548d45 100644
--- a/cpp/src/tests/InMemoryContentTest.cpp
+++ b/cpp/src/tests/InMemoryContentTest.cpp
@@ -65,9 +65,8 @@ public:
CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
for (unsigned int i = 0; i < outCount; i++) {
- AMQContentBody::shared_ptr chunk(
- dynamic_pointer_cast<AMQContentBody>(
- channel.out.frames[i].getBody()));
+ AMQContentBody* chunk = dynamic_cast<AMQContentBody*>(
+ channel.out.frames[i].getBody());
CPPUNIT_ASSERT(chunk);
CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
CPPUNIT_ASSERT_EQUAL(
@@ -78,8 +77,8 @@ public:
void addframes(InMemoryContent& content, size_t frameCount, string* frameData)
{
for (unsigned int i = 0; i < frameCount; i++) {
- AMQContentBody::shared_ptr frame(new AMQContentBody(frameData[i]));
- content.add(frame);
+ AMQContentBody frame(frameData[i]);
+ content.add(&frame);
}
}
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
index 9f30ee584d..0e5f3895f9 100644
--- a/cpp/src/tests/InProcessBroker.h
+++ b/cpp/src/tests/InProcessBroker.h
@@ -54,7 +54,7 @@ class InProcessBroker : public client::Connector {
template <class MethodType>
MethodType* asMethod() {
- return dynamic_cast<MethodType*>(frame.getBody().get());
+ return dynamic_cast<MethodType*>(frame.getBody());
}
framing::AMQFrame frame;
Sender sender;
diff --git a/cpp/src/tests/LazyLoadedContentTest.cpp b/cpp/src/tests/LazyLoadedContentTest.cpp
index 7bac3613ad..df46f6b48e 100644
--- a/cpp/src/tests/LazyLoadedContentTest.cpp
+++ b/cpp/src/tests/LazyLoadedContentTest.cpp
@@ -97,7 +97,8 @@ public:
CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
for (unsigned int i = 0; i < outCount; i++) {
- AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[i].getBody()));
+ AMQContentBody* chunk(dynamic_cast<AMQContentBody*>(
+ channel.out.frames[i].getBody()));
CPPUNIT_ASSERT(chunk);
CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
CPPUNIT_ASSERT_EQUAL(
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 67b173af33..c9a08e6075 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -43,7 +43,6 @@ check_PROGRAMS+=Shlib
Shlib_SOURCES=Shlib.cpp
Shlib_LDADD=-lboost_unit_test_framework $(lib_common)
-
# TODO aconway 2007-08-07: Why aren't these tests run automatically?
check_PROGRAMS+=ConcurrentQueue
@@ -54,10 +53,6 @@ check_PROGRAMS+=Serializer
Serializer_SOURCES=Serializer.cpp
Serializer_LDADD=-lboost_unit_test_framework $(lib_common)
-check_PROGRAMS+=Visitor
-Visitor_SOURCES=Visitor.cpp
-Visitor_LDADD=-lboost_unit_test_framework
-
include cluster.mk
#
diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp
index 3d2ee1aaea..a12fc603ce 100644
--- a/cpp/src/tests/MessageBuilderTest.cpp
+++ b/cpp/src/tests/MessageBuilderTest.cpp
@@ -119,12 +119,12 @@ class MessageBuilderTest : public CppUnit::TestCase
Message::shared_ptr message(
new BasicMessage(
0, "test", "my_routing_key", false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(0);
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(0);
builder.initialise(message);
CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(header);
+ builder.setHeader(&header);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
}
@@ -137,15 +137,15 @@ class MessageBuilderTest : public CppUnit::TestCase
Message::shared_ptr message(
new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(7);
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(7);
+ AMQContentBody part1(data1);
builder.initialise(message);
CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(header);
+ builder.setHeader(&header);
CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(part1);
+ builder.addContent(&part1);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
}
@@ -159,18 +159,18 @@ class MessageBuilderTest : public CppUnit::TestCase
Message::shared_ptr message(
new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
- AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(14);
+ AMQContentBody part1(data1);
+ AMQContentBody part2(data2);
builder.initialise(message);
CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(header);
+ builder.setHeader(&header);
CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(part1);
+ builder.addContent(&part1);
CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(part2);
+ builder.addContent(&part2);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
}
@@ -189,19 +189,19 @@ class MessageBuilderTest : public CppUnit::TestCase
Message::shared_ptr message(
new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(14);
+ BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header.getProperties());
properties->setMessageId("MyMessage");
properties->getHeaders().setString("abc", "xyz");
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
- AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
+ AMQContentBody part1(data1);
+ AMQContentBody part2(data2);
builder.initialise(message);
- builder.setHeader(header);
- builder.addContent(part1);
- builder.addContent(part2);
+ builder.setHeader(&header);
+ builder.addContent(&part1);
+ builder.addContent(&part2);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp
index 1e976312be..1fbb18b7d3 100644
--- a/cpp/src/tests/MessageTest.cpp
+++ b/cpp/src/tests/MessageTest.cpp
@@ -47,13 +47,13 @@ class MessageTest : public CppUnit::TestCase
BasicMessage::shared_ptr msg(
new BasicMessage(0, exchange, routingKey, false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
- AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
- msg->setHeader(header);
- msg->addContent(part1);
- msg->addContent(part2);
+ AMQHeaderBody header(BASIC);
+ header.setContentSize(14);
+ AMQContentBody part1(data1);
+ AMQContentBody part2(data2);
+ msg->setHeader(&header);
+ msg->addContent(&part1);
+ msg->addContent(&part2);
msg->getHeaderProperties()->setMessageId(messageId);
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
@@ -75,7 +75,8 @@ class MessageTest : public CppUnit::TestCase
MockChannel channel(1);
msg->deliver(channel, "ignore", 0, 100);
CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size());
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[2].getBody()));
+ AMQContentBody* contentBody(
+ dynamic_cast<AMQContentBody*>(channel.out.frames[2].getBody()));
CPPUNIT_ASSERT(contentBody);
CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData());
}
diff --git a/cpp/src/tests/MockChannel.h b/cpp/src/tests/MockChannel.h
index fb2de98d2a..b9a7c0a2a2 100644
--- a/cpp/src/tests/MockChannel.h
+++ b/cpp/src/tests/MockChannel.h
@@ -51,13 +51,10 @@ struct MockChannel : public qpid::framing::ChannelAdapter
bool isOpen() const { return true; }
- void handleHeader(
- boost::shared_ptr<qpid::framing::AMQHeaderBody> b) { send(b); }
- void handleContent(
- boost::shared_ptr<qpid::framing::AMQContentBody> b) { send(b); }
- void handleHeartbeat(
- boost::shared_ptr<qpid::framing::AMQHeartbeatBody> b) { send(b); }
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method) { send(method); };
+ void handleHeader(qpid::framing::AMQHeaderBody* b) { send(*b); }
+ void handleContent(qpid::framing::AMQContentBody* b) { send(*b); }
+ void handleHeartbeat(qpid::framing::AMQHeartbeatBody* b) { send(*b); }
+ void handleMethod(qpid::framing::AMQMethodBody* b) { send(*b); };
};
diff --git a/cpp/src/tests/ReferenceTest.cpp b/cpp/src/tests/ReferenceTest.cpp
index b3dd44bf7d..411462564a 100644
--- a/cpp/src/tests/ReferenceTest.cpp
+++ b/cpp/src/tests/ReferenceTest.cpp
@@ -67,17 +67,17 @@ class ReferenceTest : public CppUnit::TestCase
Reference::shared_ptr r1(registry.open("bar"));
- MessageTransferBody::shared_ptr t1(new MessageTransferBody(v));
+ MessageTransferBody t1(v);
// TODO aconway 2007-04-03: hack around lack of generated setters. Clean this up.
- const_cast<framing::Content&>(t1->getBody()) = framing::Content(REFERENCE,"bar");
- MessageMessage::shared_ptr m1(new MessageMessage(0, t1, r1));
+ const_cast<framing::Content&>(t1.getBody()) = framing::Content(REFERENCE,"bar");
+ MessageMessage::shared_ptr m1(new MessageMessage(0, &t1, r1));
- MessageTransferBody::shared_ptr t2(new MessageTransferBody(v));
- const_cast<framing::Content&>(t2->getBody()) = framing::Content(REFERENCE,"bar");
- MessageMessage::shared_ptr m2(new MessageMessage(0, t2, r1));
+ MessageTransferBody t2(v);
+ const_cast<framing::Content&>(t2.getBody()) = framing::Content(REFERENCE,"bar");
+ MessageMessage::shared_ptr m2(new MessageMessage(0, &t2, r1));
- MessageAppendBody::shared_ptr a1(new MessageAppendBody(v));
- MessageAppendBody::shared_ptr a2(new MessageAppendBody(v));
+ MessageAppendBody a1(v);
+ MessageAppendBody a2(v);
r1->addMessage(m1);
r1->addMessage(m2);
@@ -86,12 +86,6 @@ class ReferenceTest : public CppUnit::TestCase
r1->append(a2);
CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size());
r1->close();
-
- CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[0], a1);
- CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[1], a2);
-
- CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[0], a1);
- CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[1], a2);
}
};
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index a5d9eb69a5..24e8aac701 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -70,7 +70,8 @@ public:
for(int i = 0; i < 10; i++){
Message::shared_ptr msg(
new BasicMessage(0, "exchange", "routing_key", false, false));
- msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
+ AMQHeaderBody body(BASIC);
+ msg->setHeader(&body);
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
messages.push_back(msg);
deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp
index 05c03754f9..3391be5128 100644
--- a/cpp/src/tests/TxPublishTest.cpp
+++ b/cpp/src/tests/TxPublishTest.cpp
@@ -72,7 +72,8 @@ public:
msg(new BasicMessage(0, "exchange", "routing_key", false, false)),
op(msg)
{
- msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
+ AMQHeaderBody body(BASIC);
+ msg->setHeader(&body);
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
op.deliverTo(queue1);
op.deliverTo(queue2);
diff --git a/cpp/src/tests/Visitor.cpp b/cpp/src/tests/Visitor.cpp
deleted file mode 100644
index 0cb3cf15bb..0000000000
--- a/cpp/src/tests/Visitor.cpp
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/Visitor.h"
-
-#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*>
-#include <boost/test/auto_unit_test.hpp>
-#include <boost/tuple/tuple.hpp>
-
-using namespace std;
-using namespace qpid::framing;
-
-struct DummyA;
-struct DummyB;
-struct DummyC;
-
-QPID_VISITOR(DummyVisitor, (DummyA)(DummyB)(DummyC));
-
-struct DummyFrame : public VisitableRoot<DummyVisitor> {};
-
-struct DummyA : public Visitable<DummyA, DummyFrame> {};
-struct DummyB : public Visitable<DummyB, DummyFrame> {};
-struct DummyC : public Visitable<DummyC, DummyFrame> {};
-
-struct TestDummyVisitor : public DummyVisitor {
- boost::tuple<DummyA*, DummyB*, DummyC*> dummies;
- void visit(DummyA& a) { dummies.get<0>() = &a; }
- void visit(DummyB& b) { dummies.get<1>() = &b; }
- void visit(DummyC& c) { dummies.get<2>() = &c; }
-};
-
-BOOST_AUTO_TEST_CASE(Visitor_accept) {
- TestDummyVisitor v;
- DummyA a;
- DummyB b;
- DummyC c;
- a.accept(v);
- BOOST_CHECK_EQUAL(&a, v.dummies.get<0>());
- b.accept(v);
- BOOST_CHECK_EQUAL(&b, v.dummies.get<1>());
- c.accept(v);
- BOOST_CHECK_EQUAL(&c, v.dummies.get<2>());
-}