diff options
95 files changed, 934 insertions, 1185 deletions
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java index 80ce266f39..114e3308d3 100644 --- a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java +++ b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java @@ -1164,7 +1164,7 @@ public class CppGenerator extends Generator { String[] fieldDomainPair = ordinalFieldMap.get(thisOrdinal); sb.append(indent + "" + setRef(fieldDomainPair[FIELD_CODE_TYPE]) + " get" + - Utils.firstUpper(fieldDomainPair[FIELD_NAME]) + "() { return " + + Utils.firstUpper(fieldDomainPair[FIELD_NAME]) + "() const { return " + fieldDomainPair[FIELD_NAME] + "; }" + cr); } return sb.toString(); @@ -1443,12 +1443,13 @@ public class CppGenerator extends Generator StringBuffer sb = new StringBuffer(); if (method.fieldMap.size() > 0) { - sb.append(indent + thisClass.name + Utils.firstUpper(method.name) + "Body(ProtocolVersion version," + cr); + sb.append(indent + thisClass.name + Utils.firstUpper(method.name) + "Body(ProtocolVersion," + cr); sb.append(generateFieldList(method.fieldMap, version, true, false, 8)); - sb.append(indent + tab + ") : " + baseClass(method, version) + "(version"); - sb.append(")"); - if (method.fieldMap.size() > 0) - sb.append(", \n" + generateFieldList(method.fieldMap, version, false, true, 8)); + sb.append(indent + tab + ")"); + if (method.fieldMap.size() > 0) { + sb.append(" : "); + sb.append(generateFieldList(method.fieldMap, version, false, true, 8)); + } sb.append(indent + "{ }\n"); } return sb.toString(); diff --git a/cpp/gentools/src/org/apache/qpid/gentools/Main.java b/cpp/gentools/src/org/apache/qpid/gentools/Main.java index 74e1ce1ab9..a25ddd103c 100644 --- a/cpp/gentools/src/org/apache/qpid/gentools/Main.java +++ b/cpp/gentools/src/org/apache/qpid/gentools/Main.java @@ -234,9 +234,7 @@ public class Main new File(tmplDir + Utils.fileSeparator + "AMQP_ServerOperations.h.tmpl"), new File(tmplDir + Utils.fileSeparator + "AMQP_ClientOperations.h.tmpl"), new File(tmplDir + Utils.fileSeparator + "AMQP_Constants.h.tmpl"), - new File(tmplDir + Utils.fileSeparator + "AMQP_MethodVersionMap.h.tmpl"), - new File(tmplDir + Utils.fileSeparator + "AMQP_MethodVersionMap.cpp.tmpl"), - new File(tmplDir + Utils.fileSeparator + "AMQP_HighestVersion.h.tmpl") + new File(tmplDir + Utils.fileSeparator + "AMQP_HighestVersion.h.tmpl") }; methodTemplateFiles = new File[] { diff --git a/cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl b/cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl deleted file mode 100644 index 83c3065a68..0000000000 --- a/cpp/gentools/templ.cpp/AMQP_MethodVersionMap.h.tmpl +++ /dev/null @@ -1,57 +0,0 @@ -&{AMQP_MethodVersionMap.h} -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/* - * This file is auto-generated by ${GENERATOR} - do not modify. - * Supported AMQP versions: -%{VLIST} * ${major}-${minor} - */ - -#ifndef qpid_framing_AMQP_MethodVersionMap__ -#define qpid_framing_AMQP_MethodVersionMap__ - -#include <map> -#include "qpid/framing/AMQMethodBody.h" - -%{MLIST} ${mc_method_body_include} - -namespace qpid -{ -namespace framing -{ - -template <class T> AMQMethodBody* createMethodBodyFn(u_int8_t major, u_int8_t minor) { return new T(ProtocolVersion(major, minor)); } -typedef AMQMethodBody* (*fnPtr)(u_int8_t, u_int8_t); - -class AMQP_MethodVersionMap: public std::map<u_int64_t, fnPtr> -{ -protected: - u_int64_t createMapKey(u_int16_t classId, u_int16_t methodId, u_int8_t major, u_int8_t minor); -public: - AMQP_MethodVersionMap(); - AMQMethodBody* createMethodBody(u_int16_t classId, u_int16_t methodId, u_int8_t major, u_int8_t minor); -}; - -} /* namespace framing */ -} /* namespace qpid */ - -#endif diff --git a/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl b/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl index 605b09ac94..5ffd2f2b4d 100644 --- a/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl +++ b/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl @@ -38,6 +38,9 @@ #include "qpid/framing/FieldTable.h" #include "qpid/framing/FramingContent.h" #include "qpid/framing/SequenceNumberSet.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/MethodBodyConstVisitor.h" + namespace qpid { @@ -45,7 +48,7 @@ namespace framing { ${version_namespace_start} -class ${CLASS}${METHOD}Body : public ${mb_base_class} +class ${CLASS}${METHOD}Body : public AMQMethodBody { // Method field declarations @@ -57,13 +60,11 @@ public: static const ClassId CLASS_ID= ${CLASS_ID_INIT}; static const MethodId METHOD_ID = ${METHOD_ID_INIT}; - typedef boost::shared_ptr<${CLASS}${METHOD}Body> shared_ptr; - - // Constructors and destructors + // Constructors and destructors ${mb_constructor_with_initializers} - ${CLASS}${METHOD}Body(ProtocolVersion version=ProtocolVersion() ): ${mb_base_class}(version) {} + ${CLASS}${METHOD}Body(ProtocolVersion=ProtocolVersion()) {} virtual ~${CLASS}${METHOD}Body() {} // Attribute get methods @@ -71,10 +72,11 @@ ${mb_constructor_with_initializers} %{FLIST} ${mb_field_get_method} // Helper methods - - inline void print(std::ostream& out) const + using AMQMethodBody::accept; + void accept(MethodBodyConstVisitor& v) const { v.visit(*this); } + + void print(std::ostream& out) const { - printPrefix(out); out << "${CLASS}${METHOD}: "; %{FLIST} ${mb_field_print} } @@ -84,17 +86,17 @@ ${mb_constructor_with_initializers} u_int32_t size() const { - u_int32_t sz = baseSize(); + u_int32_t sz = 0; %{FLIST} ${mb_body_size} return sz; } - void encodeContent(Buffer& ${mb_buffer_param}) const + void encode(Buffer& ${mb_buffer_param}) const { %{FLIST} ${mb_encode} } - inline void decodeContent(Buffer& ${mb_buffer_param}) + inline void decode(Buffer& ${mb_buffer_param}, uint32_t=0) { %{FLIST} ${mb_decode} } diff --git a/cpp/rubygen/amqpgen.rb b/cpp/rubygen/amqpgen.rb index a144825f08..333c7dd654 100755 --- a/cpp/rubygen/amqpgen.rb +++ b/cpp/rubygen/amqpgen.rb @@ -115,11 +115,7 @@ class AmqpClass < AmqpElement # chassis should be "client" or "server" def amqp_methods_on(chassis) @cache_amqp_methods_on ||= { } - - els = elements.collect("method/chassis[@name='#{chassis}']/..") { |m| - AmqpMethod.new(m,self) - }.sort_by_name - @cache_amqp_methods_on[chassis] ||= els + @cache_amqp_methods_on[chassis] ||= elements.collect("method/chassis[@name='#{chassis}']/..") { |m| AmqpMethod.new(m,self) }.sort_by_name end end @@ -153,7 +149,8 @@ class AmqpRoot < AmqpElement end def amqp_classes() - @cache_amqp_classes ||= elements.collect("class") { |c| AmqpClass.new(c,self) }.sort_by_name + @cache_amqp_classes ||= elements.collect("class") { |c| + AmqpClass.new(c,self) }.sort_by_name end # Return all methods on all classes. diff --git a/cpp/rubygen/cppgen.rb b/cpp/rubygen/cppgen.rb index 724634e514..a3314c7e11 100755 --- a/cpp/rubygen/cppgen.rb +++ b/cpp/rubygen/cppgen.rb @@ -63,6 +63,7 @@ end class AmqpField def cppname() @cache_cppname ||= name.lcaps.cppsafe; end def cpptype() @cache_cpptype ||= amqp_root.param_type(field_type); end + def cppret_type() @cache_cpptype ||= amqp_root.return_type(field_type); end def type_name () @type_name ||= cpptype+" "+cppname; end end @@ -159,8 +160,12 @@ class CppGen < Generator def struct_class(type, name, bases, &block) genl gen "#{type} #{name}" - gen ": #{bases.join(', ')}" unless bases.empty? - scope(" {","};") { yield } + if (!bases.empty?) + genl ":" + indent { gen "#{bases.join(",\n")}" } + end + genl + scope("{","};") { yield } end def struct(name, *bases, &block) struct_class("struct", name, bases, &block); end diff --git a/cpp/rubygen/templates/MethodBodyConstVisitor.rb b/cpp/rubygen/templates/MethodBodyConstVisitor.rb new file mode 100755 index 0000000000..6fd7fe8ead --- /dev/null +++ b/cpp/rubygen/templates/MethodBodyConstVisitor.rb @@ -0,0 +1,27 @@ +#!/usr/bin/env ruby +$: << ".." # Include .. in load path +require 'cppgen' + +class MethodBodyConstVisitorGen < CppGen + + def initialize(outdir, amqp) + super(outdir, amqp) + @namespace="qpid::framing" + @classname="MethodBodyConstVisitor" + @filename="qpid/framing/MethodBodyConstVisitor" + end + + def generate() + h_file("#{@filename}") { + namespace(@namespace) { + @amqp.amqp_methods.each { |m| genl "class #{m.body_name};" } + cpp_class("MethodBodyConstVisitor") { + genl "public:" + genl "virtual ~MethodBodyConstVisitor() {}" + @amqp.amqp_methods.each { |m| genl "virtual void visit(const #{m.body_name}&) = 0;" } + }}} + end +end + +MethodBodyConstVisitorGen.new(Outdir, Amqp).generate(); + diff --git a/cpp/rubygen/templates/MethodHolder.rb b/cpp/rubygen/templates/MethodHolder.rb index 65fa824fd4..39a570c982 100755 --- a/cpp/rubygen/templates/MethodHolder.rb +++ b/cpp/rubygen/templates/MethodHolder.rb @@ -40,23 +40,42 @@ EOS def gen_construct cpp_file(@filename+"_construct") { include @filename + include "qpid/framing/MethodBodyConstVisitor.h" @amqp.amqp_methods.each { |m| include "qpid/framing/#{m.body_name}" } genl - namespace(@namespace) { - scope("void #{@classname}::construct(const Id& newId) {") { - scope("switch (newId.first) {") { + include "qpid/Exception.h" + genl + namespace(@namespace) { + # construct function + scope("void #{@classname}::construct(ClassId c, MethodId m) {") { + scope("switch (c) {") { @amqp.amqp_classes.each { |c| - scope("case #{c.index}: switch(newId.second) {") { + scope("case #{c.index}: switch(m) {") { c.amqp_methods.each { |m| genl "case #{m.index}: blob.construct(in_place<#{m.body_name}>()); break;" - }} + } + genl "default: throw Exception(QPID_MSG(\"Invalid method id \" << m << \" for class #{c.name} \"));" + } genl "break;" - }} - genl "id=newId;"; - }}} + } + genl "default: throw Exception(QPID_MSG(\"Invalid class id \" << c));" + } + } + # CopyVisitor + struct("#{@classname}::CopyVisitor", "public MethodBodyConstVisitor") { genl "MethodHolder& holder;" + genl "CopyVisitor(MethodHolder& h) : holder(h) {}" + @amqp.amqp_methods.each { |m| + genl "void visit(const #{m.body_name}& x) { holder.blob=x; }" + } + } + genl + # operator= + scope("#{@classname}& MethodHolder::operator=(const AMQMethodBody& m) {") { + genl "CopyVisitor cv(*this); m.accept(cv); return *this;" + } + }} end - def generate gen_max_size gen_construct diff --git a/cpp/rubygen/templates/Proxy.rb b/cpp/rubygen/templates/Proxy.rb index f36d6bcd99..a6c0763a8a 100755 --- a/cpp/rubygen/templates/Proxy.rb +++ b/cpp/rubygen/templates/Proxy.rb @@ -38,7 +38,7 @@ EOS genl "void #{@classname}::#{cname}::#{m.cppname}(#{m.signature.join(", ")})" scope { params=(["channel.getVersion()"]+m.param_names).join(", ") - genl "channel.send(make_shared_ptr(new #{m.body_name}(#{params})));" + genl "channel.send(#{m.body_name}(#{params}));" }} end diff --git a/cpp/rubygen/templates/Session.rb b/cpp/rubygen/templates/Session.rb index eaa4347974..4265731d32 100644 --- a/cpp/rubygen/templates/Session.rb +++ b/cpp/rubygen/templates/Session.rb @@ -37,7 +37,7 @@ class SessionGen < CppGen indent { gen params.join(",\n") } gen "){\n\n" indent (2) { - gen "return impl->send(AMQMethodBody::shared_ptr(new #{m.body_name}(" + gen "return impl->send(#{m.body_name}(" params = ["version"] + m.param_names gen params.join(", ") other_params=[] @@ -49,7 +49,7 @@ class SessionGen < CppGen else other_params << "true" end - gen ")), #{other_params.join(", ")});\n" + gen "), #{other_params.join(", ")});\n" } gen "}\n\n" end diff --git a/cpp/rubygen/templates/all_method_bodies.rb b/cpp/rubygen/templates/all_method_bodies.rb new file mode 100755 index 0000000000..38fbc31593 --- /dev/null +++ b/cpp/rubygen/templates/all_method_bodies.rb @@ -0,0 +1,21 @@ +#!/usr/bin/env ruby +$: << ".." # Include .. in load path +require 'cppgen' + +class AllMethodBodiesGen < CppGen + + def initialize(outdir, amqp) + super(outdir, amqp) + @namespace="qpid::framing" + @filename="qpid/framing/all_method_bodies" + end + + def generate() + h_file(@filename) { + @amqp.amqp_methods.each { |m| include "qpid/framing/"+m.body_name } + } + end +end + +AllMethodBodiesGen.new(Outdir, Amqp).generate(); + diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 0f91faba63..977db7ea36 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -46,8 +46,10 @@ rgen_tdir=$(rgen_dir)/templates rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate rgen_templates=$(rgen_tdir)/MethodHolder.rb \ + $(rgen_tdir)/MethodBodyConstVisitor.rb \ $(rgen_tdir)/frame_body_lists.rb \ $(rgen_tdir)/Session.rb \ + $(rgen_tdir)/all_method_bodies.rb \ $(rgen_tdir)/Proxy.rb rgen_generator=$(rgen_dir)/generate \ @@ -135,11 +137,11 @@ libqpidcommon_la_LIBADD = \ libqpidcommon_la_SOURCES = \ $(platform_src) \ qpid/framing/AMQBody.cpp \ + qpid/framing/AMQMethodBody.cpp \ qpid/framing/AMQContentBody.cpp \ qpid/framing/AMQFrame.cpp \ qpid/framing/AMQHeaderBody.cpp \ qpid/framing/AMQHeartbeatBody.cpp \ - qpid/framing/AMQMethodBody.cpp \ qpid/framing/FrameHandler.h \ qpid/framing/BasicHeaderProperties.cpp \ qpid/framing/BodyHandler.cpp \ @@ -162,10 +164,12 @@ libqpidcommon_la_SOURCES = \ qpid/framing/Blob.h \ qpid/framing/AMQP_ClientProxy.cpp \ qpid/framing/AMQP_ServerProxy.cpp \ + qpid/framing/variant.h \ gen/qpid/framing/AMQP_HighestVersion.h \ - gen/qpid/framing/AMQP_MethodVersionMap.cpp \ + qpid/framing/Blob.cpp \ qpid/framing/MethodHolder.h qpid/framing/MethodHolder.cpp \ qpid/framing/MethodHolder_construct.cpp \ + qpid/framing/MethodHolderMaxSize.h \ qpid/Exception.cpp \ qpid/Plugin.h \ qpid/Plugin.cpp \ diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 6e577ab354..e135e960c4 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -307,19 +307,19 @@ void Channel::handlePublish(Message* _message) messageBuilder.initialise(message); } -void Channel::handleHeader(AMQHeaderBody::shared_ptr header) +void Channel::handleHeader(AMQHeaderBody* header) { messageBuilder.setHeader(header); //at this point, decide based on the size of the message whether we want //to stage it by saving content directly to disk as it arrives } -void Channel::handleContent(AMQContentBody::shared_ptr content) +void Channel::handleContent(AMQContentBody* content) { messageBuilder.addContent(content); } -void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) { +void Channel::handleHeartbeat(AMQHeartbeatBody*) { // TODO aconway 2007-01-17: Implement heartbeating. } diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index 1f4f6f35e7..021110cf8c 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -170,9 +170,9 @@ class Channel : public CompletionHandler void flow(bool active); void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag); void handlePublish(Message* msg); - void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>); - void handleContent(boost::shared_ptr<framing::AMQContentBody>); - void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); + void handleHeader(framing::AMQHeaderBody*); + void handleContent(framing::AMQContentBody*); + void handleHeartbeat(framing::AMQHeartbeatBody*); void handleInlineTransfer(Message::shared_ptr msg); }; diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp index 244bee4a92..bddd5802cf 100644 --- a/cpp/src/qpid/broker/BrokerMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessage.cpp @@ -72,22 +72,25 @@ BasicMessage::BasicMessage( const string& _exchange, const string& _routingKey, bool _mandatory, bool _immediate ) : - Message(_publisher, _exchange, _routingKey, _mandatory, - _immediate, framing::AMQMethodBody::shared_ptr(new BasicPublishBody(ProtocolVersion(0,9)))), + Message(_publisher, _exchange, _routingKey, _mandatory, _immediate), size(0) {} // For tests only. -BasicMessage::BasicMessage() : size(0) -{} +BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {} BasicMessage::~BasicMessage(){} -void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){ - this->header = _header; +void BasicMessage::setHeader(AMQHeaderBody* _header){ + if (_header) { + this->header = *_header; + isHeaderSet = true; + } + else + isHeaderSet = false; } -void BasicMessage::addContent(AMQContentBody::shared_ptr data){ +void BasicMessage::addContent(AMQContentBody* data){ if (!content.get()) { content = std::auto_ptr<Content>(new InMemoryContent()); } @@ -96,7 +99,7 @@ void BasicMessage::addContent(AMQContentBody::shared_ptr data){ } bool BasicMessage::isComplete(){ - return header.get() && (header->getContentSize() == contentSize()); + return isHeaderSet && (header.getContentSize() == contentSize()); } DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue) @@ -113,10 +116,9 @@ void BasicMessage::deliver(ChannelAdapter& channel, const string& consumerTag, DeliveryId id, uint32_t framesize) { - channel.send(make_shared_ptr( - new BasicDeliverBody( + channel.send(BasicDeliverBody( channel.getVersion(), consumerTag, id.getValue(), - getRedelivered(), getExchange(), getRoutingKey()))); + getRedelivered(), getExchange(), getRoutingKey())); sendContent(channel, framesize); } @@ -125,11 +127,11 @@ void BasicMessage::sendGetOk(ChannelAdapter& channel, DeliveryId id, uint32_t framesize) { - channel.send(make_shared_ptr( - new BasicGetOkBody( + channel.send( + BasicGetOkBody( channel.getVersion(), id.getValue(), getRedelivered(), getExchange(), - getRoutingKey(), messageCount))); + getRoutingKey(), messageCount)); sendContent(channel, framesize); } @@ -156,12 +158,11 @@ void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize) channel.send(header); Mutex::ScopedLock locker(contentLock); if (content.get()) - content->send(channel, framesize); + content->send(channel, framesize); } BasicHeaderProperties* BasicMessage::getHeaderProperties(){ - return boost::polymorphic_downcast<BasicHeaderProperties*>( - header->getProperties()); + return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0; } const FieldTable& BasicMessage::getApplicationHeaders(){ @@ -170,7 +171,7 @@ const FieldTable& BasicMessage::getApplicationHeaders(){ bool BasicMessage::isPersistent() { - if(!header) return false; + if(!isHeaderSet) return false; BasicHeaderProperties* props = getHeaderProperties(); return props && props->getDeliveryMode() == PERSISTENT; } @@ -194,9 +195,9 @@ void BasicMessage::decodeHeader(Buffer& buffer) setRouting(exchange, routingKey); uint32_t headerSize = buffer.getLong(); - AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody()); - headerBody->decode(buffer, headerSize); - setHeader(headerBody); + AMQHeaderBody headerBody; + headerBody.decode(buffer, headerSize); + setHeader(&headerBody); } void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize) @@ -214,9 +215,9 @@ void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize) uint64_t total = 0; while (total < expectedContentSize()) { uint64_t remaining = expected - total; - AMQContentBody::shared_ptr contentBody(new AMQContentBody()); - contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize); - addContent(contentBody); + AMQContentBody contentBody; + contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize); + addContent(&contentBody); total += chunkSize; } } @@ -232,8 +233,8 @@ void BasicMessage::encodeHeader(Buffer& buffer) const RecoveryManagerImpl::encodeMessageType(*this, buffer); buffer.putShortString(getExchange()); buffer.putShortString(getRoutingKey()); - buffer.putLong(header->size()); - header->encode(buffer); + buffer.putLong(header.size()); + header.encode(buffer); } void BasicMessage::encodeContent(Buffer& buffer) const @@ -258,12 +259,12 @@ uint32_t BasicMessage::encodedHeaderSize() const return RecoveryManagerImpl::encodedMessageTypeSize() +getExchange().size() + 1 + getRoutingKey().size() + 1 - + header->size() + 4;//4 extra bytes for size + + header.size() + 4;//4 extra bytes for size } uint64_t BasicMessage::expectedContentSize() { - return header.get() ? header->getContentSize() : 0; + return isHeaderSet ? header.getContentSize() : 0; } void BasicMessage::releaseContent(MessageStore* store) @@ -294,5 +295,5 @@ void BasicMessage::setContent(std::auto_ptr<Content>& _content) uint32_t BasicMessage::getRequiredCredit() const { - return header->size() + contentSize(); + return header.size() + contentSize(); } diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h index af8e4e62e9..0f46ff2e83 100644 --- a/cpp/src/qpid/broker/BrokerMessage.h +++ b/cpp/src/qpid/broker/BrokerMessage.h @@ -27,6 +27,7 @@ #include "BrokerMessageBase.h" #include "qpid/framing/BasicHeaderProperties.h" +#include "qpid/framing/AMQHeaderBody.h" #include "ConnectionToken.h" #include "Content.h" #include "qpid/sys/Mutex.h" @@ -51,7 +52,8 @@ using framing::string; * request. */ class BasicMessage : public Message { - boost::shared_ptr<framing::AMQHeaderBody> header; + framing::AMQHeaderBody header; + bool isHeaderSet; std::auto_ptr<Content> content; mutable sys::Mutex contentLock; uint64_t size; @@ -66,8 +68,8 @@ class BasicMessage : public Message { bool mandatory, bool immediate); BasicMessage(); ~BasicMessage(); - void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header); - void addContent(framing::AMQContentBody::shared_ptr data); + void setHeader(framing::AMQHeaderBody* header); + void addContent(framing::AMQContentBody* data); bool isComplete(); static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue); diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h index 94035905ce..bac5dc6386 100644 --- a/cpp/src/qpid/broker/BrokerMessageBase.h +++ b/cpp/src/qpid/broker/BrokerMessageBase.h @@ -54,22 +54,18 @@ class MessageStore; class Message : public PersistableMessage{ public: typedef boost::shared_ptr<Message> shared_ptr; - typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr; - Message(const ConnectionToken* publisher_, const std::string& _exchange, const std::string& _routingKey, - bool _mandatory, bool _immediate, - AMQMethodBodyPtr respondTo_) : + bool _mandatory, bool _immediate) : publisher(publisher_), exchange(_exchange), routingKey(_routingKey), mandatory(_mandatory), immediate(_immediate), persistenceId(0), - redelivered(false), - respondTo(respondTo_) + redelivered(false) {} Message() : @@ -145,8 +141,8 @@ class Message : public PersistableMessage{ * it uses). */ virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; - virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {}; - virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {}; + virtual void setHeader(framing::AMQHeaderBody*) {}; + virtual void addContent(framing::AMQContentBody*) {}; /** * Releases the in-memory content data held by this * message. Must pass in a store from which the data can @@ -164,7 +160,6 @@ class Message : public PersistableMessage{ const bool immediate; mutable uint64_t persistenceId; bool redelivered; - AMQMethodBodyPtr respondTo; }; }} diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index 0da5f3d8f5..1184885aeb 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -48,35 +48,34 @@ struct MessageDeliveryToken : public DeliveryToken }; MessageMessage::MessageMessage( - ConnectionToken* publisher, TransferPtr transfer_ + ConnectionToken* publisher, const MessageTransferBody* transfer_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getRejectUnroutable(), - transfer_->getImmediate(), - transfer_), - transfer(transfer_) + transfer_->getImmediate()), + transfer(*transfer_) { - assert(transfer->getBody().isInline()); + assert(transfer.getBody().isInline()); } MessageMessage::MessageMessage( - ConnectionToken* publisher, TransferPtr transfer_, ReferencePtr reference_ + ConnectionToken* publisher, const MessageTransferBody* transfer_, + ReferencePtr reference_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getRejectUnroutable(), - transfer_->getImmediate(), - transfer_), - transfer(transfer_), + transfer_->getImmediate()), + transfer(*transfer_), reference(reference_) { - assert(!transfer->getBody().isInline()); + assert(!transfer.getBody().isInline()); assert(reference_); } /** * Currently used by message store impls to recover messages */ -MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {} +MessageMessage::MessageMessage() {} // TODO: astitcher 1-Mar-2007: This code desperately needs better factoring void MessageMessage::transferMessage( @@ -84,27 +83,28 @@ void MessageMessage::transferMessage( const std::string& consumerTag, uint32_t framesize) { - const framing::Content& body = transfer->getBody(); + const framing::Content& body = transfer.getBody(); // Send any reference data ReferencePtr ref= getReference(); if (ref){ // Open - channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId()))); + channel.send(MessageOpenBody(channel.getVersion(), ref->getId())); // Appends for(Reference::Appends::const_iterator a = ref->getAppends().begin(); a != ref->getAppends().end(); ++a) { - uint32_t sizeleft = (*a)->size(); - const string& content = (*a)->getBytes(); + uint32_t sizeleft = a->size(); + const string& content = a->getBytes(); // Calculate overhead bytes // Assume that the overhead is constant as the reference name doesn't change uint32_t overhead = sizeleft - content.size(); string::size_type contentStart = 0; while (sizeleft) { string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(), - string(content, contentStart, contentSize)))); + + channel.send(MessageAppendBody(channel.getVersion(), ref->getId(), + string(content, contentStart, contentSize))); sizeleft -= contentSize; contentStart += contentSize; } @@ -112,76 +112,73 @@ void MessageMessage::transferMessage( } // The transfer - if ( transfer->size()<=framesize ) { - channel.send(make_shared_ptr( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body))); + if ( transfer.size()<=framesize ) { + channel.send(MessageTransferBody(ProtocolVersion(), + transfer.getTicket(), + consumerTag, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + body)); } else { // Thing to do here is to construct a simple reference message then deliver that instead // fragmentation will be taken care of in the delivery if necessary; string content = body.getValue(); string refname = "dummy"; - TransferPtr newTransfer( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - framing::Content(REFERENCE, refname))); + MessageTransferBody newTransfer(channel.getVersion(), + transfer.getTicket(), + consumerTag, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + framing::Content(REFERENCE, refname)); ReferencePtr newRef(new Reference(refname)); - Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); - newRef->append(newAppend); - MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), newTransfer, newRef); + newRef->append(MessageAppendBody(channel.getVersion(), refname, content)); + MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef); newMsg.transferMessage(channel, consumerTag, framesize); return; } // Close any reference data if (ref) - channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId()))); + channel.send(MessageCloseBody(ProtocolVersion(), ref->getId())); } @@ -202,8 +199,8 @@ bool MessageMessage::isComplete() uint64_t MessageMessage::contentSize() const { - if (transfer->getBody().isInline()) - return transfer->getBody().getValue().size(); + if (transfer.getBody().isInline()) + return transfer.getBody().getValue().size(); else { assert(getReference()); return getReference()->getSize(); @@ -217,11 +214,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() const FieldTable& MessageMessage::getApplicationHeaders() { - return transfer->getApplicationHeaders(); + return transfer.getApplicationHeaders(); } bool MessageMessage::isPersistent() { - return transfer->getDeliveryMode() == PERSISTENT; + return transfer.getDeliveryMode() == PERSISTENT; } uint32_t MessageMessage::encodedSize() const @@ -231,7 +228,7 @@ uint32_t MessageMessage::encodedSize() const uint32_t MessageMessage::encodedHeaderSize() const { - return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize(); + return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size(); } uint32_t MessageMessage::encodedContentSize() const @@ -252,18 +249,17 @@ void MessageMessage::encode(Buffer& buffer) const void MessageMessage::encodeHeader(Buffer& buffer) const { RecoveryManagerImpl::encodeMessageType(*this, buffer); - if (transfer->getBody().isInline()) { - transfer->encodeContent(buffer); + if (transfer.getBody().isInline()) { + transfer.encode(buffer); } else { assert(getReference()); string data; const Reference::Appends& appends = getReference()->getAppends(); for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) { - data += (*a)->getBytes(); + data += a->getBytes(); } framing::Content body(INLINE, data); - std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body)); - copy->encodeContent(buffer); + copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer); } } @@ -271,8 +267,7 @@ void MessageMessage::decodeHeader(Buffer& buffer) { //don't care about the type here, but want encode/decode to be symmetric RecoveryManagerImpl::decodeMessageType(buffer); - - transfer->decodeContent(buffer); + transfer.decode(buffer); } void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) @@ -280,37 +275,36 @@ void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) } -MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version, - const string& destination, - const framing::Content& body) const +MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version, + const string& destination, + const framing::Content& body) const { - return new MessageTransferBody(version, - transfer->getTicket(), - destination, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body); - + return MessageTransferBody(version, + transfer.getTicket(), + destination, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + body); } MessageMessage::ReferencePtr MessageMessage::getReference() const { @@ -321,7 +315,7 @@ uint32_t MessageMessage::getRequiredCredit() const { //TODO: change when encoding changes. Should be the payload of any //header & body frames. - return transfer->size(); + return transfer.size(); } diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h index 6b1bd9ab5d..6bfd0e045d 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.h +++ b/cpp/src/qpid/broker/BrokerMessageMessage.h @@ -29,10 +29,6 @@ namespace qpid { -namespace framing { -class MessageTransferBody; -} - namespace broker { class ConnectionToken; class Reference; @@ -40,16 +36,15 @@ class Reference; class MessageMessage: public Message{ public: typedef boost::shared_ptr<MessageMessage> shared_ptr; - typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr; typedef boost::shared_ptr<Reference> ReferencePtr; - MessageMessage(ConnectionToken* publisher, TransferPtr transfer); - MessageMessage(ConnectionToken* publisher, TransferPtr transfer, ReferencePtr reference); + MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer); + MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer, ReferencePtr reference); MessageMessage(); // Default destructor okay - TransferPtr getTransfer() const { return transfer; } + framing::MessageTransferBody* getTransfer() const { return const_cast<framing::MessageTransferBody*>(&transfer); } ReferencePtr getReference() const ; void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); @@ -80,12 +75,12 @@ class MessageMessage: public Message{ const std::string& consumerTag, uint32_t framesize); - framing::MessageTransferBody* copyTransfer( + framing::MessageTransferBody copyTransfer( const framing::ProtocolVersion& version, const std::string& destination, const framing::Content& body) const; - const TransferPtr transfer; + framing::MessageTransferBody transfer; const boost::shared_ptr<Reference> reference; }; diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp index a67a5557c6..175f57df7d 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.cpp +++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp @@ -1,3 +1,4 @@ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -42,8 +43,7 @@ void ConnectionAdapter::close(ReplyCode code, const string& text, ClassId classI handler->client.close(code, text, classId, methodId); } -void ConnectionAdapter::handleMethod( - boost::shared_ptr<qpid::framing::AMQMethodBody> method) +void ConnectionAdapter::handleMethod(framing::AMQMethodBody* method) { try{ method->invoke(*this); diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h index 1ce850a659..9aa3d130e8 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.h +++ b/cpp/src/qpid/broker/ConnectionAdapter.h @@ -47,12 +47,12 @@ public: void handle(framing::AMQFrame& frame); //ChannelAdapter virtual methods: - void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method); + void handleMethod(framing::AMQMethodBody* method); bool isOpen() const { return true; } //channel 0 is always open //never needed: - void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>) {} - void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>) {} - void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>) {} + void handleHeader(framing::AMQHeaderBody*) {} + void handleContent(framing::AMQContentBody*) {} + void handleHeartbeat(framing::AMQHeartbeatBody*) {} //AMQP_ServerOperations: ConnectionHandler* getConnectionHandler(); diff --git a/cpp/src/qpid/broker/Content.h b/cpp/src/qpid/broker/Content.h index df9e7a1132..97dce0d3f7 100644 --- a/cpp/src/qpid/broker/Content.h +++ b/cpp/src/qpid/broker/Content.h @@ -42,7 +42,7 @@ class Content{ virtual ~Content(){} /** Add a block of data to the content */ - virtual void add(framing::AMQContentBody::shared_ptr data) = 0; + virtual void add(framing::AMQContentBody* data) = 0; /** Total size of content in bytes */ virtual uint32_t size() = 0; diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp index a6ce820f7e..d69dcfafe7 100644 --- a/cpp/src/qpid/broker/InMemoryContent.cpp +++ b/cpp/src/qpid/broker/InMemoryContent.cpp @@ -26,16 +26,16 @@ using namespace qpid::broker; using namespace qpid::framing; using boost::static_pointer_cast; -void InMemoryContent::add(AMQContentBody::shared_ptr data) +void InMemoryContent::add(AMQContentBody* data) { - content.push_back(data); + content.push_back(*data); } uint32_t InMemoryContent::size() { int sum(0); for (content_iterator i = content.begin(); i != content.end(); i++) { - sum += (*i)->size(); + sum += i->size(); } return sum; } @@ -43,22 +43,20 @@ uint32_t InMemoryContent::size() void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize) { for (content_iterator i = content.begin(); i != content.end(); i++) { - if ((*i)->size() > framesize) { + if (i->size() > framesize) { uint32_t offset = 0; - for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) { - string data = (*i)->getData().substr(offset, framesize); - channel.send(make_shared_ptr(new AMQContentBody(data))); + for (int chunk = i->size() / framesize; chunk > 0; chunk--) { + string data = i->getData().substr(offset, framesize); + channel.send(AMQContentBody(data)); offset += framesize; } - uint32_t remainder = (*i)->size() % framesize; + uint32_t remainder = i->size() % framesize; if (remainder) { - string data = (*i)->getData().substr(offset, remainder); - channel.send(make_shared_ptr(new AMQContentBody(data))); + string data = i->getData().substr(offset, remainder); + channel.send(AMQContentBody(data)); } } else { - AMQBody::shared_ptr contentBody = - static_pointer_cast<AMQBody, AMQContentBody>(*i); - channel.send(contentBody); + channel.send(*i); } } } @@ -66,7 +64,7 @@ void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize) void InMemoryContent::encode(Buffer& buffer) { for (content_iterator i = content.begin(); i != content.end(); i++) { - (*i)->encode(buffer); + i->encode(buffer); } } diff --git a/cpp/src/qpid/broker/InMemoryContent.h b/cpp/src/qpid/broker/InMemoryContent.h index 425f0e4e26..a6fca7ca98 100644 --- a/cpp/src/qpid/broker/InMemoryContent.h +++ b/cpp/src/qpid/broker/InMemoryContent.h @@ -22,21 +22,22 @@ #define _InMemoryContent_ #include "Content.h" +#include "qpid/framing/AMQContentBody.h" #include <vector> namespace qpid { namespace broker { class InMemoryContent : public Content{ - typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; + typedef std::vector<framing::AMQContentBody> content_list; typedef content_list::iterator content_iterator; content_list content; public: - void add(qpid::framing::AMQContentBody::shared_ptr data); + void add(framing::AMQContentBody* data); uint32_t size(); void send(framing::ChannelAdapter&, uint32_t framesize); - void encode(qpid::framing::Buffer& buffer); + void encode(framing::Buffer& buffer); }; } } diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp index 80d06ebf2b..b8b5b37f45 100644 --- a/cpp/src/qpid/broker/LazyLoadedContent.cpp +++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp @@ -33,7 +33,7 @@ LazyLoadedContent::~LazyLoadedContent() LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) : store(_store), msg(_msg), expectedSize(_expectedSize) {} -void LazyLoadedContent::add(AMQContentBody::shared_ptr data) +void LazyLoadedContent::add(AMQContentBody* data) { store->appendContent(*msg, data->getData()); } @@ -52,12 +52,12 @@ void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize) string data; store->loadContent(*msg, data, offset, remaining > framesize ? framesize : remaining); - channel.send(make_shared_ptr(new AMQContentBody(data))); + channel.send(AMQContentBody(data)); } } else { string data; store->loadContent(*msg, data, 0, expectedSize); - channel.send(make_shared_ptr(new AMQContentBody(data))); + channel.send(AMQContentBody(data)); } } diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h index 9dff6158a5..79a33ed7a9 100644 --- a/cpp/src/qpid/broker/LazyLoadedContent.h +++ b/cpp/src/qpid/broker/LazyLoadedContent.h @@ -36,7 +36,7 @@ namespace qpid { MessageStore* const store, Message* const msg, uint64_t expectedSize); ~LazyLoadedContent(); - void add(qpid::framing::AMQContentBody::shared_ptr data); + void add(qpid::framing::AMQContentBody* data); uint32_t size(); void send( framing::ChannelAdapter&, diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index 6c33b38e72..f19927b708 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -50,7 +50,7 @@ void MessageBuilder::initialise(Message::shared_ptr& msg){ message = msg; } -void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ +void MessageBuilder::setHeader(AMQHeaderBody* header){ if(!message.get()){ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); } @@ -65,7 +65,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ route(); } -void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){ +void MessageBuilder::addContent(AMQContentBody* content){ if(!message.get()){ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish."); } diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h index 7c4a529a64..18e85d7383 100644 --- a/cpp/src/qpid/broker/MessageBuilder.h +++ b/cpp/src/qpid/broker/MessageBuilder.h @@ -39,8 +39,8 @@ namespace qpid { MessageStore* const store = 0, uint64_t stagingThreshold = 0); void initialise(Message::shared_ptr& msg); - void setHeader(framing::AMQHeaderBody::shared_ptr& header); - void addContent(framing::AMQContentBody::shared_ptr& content); + void setHeader(framing::AMQHeaderBody* header); + void addContent(framing::AMQContentBody* content); Message::shared_ptr getMessage() { return message; } private: Message::shared_ptr message; diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index b5bea05eac..3f407c11f7 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -28,6 +28,7 @@ #include "BrokerAdapter.h" #include <boost/format.hpp> +#include <boost/cast.hpp> namespace qpid { namespace broker { @@ -159,9 +160,7 @@ MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ ) void MessageHandlerImpl::transfer(const framing::AMQMethodBody& context) { - MessageTransferBody::shared_ptr transfer( - make_shared_ptr(new MessageTransferBody(static_cast<const MessageTransferBody&>(context)))); - + const MessageTransferBody* transfer = boost::polymorphic_downcast<const MessageTransferBody*>(&context); if (transfer->getBody().isInline()) { MessageMessage::shared_ptr message(new MessageMessage(&connection, transfer)); channel.handleInlineTransfer(message); diff --git a/cpp/src/qpid/broker/Reference.cpp b/cpp/src/qpid/broker/Reference.cpp index c2060f8fee..283b231b60 100644 --- a/cpp/src/qpid/broker/Reference.cpp +++ b/cpp/src/qpid/broker/Reference.cpp @@ -40,9 +40,9 @@ Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) { return i->second; } -void Reference::append(AppendPtr ptr) { - appends.push_back(ptr); - size += ptr->getBytes().length(); +void Reference::append(const framing::MessageAppendBody& app) { + appends.push_back(app); + size += app.getBytes().length(); } void Reference::close() { diff --git a/cpp/src/qpid/broker/Reference.h b/cpp/src/qpid/broker/Reference.h index 277eb7b917..5a373fbeba 100644 --- a/cpp/src/qpid/broker/Reference.h +++ b/cpp/src/qpid/broker/Reference.h @@ -19,6 +19,8 @@ * */ +#include "qpid/framing/MessageAppendBody.h" + #include <string> #include <vector> #include <map> @@ -56,8 +58,7 @@ class Reference typedef boost::shared_ptr<Reference> shared_ptr; typedef boost::shared_ptr<MessageMessage> MessagePtr; typedef std::vector<MessagePtr> Messages; - typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr; - typedef std::vector<AppendPtr> Appends; + typedef std::vector<framing::MessageAppendBody> Appends; Reference(const Id& id_=Id(), ReferenceRegistry* reg=0) : id(id_), size(0), registry(reg) {} @@ -69,7 +70,7 @@ class Reference void addMessage(MessagePtr message) { messages.push_back(message); } /** Append more data to the reference */ - void append(AppendPtr ptr); + void append(const framing::MessageAppendBody&); /** Close the reference, complete each associated message */ void close(); diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 6ef2162a4a..b7aa2aad25 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -22,6 +22,8 @@ #include "SemanticHandler.h" #include "BrokerAdapter.h" #include "qpid/framing/ChannelAdapter.h" +#include "qpid/framing/ExecutionCompleteBody.h" +#include "qpid/framing/ChannelCloseOkBody.h" using namespace qpid::broker; using namespace qpid::framing; @@ -60,7 +62,7 @@ void SemanticHandler::handle(framing::AMQFrame& frame) } //ChannelAdapter virtual methods: -void SemanticHandler::handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method) +void SemanticHandler::handleMethod(framing::AMQMethodBody* method) { try { if (!method->invoke(this)) { @@ -108,11 +110,11 @@ void SemanticHandler::flush() incoming.lwm = incoming.hwm; if (isOpen()) { Mutex::ScopedLock l(outLock); - ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()))); + ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())); } } -void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method) +void SemanticHandler::handleL4(framing::AMQMethodBody* method) { try{ if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { @@ -139,17 +141,17 @@ bool SemanticHandler::isOpen() const return channel.isOpen(); } -void SemanticHandler::handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody> body) +void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body) { channel.handleHeader(body); } -void SemanticHandler::handleContent(boost::shared_ptr<qpid::framing::AMQContentBody> body) +void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body) { channel.handleContent(body); } -void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody> body) +void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body) { channel.handleHeartbeat(body); } @@ -169,16 +171,12 @@ void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ msg->deliver(*this, tag, token, connection.getFrameMax()); } -void SemanticHandler::send(shared_ptr<AMQBody> body) +void SemanticHandler::send(const AMQBody& body) { Mutex::ScopedLock l(outLock); - uint8_t type(body->type()); - if (type == METHOD_BODY) { + if (body.getMethod() && body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) { //temporary hack until channel management is moved to its own handler: - if (dynamic_pointer_cast<AMQMethodBody>(body)->amqpClassId() != ChannelOpenBody::CLASS_ID) { - ++outgoing.hwm; - //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl; - } + ++outgoing.hwm; } ChannelAdapter::send(body); } diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index 016c94738d..6748da8500 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -31,6 +31,14 @@ #include "qpid/framing/SequenceNumber.h" namespace qpid { + +namespace framing { +class AMQMethodBody; +class AMQHeaderBody; +class AMQContentBody; +class AMQHeaderBody; +} + namespace broker { class BrokerAdapter; @@ -48,16 +56,16 @@ class SemanticHandler : private framing::ChannelAdapter, framing::Window outgoing; sys::Mutex outLock; - void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method); + void handleL4(framing::AMQMethodBody* method); //ChannelAdapter virtual methods: - void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method); + void handleMethod(framing::AMQMethodBody* method); bool isOpen() const; - void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); - void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); - void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); + void handleHeader(framing::AMQHeaderBody*); + void handleContent(framing::AMQContentBody*); + void handleHeartbeat(framing::AMQHeartbeatBody*); - void send(shared_ptr<framing::AMQBody> body); + void send(const framing::AMQBody& body); //delivery adapter methods: diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp index a6aea438f0..b3d720baf0 100644 --- a/cpp/src/qpid/client/ChannelHandler.cpp +++ b/cpp/src/qpid/client/ChannelHandler.cpp @@ -21,6 +21,7 @@ #include "ChannelHandler.h" #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid::client; using namespace qpid::framing; @@ -30,40 +31,39 @@ ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {} void ChannelHandler::incoming(AMQFrame& frame) { - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); if (getState() == OPEN) { - if (isA<ChannelCloseBody>(body)) { - ChannelCloseBody::shared_ptr method(shared_polymorphic_cast<ChannelCloseBody>(body)); + ChannelCloseBody* closeBody= + dynamic_cast<ChannelCloseBody*>(body->getMethod()); + if (closeBody) { setState(CLOSED); if (onClose) { - onClose(method->getReplyCode(), method->getReplyText()); + onClose(closeBody->getReplyCode(), closeBody->getReplyText()); } } else { try { in(frame); }catch(ChannelException& e){ - if (body->type() == METHOD_BODY) { - AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body)); - close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - } else { + AMQMethodBody* method=body->getMethod(); + if (method) + close(e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + else close(e.code, e.toString(), 0, 0); - } } } } else { - if (body->type() == METHOD_BODY) { - handleMethod(shared_polymorphic_cast<AMQMethodBody>(body)); - } else { + if (body->getMethod()) + handleMethod(body->getMethod()); + else throw new ConnectionException(504, "Channel not open."); - } - } } void ChannelHandler::outgoing(AMQFrame& frame) { if (getState() == OPEN) { - frame.channel = id; + frame.setChannel(id); out(frame); } else { throw Exception("Channel not open"); @@ -75,7 +75,7 @@ void ChannelHandler::open(uint16_t _id) id = _id; setState(OPENING); - AMQFrame f(version, id, make_shared_ptr(new ChannelOpenBody(version))); + AMQFrame f(version, id, ChannelOpenBody(version)); out(f); std::set<int> states; @@ -90,7 +90,7 @@ void ChannelHandler::open(uint16_t _id) void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) { setState(CLOSING); - AMQFrame f(version, id, make_shared_ptr(new ChannelCloseBody(version, code, message, classId, methodId))); + AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId)); out(f); } @@ -100,24 +100,24 @@ void ChannelHandler::close() waitFor(CLOSED); } -void ChannelHandler::handleMethod(AMQMethodBody::shared_ptr method) +void ChannelHandler::handleMethod(AMQMethodBody* method) { switch (getState()) { - case OPENING: + case OPENING: if (method->isA<ChannelOpenOkBody>()) { setState(OPEN); } else { throw ConnectionException(504, "Channel not opened."); } break; - case CLOSING: + case CLOSING: if (method->isA<ChannelCloseOkBody>()) { setState(CLOSED); } //else just ignore it break; - case CLOSED: + case CLOSED: throw ConnectionException(504, "Channel not opened."); - default: + default: throw Exception("Unexpected state encountered in ChannelHandler!"); } } diff --git a/cpp/src/qpid/client/ChannelHandler.h b/cpp/src/qpid/client/ChannelHandler.h index eaa7e7cc72..556e13a4f8 100644 --- a/cpp/src/qpid/client/ChannelHandler.h +++ b/cpp/src/qpid/client/ChannelHandler.h @@ -34,13 +34,7 @@ class ChannelHandler : private StateManager, public ChainableFrameHandler framing::ProtocolVersion version; uint16_t id; - void handleMethod(framing::AMQMethodBody::shared_ptr method); - - template <class T> bool isA(framing::AMQBody::shared_ptr body) { - return body->type() == framing::METHOD_BODY && - boost::shared_polymorphic_cast<framing::AMQMethodBody>(body)->isA<T>(); - } - + void handleMethod(framing::AMQMethodBody* method); void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId); diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 8ffddd0dbf..aa73e83328 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -25,11 +25,11 @@ #include "ClientMessage.h" #include "qpid/QpidError.h" #include "Connection.h" -#include "ConnectionHandler.h" #include "FutureResponse.h" #include "MessageListener.h" #include <boost/format.hpp> #include <boost/bind.hpp> +#include "qpid/framing/all_method_bodies.h" // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent // handling of errors that should close the connection or the channel. @@ -37,7 +37,6 @@ // using namespace std; using namespace boost; -using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; @@ -49,13 +48,11 @@ const std::string empty; class ScopedSync { Session& session; -public: + public: ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } ~ScopedSync() { session.setSynchronous(false); } }; -}} - Channel::Channel(bool _transactional, u_int16_t _prefetch) : prefetch(_prefetch), transactional(_transactional), running(false) { @@ -250,3 +247,6 @@ void Channel::run() { } } catch (const QueueClosed&) {} } + +}} + diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index e2e83c8caf..e41ab363b5 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -27,6 +27,7 @@ #include "ClientChannel.h" #include "ConnectionImpl.h" #include "Session.h" +#include "qpid/framing/AMQP_HighestVersion.h" namespace qpid { diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index f47506d977..66db9384e2 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -22,6 +22,8 @@ #include "ConnectionHandler.h" #include "qpid/log/Statement.h" #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQP_HighestVersion.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid::client; using namespace qpid::framing; @@ -53,16 +55,16 @@ void ConnectionHandler::incoming(AMQFrame& frame) throw Exception("Connection is closed."); } - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); if (frame.getChannel() == 0) { - if (body->type() == METHOD_BODY) { - handle(shared_polymorphic_cast<AMQMethodBody>(body)); + if (body->getMethod()) { + handle(body->getMethod()); } else { error(503, "Cannot send content on channel zero."); } } else { switch(getState()) { - case OPEN: + case OPEN: try { in(frame); }catch(ConnectionException& e){ @@ -71,10 +73,10 @@ void ConnectionHandler::incoming(AMQFrame& frame) error(541/*internal error*/, e.what(), body); } break; - case CLOSING: + case CLOSING: QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored."); break; - default: + default: //must be in connection initialisation: fail("Cannot receive frames on non-zero channel until connection is established."); } @@ -101,32 +103,29 @@ void ConnectionHandler::waitForOpen() void ConnectionHandler::close() { setState(CLOSING); - send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0))); - + send(ConnectionCloseBody(version, 200, OK, 0, 0)); waitFor(CLOSED); } -void ConnectionHandler::send(framing::AMQBody::shared_ptr body) +void ConnectionHandler::send(const framing::AMQBody& body) { - AMQFrame f; - f.setBody(body); + AMQFrame f(ProtocolVersion(), 0, body); out(f); } void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) { setState(CLOSING); - send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId))); + send(ConnectionCloseBody(version, code, message, classId, methodId)); } -void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body) +void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body) { - if (body->type() == METHOD_BODY) { - AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body)); + AMQMethodBody* method = body->getMethod(); + if (method) error(code, message, method->amqpClassId(), method->amqpMethodId()); - } else { + else error(code, message); - } } @@ -136,54 +135,54 @@ void ConnectionHandler::fail(const std::string& message) setState(FAILED); } -void ConnectionHandler::handle(AMQMethodBody::shared_ptr method) +void ConnectionHandler::handle(AMQMethodBody* method) { switch (getState()) { - case NOT_STARTED: + case NOT_STARTED: if (method->isA<ConnectionStartBody>()) { setState(NEGOTIATING); string response = ((char)0) + uid + ((char)0) + pwd; - send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale))); + send(ConnectionStartOkBody(version, properties, mechanism, response, locale)); } else { fail("Bad method sequence, expected connection-start."); } break; - case NEGOTIATING: + case NEGOTIATING: if (method->isA<ConnectionTuneBody>()) { - ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method)); + ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method); heartbeat = proposal->getHeartbeat(); maxChannels = proposal->getChannelMax(); - send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat))); + send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)); setState(OPENING); - send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist))); - //TODO: support for further security challenges - //} else if (method->isA<ConnectionSecureBody>()) { + send(ConnectionOpenBody(version, vhost, capabilities, insist)); + //TODO: support for further security challenges + //} else if (method->isA<ConnectionSecureBody>()) { } else { fail("Unexpected method sequence, expected connection-tune."); } break; - case OPENING: + case OPENING: if (method->isA<ConnectionOpenOkBody>()) { setState(OPEN); - //TODO: support for redirection - //} else if (method->isA<ConnectionRedirectBody>()) { + //TODO: support for redirection + //} else if (method->isA<ConnectionRedirectBody>()) { } else { fail("Unexpected method sequence, expected connection-open-ok."); } break; - case OPEN: + case OPEN: if (method->isA<ConnectionCloseBody>()) { - send(make_shared_ptr(new ConnectionCloseOkBody(version))); + send(ConnectionCloseOkBody(version)); setState(CLOSED); if (onError) { - ConnectionCloseBody::shared_ptr c(shared_polymorphic_cast<ConnectionCloseBody>(method)); + ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method); onError(c->getReplyCode(), c->getReplyText()); } } else { error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId()); } break; - case CLOSING: + case CLOSING: if (method->isA<ConnectionCloseOkBody>()) { if (onClose) { onClose(); diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index 464d0ca26d..d05ae1428b 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -25,6 +25,8 @@ #include "StateManager.h" #include "ChainableFrameHandler.h" #include "qpid/framing/InputHandler.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/AMQMethodBody.h" namespace qpid { namespace client { @@ -53,10 +55,10 @@ class ConnectionHandler : private StateManager, enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED}; std::set<int> ESTABLISHED; - void handle(framing::AMQMethodBody::shared_ptr method); - void send(framing::AMQBody::shared_ptr body); + void handle(framing::AMQMethodBody* method); + void send(const framing::AMQBody& body); void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0); - void error(uint16_t code, const std::string& message, framing::AMQBody::shared_ptr body); + void error(uint16_t code, const std::string& message, framing::AMQBody* body); void fail(const std::string& message); public: diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp index edb16bbcee..9ef6857957 100644 --- a/cpp/src/qpid/client/Correlator.cpp +++ b/cpp/src/qpid/client/Correlator.cpp @@ -25,7 +25,7 @@ using qpid::client::Correlator; using namespace qpid::framing; using namespace boost; -void Correlator::receive(AMQMethodBody::shared_ptr response) +void Correlator::receive(AMQMethodBody* response) { if (listeners.empty()) { throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h index 339c9bd0c4..d93e7b66cd 100644 --- a/cpp/src/qpid/client/Correlator.h +++ b/cpp/src/qpid/client/Correlator.h @@ -36,9 +36,9 @@ namespace client { class Correlator { public: - typedef boost::function<void(framing::AMQMethodBody::shared_ptr)> Listener; + typedef boost::function<void(framing::AMQMethodBody*)> Listener; - void receive(framing::AMQMethodBody::shared_ptr); + void receive(framing::AMQMethodBody*); void listen(Listener l); private: diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index abfce4f9d1..6ee6429b6b 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -23,31 +23,35 @@ #include "qpid/Exception.h" #include "qpid/framing/BasicDeliverBody.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/AMQP_HighestVersion.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid::client; using namespace qpid::framing; using namespace boost; -bool isMessageMethod(AMQMethodBody::shared_ptr method) +bool isMessageMethod(AMQMethodBody* method) { return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>(); } -bool isMessageMethod(AMQBody::shared_ptr body) +bool isMessageMethod(AMQBody* body) { - return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body)); + AMQMethodBody* method=body->getMethod(); + return method && isMessageMethod(method); } bool isContentFrame(AMQFrame& frame) { - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); uint8_t type = body->type(); return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); } -bool invoke(AMQBody::shared_ptr body, Invocable* target) +bool invoke(AMQBody* body, Invocable* target) { - return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target); + AMQMethodBody* method=body->getMethod(); + return method && method->invoke(target); } ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : @@ -56,7 +60,7 @@ ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : //incoming: void ExecutionHandler::handle(AMQFrame& frame) { - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); if (!invoke(body, this)) { if (isContentFrame(frame)) { if (!arriving) { @@ -69,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame) } } else { ++incoming.hwm; - correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body)); + correlation.receive(body->getMethod()); } } } @@ -95,16 +99,15 @@ void ExecutionHandler::flush() { //send completion incoming.lwm = incoming.hwm; - //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()))); } void ExecutionHandler::sendFlush() { - AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version))); + AMQFrame frame(version, 0, ExecutionFlushBody()); out(frame); } -void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g) +void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g) { //allocate id: ++outgoing.hwm; @@ -116,18 +119,19 @@ void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::List correlation.listen(g); } - AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command); + AMQFrame frame(version, 0/*id will be filled in be channel handler*/, + command); out(frame); } -void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data, +void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data, CompletionTracker::Listener f, Correlator::Listener g) { send(command, f, g); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers); - header->setContentSize(data.size()); + AMQHeaderBody header(BASIC); + BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers); + header.setContentSize(data.size()); AMQFrame h(version, 0, header); out(h); @@ -136,7 +140,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ - AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data))); + AMQFrame frame(version, 0, AMQContentBody(data)); out(frame); }else{ u_int32_t offset = 0; @@ -144,7 +148,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); - AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag))); + AMQFrame frame(version, 0, AMQContentBody(frag)); out(frame); offset += length; remaining = data_length - offset; diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index f62598ef95..21613df779 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -56,10 +56,10 @@ public: void setMaxFrameSize(uint64_t size) { maxFrameSize = size; } void handle(framing::AMQFrame& frame); - void send(framing::AMQBody::shared_ptr command, + void send(const framing::AMQBody& command, CompletionTracker::Listener f = CompletionTracker::Listener(), Correlator::Listener g = Correlator::Listener()); - void sendContent(framing::AMQBody::shared_ptr command, + void sendContent(const framing::AMQBody& command, const framing::BasicHeaderProperties& headers, const std::string& data, CompletionTracker::Listener f = CompletionTracker::Listener(), Correlator::Listener g = Correlator::Listener()); diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp index 6b1246a449..e63dc9c192 100644 --- a/cpp/src/qpid/client/FutureResponse.cpp +++ b/cpp/src/qpid/client/FutureResponse.cpp @@ -26,16 +26,16 @@ using namespace qpid::framing; using namespace qpid::sys; -AMQMethodBody::shared_ptr FutureResponse::getResponse() +AMQMethodBody* FutureResponse::getResponse() { waitForCompletion(); - return response; + return response.get(); } -void FutureResponse::received(AMQMethodBody::shared_ptr r) +void FutureResponse::received(AMQMethodBody* r) { Monitor::ScopedLock l(lock); - response = r; + response = *r; complete = true; lock.notifyAll(); } diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h index ccc6fb5894..75b1f72c04 100644 --- a/cpp/src/qpid/client/FutureResponse.h +++ b/cpp/src/qpid/client/FutureResponse.h @@ -23,6 +23,7 @@ #define _FutureResponse_ #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/MethodHolder.h" #include "FutureCompletion.h" namespace qpid { @@ -30,11 +31,10 @@ namespace client { class FutureResponse : public FutureCompletion { - framing::AMQMethodBody::shared_ptr response; - + framing::MethodHolder response; public: - framing::AMQMethodBody::shared_ptr getResponse(); - void received(framing::AMQMethodBody::shared_ptr response); + framing::AMQMethodBody* getResponse(); + void received(framing::AMQMethodBody* response); }; }} diff --git a/cpp/src/qpid/client/ReceivedContent.cpp b/cpp/src/qpid/client/ReceivedContent.cpp index 9cfee21c3c..5a1f48901a 100644 --- a/cpp/src/qpid/client/ReceivedContent.cpp +++ b/cpp/src/qpid/client/ReceivedContent.cpp @@ -20,6 +20,7 @@ */ #include "ReceivedContent.h" +#include "qpid/framing/all_method_bodies.h" using qpid::client::ReceivedContent; using namespace qpid::framing; @@ -27,9 +28,9 @@ using namespace boost; ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {} -void ReceivedContent::append(AMQBody::shared_ptr part) +void ReceivedContent::append(AMQBody* part) { - parts.push_back(part); + parts.push_back(AMQFrame(ProtocolVersion(), 0, part)); } bool ReceivedContent::isComplete() const @@ -37,7 +38,7 @@ bool ReceivedContent::isComplete() const if (parts.empty()) { return false; } else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { - AMQHeaderBody::shared_ptr headers(getHeaders()); + const AMQHeaderBody* headers(getHeaders()); return headers && headers->getContentSize() == getContentSize(); } else if (isA<MessageTransferBody>()) { //no longer support references, headers and data are still method fields @@ -48,14 +49,14 @@ bool ReceivedContent::isComplete() const } -AMQMethodBody::shared_ptr ReceivedContent::getMethod() const +const AMQMethodBody* ReceivedContent::getMethod() const { - return parts.empty() ? AMQMethodBody::shared_ptr() : dynamic_pointer_cast<AMQMethodBody>(parts[0]); + return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody()); } -AMQHeaderBody::shared_ptr ReceivedContent::getHeaders() const +const AMQHeaderBody* ReceivedContent::getHeaders() const { - return parts.size() < 2 ? AMQHeaderBody::shared_ptr() : dynamic_pointer_cast<AMQHeaderBody>(parts[1]); + return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody()); } uint64_t ReceivedContent::getContentSize() const @@ -63,7 +64,7 @@ uint64_t ReceivedContent::getContentSize() const if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { uint64_t size(0); for (uint i = 2; i < parts.size(); i++) { - size += parts[i]->size(); + size += parts[i].getBody()->size(); } return size; } else if (isA<MessageTransferBody>()) { @@ -78,7 +79,7 @@ std::string ReceivedContent::getContent() const if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { string data; for (uint i = 2; i < parts.size(); i++) { - data += dynamic_pointer_cast<AMQContentBody>(parts[i])->getData(); + data += static_cast<const AMQContentBody*>(parts[i].getBody())->getData(); } return data; } else if (isA<MessageTransferBody>()) { @@ -93,7 +94,7 @@ void ReceivedContent::populate(Message& msg) if (!isComplete()) throw Exception("Incomplete message"); if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { - BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(getHeaders()->getProperties()); + const BasicHeaderProperties* properties = dynamic_cast<const BasicHeaderProperties*>(getHeaders()->getProperties()); BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties); msg.setData(getContent()); } else if (isA<MessageTransferBody>()) { diff --git a/cpp/src/qpid/client/ReceivedContent.h b/cpp/src/qpid/client/ReceivedContent.h index 1886034f9b..4f84039c10 100644 --- a/cpp/src/qpid/client/ReceivedContent.h +++ b/cpp/src/qpid/client/ReceivedContent.h @@ -20,8 +20,8 @@ */ #include <string> #include <vector> -#include <boost/shared_ptr.hpp> #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" #include "qpid/framing/SequenceNumber.h" #include "ClientMessage.h" @@ -38,37 +38,29 @@ namespace client { class ReceivedContent { const framing::SequenceNumber id; - std::vector<framing::AMQBody::shared_ptr> parts; + std::vector<framing::AMQFrame> parts; public: typedef boost::shared_ptr<ReceivedContent> shared_ptr; ReceivedContent(const framing::SequenceNumber& id); - void append(framing::AMQBody::shared_ptr part); + void append(framing::AMQBody* part); bool isComplete() const; uint64_t getContentSize() const; std::string getContent() const; - framing::AMQMethodBody::shared_ptr getMethod() const; - framing::AMQHeaderBody::shared_ptr getHeaders() const; + const framing::AMQMethodBody* getMethod() const; + const framing::AMQHeaderBody* getHeaders() const; template <class T> bool isA() const { - framing::AMQMethodBody::shared_ptr method = getMethod(); - if (!method) { - return false; - } else { - return method->isA<T>(); - } + const framing::AMQMethodBody* method=getMethod(); + return method && method->isA<T>(); } - template <class T> boost::shared_ptr<T> as() const { - framing::AMQMethodBody::shared_ptr method = getMethod(); - if (method && method->isA<T>()) { - return boost::dynamic_pointer_cast<T>(method); - } else { - return boost::shared_ptr<T>(); - } + template <class T> const T* as() const { + const framing::AMQMethodBody* method=getMethod(); + return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0; } const framing::SequenceNumber& getId() const { return id; } diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h index 745d4648ad..ad37d7c0f4 100644 --- a/cpp/src/qpid/client/Response.h +++ b/cpp/src/qpid/client/Response.h @@ -38,12 +38,12 @@ public: template <class T> T& as() { - framing::AMQMethodBody::shared_ptr response(future->getResponse()); + framing::AMQMethodBody* response(future->getResponse()); return dynamic_cast<T&>(*response); } template <class T> bool isA() { - framing::AMQMethodBody::shared_ptr response(future->getResponse()); + framing::AMQMethodBody* response(future->getResponse()); return response && response->isA<T>(); } diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 391dcd909d..f7ed7416cd 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -44,7 +44,7 @@ void SessionCore::flush() l3.sendFlush(); } -Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse) +Response SessionCore::send(const AMQMethodBody& method, bool expectResponse) { boost::shared_ptr<FutureResponse> f(futures.createResponse()); if (expectResponse) { @@ -59,7 +59,7 @@ Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse return Response(f); } -Response SessionCore::send(AMQMethodBody::shared_ptr method, const MethodContent& content, bool expectResponse) +Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse) { //TODO: lots of duplication between these two send methods; refactor boost::shared_ptr<FutureResponse> f(futures.createResponse()); diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 15cd36114f..bcbaf0028d 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -47,8 +47,8 @@ public: typedef boost::shared_ptr<SessionCore> shared_ptr; SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize); - Response send(framing::AMQMethodBody::shared_ptr method, bool expectResponse = false); - Response send(framing::AMQMethodBody::shared_ptr method, const framing::MethodContent& content, bool expectResponse = false); + Response send(const framing::AMQMethodBody& method, bool expectResponse = false); + Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false); ReceivedContent::shared_ptr get(); uint16_t getId() const { return id; } void setSync(bool); diff --git a/cpp/src/qpid/cluster/ClassifierHandler.cpp b/cpp/src/qpid/cluster/ClassifierHandler.cpp index 1cce126800..9410a3cc38 100644 --- a/cpp/src/qpid/cluster/ClassifierHandler.cpp +++ b/cpp/src/qpid/cluster/ClassifierHandler.cpp @@ -36,7 +36,7 @@ typedef uint32_t FullMethodId; // Combind class & method ID. FullMethodId fullId(ClassId c, MethodId m) { return c<<16+m; } -FullMethodId fullId(const shared_ptr<AMQMethodBody>& body) { +FullMethodId fullId(const AMQMethodBody*& body) { return fullId(body->amqpClassId(), body->amqpMethodId()); } @@ -59,8 +59,8 @@ void ClassifierHandler::handle(AMQFrame& frame) { // TODO aconway 2007-07-03: Flatten the frame hierarchy so we // can do a single lookup to dispatch a frame. Chain chosen; - shared_ptr<AMQMethodBody> method = - dynamic_pointer_cast<AMQMethodBody>(frame.getBody()); + AMQMethodBody* method = dynamic_cast<AMQMethodBody*>(frame.getBody()); + // FIXME aconway 2007-07-05: Need to stop bypassed frames // from overtaking mcast frames. // diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e5af237bd2..d9f1679fdf 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -95,7 +95,7 @@ void Cluster::handle(AMQFrame& frame) { void Cluster::notify() { AMQFrame frame(ProtocolVersion(), 0, - new ClusterNotifyBody(ProtocolVersion(), url)); + ClusterNotifyBody(ProtocolVersion(), url)); handle(frame); } diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp index a4efa939fc..b8290ad953 100644 --- a/cpp/src/qpid/cluster/SessionManager.cpp +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -36,7 +36,7 @@ using namespace sys; using namespace broker; /** Handler to send frames direct to local broker (bypass correlation etc.) */ - struct BrokerHandler : public FrameHandler, private ChannelAdapter, private DeliveryAdapter { +struct BrokerHandler : public FrameHandler, private ChannelAdapter, private DeliveryAdapter { Connection connection; Channel channel; BrokerAdapter adapter; @@ -61,13 +61,13 @@ using namespace broker; } // Dummy methods. - virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>){} - virtual void handleContent(boost::shared_ptr<AMQContentBody>){} - virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>){} + virtual void handleHeader(AMQHeaderBody*){} + virtual void handleContent(AMQContentBody*){} + virtual void handleHeartbeat(AMQHeartbeatBody*){} virtual bool isOpen() const{ return true; } - virtual void handleMethod(shared_ptr<AMQMethodBody>){} + virtual void handleMethod(AMQMethodBody*){} // No-op send. - virtual void send(shared_ptr<AMQBody>) {} + virtual void send(const AMQBody&) {} //delivery adapter methods, also no-ops: virtual DeliveryId deliver(Message::shared_ptr&, DeliveryToken::shared_ptr) { return 0; } diff --git a/cpp/src/qpid/framing/AMQBody.h b/cpp/src/qpid/framing/AMQBody.h index eaa568c06a..0b2718bef3 100644 --- a/cpp/src/qpid/framing/AMQBody.h +++ b/cpp/src/qpid/framing/AMQBody.h @@ -1,3 +1,6 @@ +#ifndef QPID_FRAMING_AMQBODY_H +#define QPID_FRAMING_AMQBODY_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,42 +21,58 @@ * under the License. * */ -#include <boost/shared_ptr.hpp> -#include "amqp_types.h" -#include "Buffer.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/shared_ptr.h" -#ifndef _AMQBody_ -#define _AMQBody_ +#include <ostream> namespace qpid { - namespace framing { +namespace framing { + +class Buffer; + +class AMQMethodBody; +class AMQHeaderBody; +class AMQContentBody; +class AMQHeartbeatBody; + +struct AMQBodyConstVisitor { + virtual ~AMQBodyConstVisitor() {} + virtual void visit(const AMQHeaderBody&) = 0; + virtual void visit(const AMQContentBody&) = 0; + virtual void visit(const AMQHeartbeatBody&) = 0; + virtual void visit(const AMQMethodBody&) = 0; +}; + +class AMQBody +{ + public: + typedef shared_ptr<AMQBody> shared_ptr; + + virtual ~AMQBody(); + + virtual uint8_t type() const = 0; - class AMQBody - { - public: - typedef boost::shared_ptr<AMQBody> shared_ptr; + virtual void encode(Buffer& buffer) const = 0; + virtual void decode(Buffer& buffer, uint32_t=0) = 0; + virtual uint32_t size() const = 0; - virtual ~AMQBody(); - virtual uint32_t size() const = 0; - virtual uint8_t type() const = 0; - virtual void encode(Buffer& buffer) const = 0; - virtual void decode(Buffer& buffer, uint32_t size) = 0; + virtual void print(std::ostream& out) const = 0; + virtual void accept(AMQBodyConstVisitor&) const = 0; - virtual void print(std::ostream& out) const = 0; - }; + virtual AMQMethodBody* getMethod() { return 0; } + virtual const AMQMethodBody* getMethod() const { return 0; } +}; - std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; +std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; - enum BodyTypes { - METHOD_BODY = 1, - HEADER_BODY = 2, - CONTENT_BODY = 3, - HEARTBEAT_BODY = 8, - REQUEST_BODY = 9, - RESPONSE_BODY = 10 - }; - } -} +enum BodyTypes { + METHOD_BODY = 1, + HEADER_BODY = 2, + CONTENT_BODY = 3, + HEARTBEAT_BODY = 8 +}; +}} // namespace qpid::framing -#endif +#endif /*!QPID_FRAMING_AMQBODY_H*/ diff --git a/cpp/src/qpid/framing/AMQContentBody.cpp b/cpp/src/qpid/framing/AMQContentBody.cpp index 19d2e5714a..c472af555d 100644 --- a/cpp/src/qpid/framing/AMQContentBody.cpp +++ b/cpp/src/qpid/framing/AMQContentBody.cpp @@ -12,7 +12,7 @@ * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +n * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. diff --git a/cpp/src/qpid/framing/AMQContentBody.h b/cpp/src/qpid/framing/AMQContentBody.h index 6d4c1ef4b6..a476796015 100644 --- a/cpp/src/qpid/framing/AMQContentBody.h +++ b/cpp/src/qpid/framing/AMQContentBody.h @@ -28,13 +28,11 @@ namespace qpid { namespace framing { -class AMQContentBody : public AMQBody +class AMQContentBody : public AMQBody { string data; public: - typedef boost::shared_ptr<AMQContentBody> shared_ptr; - AMQContentBody(); AMQContentBody(const string& data); inline virtual ~AMQContentBody(){} @@ -44,6 +42,7 @@ public: void encode(Buffer& buffer) const; void decode(Buffer& buffer, uint32_t size); void print(std::ostream& out) const; + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } }; } diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index f79eae3524..780af71be4 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -1,4 +1,3 @@ - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,46 +18,70 @@ * under the License. * */ -#include <boost/format.hpp> - #include "AMQFrame.h" + #include "qpid/QpidError.h" -#include "AMQMethodBody.h" +#include "qpid/framing/variant.h" +#include "qpid/framing/AMQMethodBody.h" +#include <boost/format.hpp> + +#include <iostream> namespace qpid { namespace framing { +namespace { +struct GetBodyVisitor : public NoBlankVisitor<AMQBody*> { + QPID_USING_NOBLANK(AMQBody*); + AMQBody* operator()(MethodHolder& t) const { return t.get(); } + template <class T> AMQBody* operator()(T& t) const { return &t; } +}; + +struct EncodeVisitor : public NoBlankVisitor<void> { + Buffer& buffer; + EncodeVisitor(Buffer& b) : buffer(b) {} + + QPID_USING_NOBLANK(void); + template <class T> void operator()(const T& t) const { return t.encode(buffer); } +}; + +struct SizeVisitor : public NoBlankVisitor<uint32_t> { + QPID_USING_NOBLANK(uint32_t); + template <class T> uint32_t operator()(const T& t) const { return t.size(); } +}; + +struct DecodeVisitor : public NoBlankVisitor<void> { + Buffer& buffer; + uint32_t size; + DecodeVisitor(Buffer& b, uint32_t s) : buffer(b), size(s) {} + QPID_USING_NOBLANK(void); + void operator()(MethodHolder& t) const { return t.decode(buffer); } + template <class T> void operator()(T& t) const { return t.decode(buffer, size); } +}; -AMQP_MethodVersionMap AMQFrame::versionMap; - -AMQFrame::AMQFrame(ProtocolVersion _version) - : channel(0), type(0), version(_version) - { - assert(version != ProtocolVersion(0,0)); - } - -AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) : channel(_channel), body(_body),version(_version) {} +} -AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::shared_ptr& _body) : - channel(_channel), body(_body), version(_version) -{} +AMQBody* AMQFrame::getBody() { + return boost::apply_visitor(GetBodyVisitor(), body); +} -AMQFrame::~AMQFrame() {} +const AMQBody* AMQFrame::getBody() const { + return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body)); +} void AMQFrame::encode(Buffer& buffer) { - buffer.putOctet(body->type()); + buffer.putOctet(getBody()->type()); buffer.putShort(channel); - buffer.putLong(body->size()); - body->encode(buffer); + buffer.putLong(boost::apply_visitor(SizeVisitor(), body)); + boost::apply_visitor(EncodeVisitor(buffer), body); buffer.putOctet(0xCE); } uint32_t AMQFrame::size() const{ - assert(body.get()); - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size() - + 1/*0xCE*/; + return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + + boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/; } bool AMQFrame::decode(Buffer& buffer) @@ -66,56 +89,42 @@ bool AMQFrame::decode(Buffer& buffer) if(buffer.available() < 7) return false; buffer.record(); - uint32_t frameSize = decodeHead(buffer); - if(buffer.available() < frameSize + 1){ + + uint8_t type = buffer.getOctet(); + channel = buffer.getShort(); + uint32_t size = buffer.getLong(); + + if(buffer.available() < size+1){ buffer.restore(); return false; } - decodeBody(buffer, frameSize); + decodeBody(buffer, size, type); uint8_t end = buffer.getOctet(); if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); return true; } -uint32_t AMQFrame::decodeHead(Buffer& buffer){ - type = buffer.getOctet(); - channel = buffer.getShort(); - return buffer.getLong(); -} - -void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) +void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type) { switch(type) { - case METHOD_BODY: - body = AMQMethodBody::create(versionMap, version, buffer); - break; - case HEADER_BODY: - body = AMQBody::shared_ptr(new AMQHeaderBody()); - break; - case CONTENT_BODY: - body = AMQBody::shared_ptr(new AMQContentBody()); - break; - case HEARTBEAT_BODY: - body = AMQBody::shared_ptr(new AMQHeartbeatBody()); - break; + case METHOD_BODY: body = MethodHolder(); break; + case HEADER_BODY: body = AMQHeaderBody(); break; + case CONTENT_BODY: body = AMQContentBody(); break; + case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break; + default: THROW_QPID_ERROR( FRAMING_ERROR, boost::format("Unknown frame type %d") % type); } - body->decode(buffer, size); + boost::apply_visitor(DecodeVisitor(buffer,size), body); } -std::ostream& operator<<(std::ostream& out, const AMQFrame& t) +std::ostream& operator<<(std::ostream& out, const AMQFrame& f) { - out << "Frame[channel=" << t.channel << "; "; - if (t.body.get() == 0) - out << "empty"; - else - out << *t.body; - out << "]"; - return out; + return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody() + << "]"; } diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index 16c1427802..9e825a9936 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -21,55 +21,76 @@ * under the License. * */ -#include <boost/cast.hpp> - -#include "amqp_types.h" -#include "AMQBody.h" #include "AMQDataBlock.h" -#include "AMQMethodBody.h" #include "AMQHeaderBody.h" #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" -#include "qpid/framing/AMQP_MethodVersionMap.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/Buffer.h" -#include "qpid/shared_ptr.h" +#include "MethodHolder.h" +#include "ProtocolVersion.h" + +#include <boost/cast.hpp> +#include <boost/variant.hpp> namespace qpid { namespace framing { - class AMQFrame : public AMQDataBlock { public: - AMQFrame(ProtocolVersion _version = highestProtocolVersion); - AMQFrame(ProtocolVersion _version, uint16_t channel, AMQBody* body); - AMQFrame(ProtocolVersion _version, uint16_t channel, const AMQBody::shared_ptr& body); - virtual ~AMQFrame(); - virtual void encode(Buffer& buffer); - virtual bool decode(Buffer& buffer); - virtual uint32_t size() const; - uint16_t getChannel() const { return channel; } - - shared_ptr<AMQBody> getBody() { return body; } - void setBody(const shared_ptr<AMQBody>& b) { body = b; } - - /** Convenience template to cast the body to an expected type */ - template <class T> boost::shared_ptr<T> castBody() { - assert(dynamic_cast<T*>(getBody().get())); - boost::static_pointer_cast<T>(getBody()); + AMQFrame(ProtocolVersion=ProtocolVersion()) {} + + /** Construct a frame with a copy of b */ + AMQFrame(ProtocolVersion, ChannelId c, const AMQBody* b) : channel(c) { + setBody(*b); + } + + AMQFrame(ProtocolVersion, ChannelId c, const AMQBody& b) : channel(c) { + setBody(b); } + + ChannelId getChannel() const { return channel; } + void setChannel(ChannelId c) { channel = c; } - uint32_t decodeHead(Buffer& buffer); - void decodeBody(Buffer& buffer, uint32_t size); + AMQBody* getBody(); + const AMQBody* getBody() const; - uint16_t channel; - uint8_t type; - AMQBody::shared_ptr body; - ProtocolVersion version; + /** Copy a body instance to the frame */ + void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); } + + /** Convenience template to cast the body to an expected type. */ + template <class T> T* castBody() { + boost::polymorphic_downcast<T*>(getBody()); + } + + bool empty() { return boost::get<boost::blank>(&body); } + + void encode(Buffer& buffer); + bool decode(Buffer& buffer); + uint32_t size() const; private: - static AMQP_MethodVersionMap versionMap; + struct CopyVisitor : public AMQBodyConstVisitor { + AMQFrame& frame; + CopyVisitor(AMQFrame& f) : frame(f) {} + void visit(const AMQHeaderBody& x) { frame.body=x; } + void visit(const AMQContentBody& x) { frame.body=x; } + void visit(const AMQHeartbeatBody& x) { frame.body=x; } + void visit(const AMQMethodBody& x) { frame.body=MethodHolder(x); } + }; + friend struct CopyVisitor; + + typedef boost::variant<boost::blank, + AMQHeaderBody, + AMQContentBody, + AMQHeartbeatBody, + MethodHolder> Variant; + + void visit(AMQHeaderBody& x) { body=x; } + + void decodeBody(Buffer& buffer, uint32_t size, uint8_t type); + + uint16_t channel; + Variant body; }; std::ostream& operator<<(std::ostream&, const AMQFrame&); diff --git a/cpp/src/qpid/framing/AMQHeaderBody.cpp b/cpp/src/qpid/framing/AMQHeaderBody.cpp index 4d3611eb40..6a3c8f27d1 100644 --- a/cpp/src/qpid/framing/AMQHeaderBody.cpp +++ b/cpp/src/qpid/framing/AMQHeaderBody.cpp @@ -22,54 +22,34 @@ #include "qpid/QpidError.h" #include "BasicHeaderProperties.h" -qpid::framing::AMQHeaderBody::AMQHeaderBody(int classId) : weight(0), contentSize(0){ - createProperties(classId); -} +qpid::framing::AMQHeaderBody::AMQHeaderBody(int) : weight(0), contentSize(0) {} -qpid::framing::AMQHeaderBody::AMQHeaderBody() : properties(0), weight(0), contentSize(0){ -} +qpid::framing::AMQHeaderBody::AMQHeaderBody() : weight(0), contentSize(0){} -qpid::framing::AMQHeaderBody::~AMQHeaderBody(){ - delete properties; -} +qpid::framing::AMQHeaderBody::~AMQHeaderBody(){} uint32_t qpid::framing::AMQHeaderBody::size() const{ - return 12 + properties->size(); + return 12 + properties.size(); } void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const { - buffer.putShort(properties->classId()); + buffer.putShort(properties.classId()); buffer.putShort(weight); buffer.putLongLong(contentSize); - properties->encode(buffer); + properties.encode(buffer); } void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){ - uint16_t classId = buffer.getShort(); + buffer.getShort(); // Ignore classId weight = buffer.getShort(); contentSize = buffer.getLongLong(); - createProperties(classId); - properties->decode(buffer, bufSize - 12); -} - -void qpid::framing::AMQHeaderBody::createProperties(int classId){ - switch(classId){ - case BASIC: - properties = new qpid::framing::BasicHeaderProperties(); - break; - default: - THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class"); - } + properties.decode(buffer, bufSize - 12); } void qpid::framing::AMQHeaderBody::print(std::ostream& out) const { out << "header (" << size() << " bytes)" << " content_size=" << getContentSize(); - const BasicHeaderProperties* props = - dynamic_cast<const BasicHeaderProperties*>(getProperties()); - if (props) { - out << ", message_id=" << props->getMessageId(); - out << ", delivery_mode=" << (int) props->getDeliveryMode(); - out << ", headers=" << const_cast<BasicHeaderProperties*>(props)->getHeaders(); - } + out << ", message_id=" << properties.getMessageId(); + out << ", delivery_mode=" << (int) properties.getDeliveryMode(); + out << ", headers=" << properties.getHeaders(); } diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h index 691ceeff73..894936060c 100644 --- a/cpp/src/qpid/framing/AMQHeaderBody.h +++ b/cpp/src/qpid/framing/AMQHeaderBody.h @@ -21,7 +21,7 @@ #include "amqp_types.h" #include "AMQBody.h" #include "Buffer.h" -#include "HeaderProperties.h" +#include "BasicHeaderProperties.h" #ifndef _AMQHeaderBody_ #define _AMQHeaderBody_ @@ -31,19 +31,15 @@ namespace framing { class AMQHeaderBody : public AMQBody { - HeaderProperties* properties; + BasicHeaderProperties properties; uint16_t weight; uint64_t contentSize; - - void createProperties(int classId); -public: - typedef boost::shared_ptr<AMQHeaderBody> shared_ptr; - + public: AMQHeaderBody(int classId); AMQHeaderBody(); inline uint8_t type() const { return HEADER_BODY; } - HeaderProperties* getProperties(){ return properties; } - const HeaderProperties* getProperties() const { return properties; } + BasicHeaderProperties* getProperties(){ return &properties; } + const BasicHeaderProperties* getProperties() const { return &properties; } inline uint64_t getContentSize() const { return contentSize; } inline void setContentSize(uint64_t _size) { contentSize = _size; } virtual ~AMQHeaderBody(); @@ -51,6 +47,8 @@ public: virtual void encode(Buffer& buffer) const; virtual void decode(Buffer& buffer, uint32_t size); virtual void print(std::ostream& out) const; + + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } }; } diff --git a/cpp/src/qpid/framing/AMQHeartbeatBody.h b/cpp/src/qpid/framing/AMQHeartbeatBody.h index 4c046b81a7..a2701c3398 100644 --- a/cpp/src/qpid/framing/AMQHeartbeatBody.h +++ b/cpp/src/qpid/framing/AMQHeartbeatBody.h @@ -31,14 +31,13 @@ namespace framing { class AMQHeartbeatBody : public AMQBody { public: - typedef boost::shared_ptr<AMQHeartbeatBody> shared_ptr; - virtual ~AMQHeartbeatBody(); inline uint32_t size() const { return 0; } inline uint8_t type() const { return HEARTBEAT_BODY; } inline void encode(Buffer& ) const {} inline void decode(Buffer& , uint32_t /*size*/) {} virtual void print(std::ostream& out) const; + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } }; } diff --git a/cpp/src/qpid/framing/AMQMethodBody.cpp b/cpp/src/qpid/framing/AMQMethodBody.cpp index 0a2720e69a..924d906d43 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.cpp +++ b/cpp/src/qpid/framing/AMQMethodBody.cpp @@ -18,51 +18,11 @@ * under the License. * */ -#include "AMQFrame.h" #include "AMQMethodBody.h" -#include "qpid/QpidError.h" -#include "qpid/framing/AMQP_MethodVersionMap.h" namespace qpid { namespace framing { -void AMQMethodBody::encodeId(Buffer& buffer) const{ - buffer.putShort(amqpClassId()); - buffer.putShort(amqpMethodId()); -} - -void AMQMethodBody::invoke(AMQP_ServerOperations&){ - assert(0); - THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server."); -} - -bool AMQMethodBody::invoke(Invocable*) { - return false; -} - -AMQMethodBody::shared_ptr AMQMethodBody::create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer) -{ - ClassMethodId id; - id.decode(buffer); - return AMQMethodBody::shared_ptr( - versionMap.createMethodBody( - id.classId, id.methodId, version.getMajor(), version.getMinor())); -} - -void AMQMethodBody::ClassMethodId::decode(Buffer& buffer) { - classId = buffer.getShort(); - methodId = buffer.getShort(); -} - -void AMQMethodBody::decode(Buffer& buffer, uint32_t /*size*/) { - decodeContent(buffer); -} - -void AMQMethodBody::encode(Buffer& buffer) const { - encodeId(buffer); - encodeContent(buffer); -} +AMQMethodBody::~AMQMethodBody() {} }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h index 73c5eb78a6..9c776e143b 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.h +++ b/cpp/src/qpid/framing/AMQMethodBody.h @@ -21,65 +21,48 @@ * under the License. * */ -#include <iostream> #include "amqp_types.h" #include "AMQBody.h" -#include "Buffer.h" -#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/shared_ptr.h" + +#include <ostream> + +#include <assert.h> namespace qpid { namespace framing { -class AMQP_MethodVersionMap; +class Buffer; +class AMQP_ServerOperations; +class Invocable; +class MethodBodyConstVisitor; -class AMQMethodBody : public AMQBody -{ +class AMQMethodBody : public AMQBody { public: - typedef boost::shared_ptr<AMQMethodBody> shared_ptr; - - static shared_ptr create( - AMQP_MethodVersionMap& map, ProtocolVersion version, Buffer& buf); - - ProtocolVersion version; - uint8_t type() const { return METHOD_BODY; } - AMQMethodBody(uint8_t major, uint8_t minor) : version(major, minor) {} - AMQMethodBody(ProtocolVersion ver) : version(ver) {} - virtual ~AMQMethodBody() {} - void decode(Buffer&, uint32_t); - virtual void encode(Buffer& buffer) const; + AMQMethodBody() {} + AMQMethodBody(uint8_t, uint8_t) {} + + virtual ~AMQMethodBody(); + virtual void accept(MethodBodyConstVisitor&) const = 0; + virtual MethodId amqpMethodId() const = 0; virtual ClassId amqpClassId() const = 0; - virtual void invoke(AMQP_ServerOperations&); - virtual bool invoke(Invocable* target); + virtual void invoke(AMQP_ServerOperations&) { assert(0); } + virtual bool invoke(Invocable*) { return false; } - template <class T> bool isA() { + template <class T> bool isA() const { return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID; } - /** Return request ID or response correlationID */ - virtual RequestId getRequestId() const { return 0; } - - virtual bool isRequest() const { return false; } - virtual bool isResponse() const { return false; } - - static uint32_t baseSize() { return 4; } - protected: - - struct ClassMethodId { - uint16_t classId; - uint16_t methodId; - void decode(Buffer& b); - }; - - void encodeId(Buffer& buffer) const; - virtual void encodeContent(Buffer& buffer) const = 0; - virtual void decodeContent(Buffer& buffer) = 0; - - virtual void printPrefix(std::ostream&) const {} + virtual uint32_t size() const = 0; + virtual uint8_t type() const { return METHOD_BODY; } - friend class MethodHolder; + AMQMethodBody* getMethod() { return this; } + const AMQMethodBody* getMethod() const { return this; } + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } }; diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h index a6347b37fd..a8ef401b50 100644 --- a/cpp/src/qpid/framing/BasicHeaderProperties.h +++ b/cpp/src/qpid/framing/BasicHeaderProperties.h @@ -57,7 +57,7 @@ class BasicHeaderProperties : public HeaderProperties virtual void encode(Buffer& buffer) const; virtual void decode(Buffer& buffer, uint32_t size); - virtual uint8_t classId() { return BASIC; } + virtual uint8_t classId() const { return BASIC; } string getContentType() const { return contentType; } string getContentEncoding() const { return contentEncoding; } diff --git a/cpp/src/qpid/framing/Blob.cpp b/cpp/src/qpid/framing/Blob.cpp new file mode 100644 index 0000000000..0aaeb4138e --- /dev/null +++ b/cpp/src/qpid/framing/Blob.cpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Blob.h" + + +namespace qpid { +namespace framing { + +void BlobHelper<void>::destroy(void*) {} +void BlobHelper<void>::copy(void*, const void*) {} + +}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Blob.h b/cpp/src/qpid/framing/Blob.h index 1754e851e5..f89bad55ea 100644 --- a/cpp/src/qpid/framing/Blob.h +++ b/cpp/src/qpid/framing/Blob.h @@ -33,6 +33,8 @@ namespace boost { /** + * 0-arg typed_in_place_factory constructor and in_place() override. + * * Boost doesn't provide the 0 arg version since it assumes * in_place_factory will be used when there is no default ctor. */ @@ -48,20 +50,29 @@ typed_in_place_factory0<T> in_place() { return typed_in_place_factory0<T>(); } } // namespace boost + namespace qpid { namespace framing { using boost::in_place; -template <class T> -void destroyT(void* ptr) { static_cast<T*>(ptr)->~T(); } -inline void nullDestroy(void*) {} +template <class T> struct BlobHelper { + static void destroy(void* ptr) { static_cast<T*>(ptr)->~T(); } + static void copy(void* dest, const void* src) { + new (dest) T(*static_cast<const T*>(src)); + } +}; + +template <> struct BlobHelper<void> { + static void destroy(void*); + static void copy(void* to, const void* from); +}; /** * A "blob" is a chunk of memory which can contain a single object at * a time arbitrary type, provided sizeof(T)<=blob.size(). Blob * ensures proper construction and destruction of its contents, - * nothing else. + * and proper copying between Blobs, but nothing else. * * In particular the user must ensure the blob is big enough for its * contents and must know the type of object in the blob to cast get(). @@ -75,7 +86,13 @@ class Blob { boost::aligned_storage<Size> store; void (*destroy)(void*); + void (*copy)(void*, const void*); + template <class T> void setType() { + destroy=&BlobHelper<T>::destroy; + copy=&BlobHelper<T>::copy; + } + template<class TypedInPlaceFactory> void construct (const TypedInPlaceFactory& factory, const boost::typed_in_place_factory_base* ) @@ -84,16 +101,28 @@ class Blob assert(sizeof(T) <= Size); clear(); // Destroy old object. factory.apply(store.address()); - destroy=&destroyT<T>; + setType<T>(); } public: /** Construct an empty blob. */ - Blob() : destroy(&nullDestroy) {} + Blob() { setType<void>(); } + + /** Copy a blob. */ + Blob(const Blob& b) { *this = b; } + + /** Assign a blob */ + Blob& operator=(const Blob& b) { + setType<void>(); // Exception safety. + b.copy(this->get(), b.get()); + copy = b.copy; + destroy = b.destroy; + return *this; + } /** @see construct() */ template<class Expr> - Blob( const Expr & expr ) : destroy(nullDestroy) { construct(expr,&expr); } + Blob( const Expr & expr ) { setType<void>(); construct(expr,&expr); } ~Blob() { clear(); } @@ -106,7 +135,7 @@ class Blob /** Copy construct an instance of T into the Blob. */ template<class T> - Blob& operator=(const T& x) { construct(in_place<T>(x)); } + Blob& operator=(const T& x) { construct(in_place<T>(x)); return *this; } /** Get pointer to blob contents. Caller must know how to cast it. */ void* get() { return store.address(); } @@ -116,17 +145,15 @@ class Blob /** Destroy the object in the blob making it empty. */ void clear() { - void (*saveDestroy)(void*) = destroy; // Exception safety - destroy = &nullDestroy; - saveDestroy(store.address()); + void (*oldDestroy)(void*) = destroy; + setType<void>(); + oldDestroy(store.address()); } - /** True if there is no object allocated in the blob */ - bool empty() { return destroy==nullDestroy; } - static size_t size() { return Size; } }; + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp index 7a7bf22f15..53a01141c1 100644 --- a/cpp/src/qpid/framing/BodyHandler.cpp +++ b/cpp/src/qpid/framing/BodyHandler.cpp @@ -25,25 +25,28 @@ #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" +#include <boost/cast.hpp> + using namespace qpid::framing; using namespace boost; BodyHandler::~BodyHandler() {} -void BodyHandler::handleBody(shared_ptr<AMQBody> body) { +// TODO aconway 2007-08-13: Replace with visitor. +void BodyHandler::handleBody(AMQBody* body) { switch(body->type()) { case METHOD_BODY: - handleMethod(shared_polymorphic_cast<AMQMethodBody>(body)); + handleMethod(polymorphic_downcast<AMQMethodBody*>(body)); break; case HEADER_BODY: - handleHeader(shared_polymorphic_cast<AMQHeaderBody>(body)); + handleHeader(polymorphic_downcast<AMQHeaderBody*>(body)); break; case CONTENT_BODY: - handleContent(shared_polymorphic_cast<AMQContentBody>(body)); + handleContent(polymorphic_downcast<AMQContentBody*>(body)); break; case HEARTBEAT_BODY: - handleHeartbeat(shared_polymorphic_cast<AMQHeartbeatBody>(body)); + handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body)); break; default: QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type()); diff --git a/cpp/src/qpid/framing/BodyHandler.h b/cpp/src/qpid/framing/BodyHandler.h index 07d1658afa..9ded737195 100644 --- a/cpp/src/qpid/framing/BodyHandler.h +++ b/cpp/src/qpid/framing/BodyHandler.h @@ -32,6 +32,8 @@ class AMQHeaderBody; class AMQContentBody; class AMQHeartbeatBody; +// TODO aconway 2007-08-10: rework using Visitor pattern? + /** * Interface to handle incoming frame bodies. * Derived classes provide logic for each frame type. @@ -39,13 +41,13 @@ class AMQHeartbeatBody; class BodyHandler { public: virtual ~BodyHandler(); - virtual void handleBody(boost::shared_ptr<AMQBody> body); + virtual void handleBody(AMQBody* body); protected: - virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0; - virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0; - virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0; - virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0; + virtual void handleMethod(AMQMethodBody*) = 0; + virtual void handleHeader(AMQHeaderBody*) = 0; + virtual void handleContent(AMQContentBody*) = 0; + virtual void handleHeartbeat(AMQHeartbeatBody*) = 0; }; }} diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp index d61126bc7f..25ff46acdd 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -23,6 +23,9 @@ #include "FrameHandler.h" #include "qpid/Exception.h" +#include "AMQMethodBody.h" +#include "qpid/framing/ConnectionOpenBody.h" + using boost::format; namespace qpid { @@ -45,7 +48,7 @@ void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v) handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out)); } -void ChannelAdapter::send(shared_ptr<AMQBody> body) +void ChannelAdapter::send(const AMQBody& body) { assertChannelOpen(); AMQFrame frame(getVersion(), getId(), body); diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 9e418013eb..729f5e7b47 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -25,11 +25,11 @@ * */ -#include "qpid/shared_ptr.h" #include "BodyHandler.h" #include "ProtocolVersion.h" #include "amqp_types.h" #include "FrameHandler.h" +#include "OutputHandler.h" namespace qpid { namespace framing { @@ -66,7 +66,7 @@ class ChannelAdapter : protected BodyHandler { ChannelId getId() const { return id; } ProtocolVersion getVersion() const { return version; } - virtual void send(shared_ptr<AMQBody> body); + virtual void send(const AMQBody& body); virtual bool isOpen() const = 0; @@ -75,7 +75,7 @@ class ChannelAdapter : protected BodyHandler { void assertChannelOpen() const; void assertChannelNotOpen() const; - virtual void handleMethod(shared_ptr<AMQMethodBody>) = 0; + virtual void handleMethod(AMQMethodBody*) = 0; private: class ChannelAdapterHandler; diff --git a/cpp/src/qpid/framing/Frame.cpp b/cpp/src/qpid/framing/Frame.cpp deleted file mode 100644 index 1ba8112faa..0000000000 --- a/cpp/src/qpid/framing/Frame.cpp +++ /dev/null @@ -1,117 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/format.hpp> - -#include "Frame.h" -#include "qpid/QpidError.h" - - -namespace qpid { -namespace framing { - -namespace { -struct GetBodyVisitor : public NoBlankVisitor<AMQBody*> { - QPID_USING_NOBLANK(AMQBody*); - AMQBody* operator()(MethodHolder& h) const { return h.getMethod(); } - template <class T> AMQBody* operator()(T& t) const { return &t; } -}; -} - -AMQBody* Frame::getBody() { - return boost::apply_visitor(GetBodyVisitor(), body); -} - -const AMQBody* Frame::getBody() const { - return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body)); -} - -void Frame::encode(Buffer& buffer) -{ - buffer.putOctet(getBody()->type()); - buffer.putShort(channel); - buffer.putLong(getBody()->size()); - getBody()->encode(buffer); - buffer.putOctet(0xCE); -} - -uint32_t Frame::size() const{ - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + getBody()->size() - + 1/*0xCE*/; -} - -bool Frame::decode(Buffer& buffer) -{ - if(buffer.available() < 7) - return false; - buffer.record(); - uint32_t frameSize = decodeHead(buffer); - if(buffer.available() < frameSize + 1){ - buffer.restore(); - return false; - } - decodeBody(buffer, frameSize); - uint8_t end = buffer.getOctet(); - if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); - return true; -} - -uint32_t Frame::decodeHead(Buffer& buffer){ - type = buffer.getOctet(); - channel = buffer.getShort(); - return buffer.getLong(); -} - -void Frame::decodeBody(Buffer& buffer, uint32_t size) -{ - switch(type) - { - case METHOD_BODY: - case REQUEST_BODY: - case RESPONSE_BODY: { - ClassId c=buffer.getShort(); - MethodId m=buffer.getShort(); - body = MethodHolder(c,m); - break; - } - case HEADER_BODY: - body = AMQHeaderBody(); - break; - case CONTENT_BODY: - body = AMQContentBody(); - break; - case HEARTBEAT_BODY: - body = AMQHeartbeatBody(); - break; - default: - THROW_QPID_ERROR( - FRAMING_ERROR, - boost::format("Unknown frame type %d") % type); - } - getBody()->decode(buffer, size); -} - -std::ostream& operator<<(std::ostream& out, const Frame& f) -{ - return out << "Frame[channel=" << f.getChannel() << "; " << f.body << "]"; -} - - -}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Frame.h b/cpp/src/qpid/framing/Frame.h deleted file mode 100644 index 02ab15a828..0000000000 --- a/cpp/src/qpid/framing/Frame.h +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef QPID_FRAMING_FRAME_H -#define QPID_FRAMING_FRAME_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "AMQDataBlock.h" -#include "AMQHeaderBody.h" -#include "AMQContentBody.h" -#include "AMQHeartbeatBody.h" -#include "MethodHolder.h" - -namespace qpid { -namespace framing { - -class Frame : public AMQDataBlock { - public: - typedef boost::variant<boost::blank, - AMQHeaderBody, - AMQContentBody, - AMQHeartbeatBody, - MethodHolder> Variant; - - Frame(ChannelId channel_=0, const Variant& body_=Variant()) - : body(body_), channel(channel_) {} - - void encode(Buffer& buffer); - bool decode(Buffer& buffer); - uint32_t size() const; - - uint16_t getChannel() const { return channel; } - - AMQBody* getBody(); - const AMQBody* getBody() const; - - template <class T> T* castBody() { - return boost::polymorphic_downcast<T*>(getBody()); - } - - Variant body; - - private: - uint32_t decodeHead(Buffer& buffer); - void decodeBody(Buffer& buffer, uint32_t size); - - uint8_t type; - uint16_t channel; -}; - -std::ostream& operator<<(std::ostream&, const Frame&); - -}} // namespace qpid::framing - - -#endif /*!QPID_FRAMING_FRAME_H*/ diff --git a/cpp/src/qpid/framing/HeaderProperties.h b/cpp/src/qpid/framing/HeaderProperties.h index ae8b796aa9..0c805922e8 100644 --- a/cpp/src/qpid/framing/HeaderProperties.h +++ b/cpp/src/qpid/framing/HeaderProperties.h @@ -34,7 +34,7 @@ namespace framing { public: inline virtual ~HeaderProperties(){} - virtual uint8_t classId() = 0; + virtual uint8_t classId() const = 0; virtual uint32_t size() const = 0; virtual void encode(Buffer& buffer) const = 0; virtual void decode(Buffer& buffer, uint32_t size) = 0; diff --git a/cpp/src/qpid/framing/MethodHolder.cpp b/cpp/src/qpid/framing/MethodHolder.cpp index 43997e6d55..de8f0da6d4 100644 --- a/cpp/src/qpid/framing/MethodHolder.cpp +++ b/cpp/src/qpid/framing/MethodHolder.cpp @@ -23,8 +23,8 @@ #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/Buffer.h" -// Note: MethodHolder::construct is in a separate generated file -// MethodHolder_construct.cpp +// Note: MethodHolder::construct is and operator= are code-generated +// in file MethodHolder_construct.cpp. using namespace boost; @@ -35,16 +35,18 @@ void MethodHolder::encode(Buffer& b) const { const AMQMethodBody* body = get(); b.putShort(body->amqpClassId()); b.putShort(body->amqpMethodId()); - body->encodeContent(b); + body->encode(b); } void MethodHolder::decode(Buffer& b) { - construct(std::make_pair(b.getShort(), b.getShort())); - get()->decodeContent(b); + ClassId c=b.getShort(); + MethodId m=b.getShort(); + construct(c,m); + get()->decode(b); } uint32_t MethodHolder::size() const { - return sizeof(Id)+get()->size(); + return sizeof(ClassId)+sizeof(MethodId)+get()->size(); } std::ostream& operator<<(std::ostream& out, const MethodHolder& h) { diff --git a/cpp/src/qpid/framing/MethodHolder.h b/cpp/src/qpid/framing/MethodHolder.h index fa14db2f79..59dbb1a708 100644 --- a/cpp/src/qpid/framing/MethodHolder.h +++ b/cpp/src/qpid/framing/MethodHolder.h @@ -22,64 +22,74 @@ * */ +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/amqp_types.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/Blob.h" #include "qpid/framing/MethodHolderMaxSize.h" // Generated file. +#include <boost/type_traits/is_base_and_derived.hpp> +#include <boost/utility/enable_if.hpp> + #include <utility> namespace qpid { namespace framing { class AMQMethodBody; +class AMQBody; class Buffer; +class MethodHolder; +std::ostream& operator<<(std::ostream& out, const MethodHolder& h); + /** * Holder for arbitrary method body. */ +// TODO aconway 2007-08-14: Fix up naming, this class should really be +// called AMQMethodBody and use a different name for the root of +// the concrete method body tree, which should not inherit AMQBody. +// class MethodHolder { - public: - typedef std::pair<ClassId, MethodId> Id; - - template <class T>static Id idOf() { - return std::make_pair(T::CLASS_ID, T::METHOD_ID); } + template <class T> struct EnableIfMethod: + public boost::enable_if<boost::is_base_and_derived<AMQMethodBody,T>,T> + {}; + template <class T> EnableIfMethod<T>& assertMethod(T& t) { return t; } + + public: MethodHolder() {} - MethodHolder(const Id& id) { construct(id); } - MethodHolder(ClassId& c, MethodId& m) { construct(std::make_pair(c,m)); } + MethodHolder(ClassId& c, MethodId& m) { construct(c,m); } - template <class M> - MethodHolder(const M& m) : blob(m), id(idOf<M>()) {} + /** Construct with a copy of a method body. */ + MethodHolder(const AMQMethodBody& m) { *this = m; } - template <class M> - MethodHolder& operator=(const M& m) { blob=m; id=idOf<M>(); return *this; } + /** Copy method body into holder. */ + MethodHolder& operator=(const AMQMethodBody&); - /** Construct the method body corresponding to Id */ - void construct(const Id&); + /** Construct the method body corresponding to class/method id */ + void construct(ClassId c, MethodId m); + uint8_t type() const { return 1; } void encode(Buffer&) const; void decode(Buffer&); uint32_t size() const; AMQMethodBody* get() { - return static_cast<AMQMethodBody*>(blob.get()); + return reinterpret_cast<AMQMethodBody*>(blob.get()); } const AMQMethodBody* get() const { - return static_cast<const AMQMethodBody*>(blob.get()); + return reinterpret_cast<const AMQMethodBody*>(blob.get()); } - AMQMethodBody* operator* () { return get(); } - const AMQMethodBody* operator*() const { return get(); } - AMQMethodBody* operator-> () { return get(); } - const AMQMethodBody* operator->() const { return get(); } - private: Blob<MAX_METHODBODY_SIZE> blob; - Id id; + class CopyVisitor; + friend struct CopyVisitor; }; -std::ostream& operator<<(std::ostream& out, const MethodHolder& h); + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h index 888c6a7382..eec28333bc 100644 --- a/cpp/src/qpid/framing/amqp_framing.h +++ b/cpp/src/qpid/framing/amqp_framing.h @@ -26,7 +26,6 @@ #include "AMQHeaderBody.h" #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" -#include "qpid/framing/AMQP_MethodVersionMap.h" #include "InputHandler.h" #include "OutputHandler.h" #include "InitiationHandler.h" diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h index 5ea08a69af..bfd5b2206f 100644 --- a/cpp/src/qpid/framing/amqp_types.h +++ b/cpp/src/qpid/framing/amqp_types.h @@ -44,8 +44,6 @@ namespace framing { using std::string; typedef uint8_t FrameType; typedef uint16_t ChannelId; -typedef uint64_t RequestId; -typedef uint64_t ResponseId; typedef uint32_t BatchOffset; typedef uint16_t ClassId; typedef uint16_t MethodId; diff --git a/cpp/src/qpid/shared_ptr.h b/cpp/src/qpid/shared_ptr.h index eb5f3f906a..0c933ea6a6 100644 --- a/cpp/src/qpid/shared_ptr.h +++ b/cpp/src/qpid/shared_ptr.h @@ -42,6 +42,8 @@ shared_ptr<T> make_shared_ptr(T* ptr, D deleter) { return shared_ptr<T>(ptr, deleter); } +inline void nullDeleter(void const *) {} + } // namespace qpid diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index 6ee7a84beb..062564d8ed 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -134,7 +134,9 @@ Socket::recv(void* data, size_t size) const int Socket::listen(int port, int backlog) const { - const int& socket = impl->fd; + const int& socket = impl->fd; + int yes=1; + QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); struct sockaddr_in name; name.sin_family = AF_INET; name.sin_port = htons(port); diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index eb67601875..6c9b910637 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -31,6 +31,7 @@ #include "MockChannel.h" #include "qpid/broker/Connection.h" #include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/ConnectionStartBody.h" #include <vector> using namespace boost; @@ -233,18 +234,18 @@ class BrokerChannelTest : public CppUnit::TestCase Queue::shared_ptr queue(new Queue("my_queue")); exchange->bind(queue, "", 0); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + AMQHeaderBody header(BASIC); uint64_t contentSize(0); for (int i = 0; i < 3; i++) { contentSize += data[i].size(); } - header->setContentSize(contentSize); + header.setContentSize(contentSize); channel.handlePublish(msg); - channel.handleHeader(header); + channel.handleHeader(&header); for (int i = 0; i < 3; i++) { - AMQContentBody::shared_ptr body(new AMQContentBody(data[i])); - channel.handleContent(body); + AMQContentBody body(data[i]); + channel.handleContent(&body); } Message::shared_ptr msg2 = queue->dequeue(); CPPUNIT_ASSERT_EQUAL(msg, msg2.get()); @@ -312,8 +313,7 @@ class BrokerChannelTest : public CppUnit::TestCase //there will always be a connection-start frame CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( - handler.frames[0].getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody())); const string data("abcdefghijklmn"); @@ -340,17 +340,17 @@ class BrokerChannelTest : public CppUnit::TestCase { BasicMessage* msg = new BasicMessage( 0, exchange, routingKey, false, false); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(contentSize); - msg->setHeader(header); + AMQHeaderBody header(BASIC); + header.setContentSize(contentSize); + msg->setHeader(&header); msg->getHeaderProperties()->setMessageId(messageId); return msg; } void addContent(Message::shared_ptr msg, const string& data) { - AMQContentBody::shared_ptr body(new AMQContentBody(data)); - msg->addContent(body); + AMQContentBody body(data); + msg->addContent(&body); } }; diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp index 811fc0133e..2d23b87627 100644 --- a/cpp/src/tests/Cluster.cpp +++ b/cpp/src/tests/Cluster.cpp @@ -33,7 +33,7 @@ static const ProtocolVersion VER; /** Verify membership in a cluster with one member. */ BOOST_AUTO_TEST_CASE(testClusterOne) { TestCluster cluster("clusterOne", "amqp:one:1"); - AMQFrame send(VER, 1, new SessionPingBody(VER)); + AMQFrame send(VER, 1, SessionPingBody(VER)); cluster.handle(send); AMQFrame received; BOOST_REQUIRE(cluster.received.waitPop(received)); @@ -59,7 +59,7 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) { BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child. // Exchange frames with child. - AMQFrame send(VER, 1, new SessionPingBody(VER)); + AMQFrame send(VER, 1, SessionPingBody(VER)); cluster.handle(send); AMQFrame received; BOOST_REQUIRE(cluster.received.waitPop(received)); @@ -90,8 +90,8 @@ struct CountHandler : public FrameHandler { /** Test the ClassifierHandler */ BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) { - AMQFrame queueDecl(VER, 0, new QueueDeclareBody(VER)); - AMQFrame messageTrans(VER, 0, new MessageTransferBody(VER)); + AMQFrame queueDecl(VER, 0, QueueDeclareBody(VER)); + AMQFrame messageTrans(VER, 0, MessageTransferBody(VER)); shared_ptr<CountHandler> wiring(new CountHandler()); shared_ptr<CountHandler> other(new CountHandler()); diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h index 510a788f7c..366ea92a8b 100644 --- a/cpp/src/tests/Cluster.h +++ b/cpp/src/tests/Cluster.h @@ -55,8 +55,6 @@ class TestHandler : public Handler<T&>, public ConcurrentQueue<T> typedef TestHandler<AMQFrame> TestFrameHandler; -void nullDeleter(void*) {} - struct TestCluster : public Cluster { TestCluster(string name, string url) diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp index 7bea9d0490..62ccb9bd72 100644 --- a/cpp/src/tests/Cluster_child.cpp +++ b/cpp/src/tests/Cluster_child.cpp @@ -40,7 +40,7 @@ void clusterTwo() { BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *frame.getBody()); BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent - AMQFrame send(VER, 1, new SessionPongBody(VER)); + AMQFrame send(VER, 1, SessionPongBody(VER)); cluster.handle(send); BOOST_REQUIRE(cluster.received.waitPop(frame)); BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *frame.getBody()); diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp index b0aeb9db6f..a0dd8d37f6 100644 --- a/cpp/src/tests/FramingTest.cpp +++ b/cpp/src/tests/FramingTest.cpp @@ -18,24 +18,27 @@ * under the License. * */ +#include "InProcessBroker.h" +#include "qpid/QpidError.h" +#include "qpid/client/ClientExchange.h" +#include "qpid/client/ClientQueue.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Connector.h" +#include "qpid/framing/AMQP_HighestVersion.h" +#include "qpid/framing/BasicGetOkBody.h" #include "qpid/framing/ConnectionRedirectBody.h" #include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/all_method_bodies.h" #include "qpid/framing/amqp_framing.h" -#include <iostream> #include "qpid_test_plugin.h" + +#include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> +#include <iostream> + +#include <memory> #include <sstream> #include <typeinfo> -#include "qpid/QpidError.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "InProcessBroker.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Connector.h" -#include "qpid/client/ClientExchange.h" -#include "qpid/client/ClientQueue.h" -#include "qpid/framing/BasicGetOkBody.h" -#include <memory> -#include <boost/lexical_cast.hpp> -#include <boost/bind.hpp> using namespace qpid; using namespace qpid::framing; @@ -62,13 +65,11 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_TEST(testInlineContent); CPPUNIT_TEST(testContentReference); CPPUNIT_TEST(testContentValidation); - CPPUNIT_TEST(testRequestResponseRoundtrip); CPPUNIT_TEST_SUITE_END(); private: Buffer buffer; ProtocolVersion version; - AMQP_MethodVersionMap versionMap; public: @@ -77,10 +78,10 @@ class FramingTest : public CppUnit::TestCase void testBasicQosBody() { BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true); - in.encodeContent(buffer); + in.encode(buffer); buffer.flip(); BasicQosBody out(version); - out.decodeContent(buffer); + out.decode(buffer); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -88,10 +89,10 @@ class FramingTest : public CppUnit::TestCase { std::string s = "security credential"; ConnectionSecureBody in(version, s); - in.encodeContent(buffer); + in.encode(buffer); buffer.flip(); ConnectionSecureBody out(version); - out.decodeContent(buffer); + out.decode(buffer); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -100,10 +101,10 @@ class FramingTest : public CppUnit::TestCase std::string a = "hostA"; std::string b = "hostB"; ConnectionRedirectBody in(version, a, b); - in.encodeContent(buffer); + in.encode(buffer); buffer.flip(); ConnectionRedirectBody out(version); - out.decodeContent(buffer); + out.decode(buffer); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -111,10 +112,10 @@ class FramingTest : public CppUnit::TestCase { std::string s = "text"; AccessRequestBody in(version, s, true, false, true, false, true); - in.encodeContent(buffer); + in.encode(buffer); buffer.flip(); AccessRequestBody out(version); - out.decodeContent(buffer); + out.decode(buffer); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -124,10 +125,10 @@ class FramingTest : public CppUnit::TestCase std::string t = "tag"; BasicConsumeBody in(version, 0, q, t, false, true, false, false, FieldTable()); - in.encodeContent(buffer); + in.encode(buffer); buffer.flip(); BasicConsumeBody out(version); - out.decodeContent(buffer); + out.decode(buffer); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -137,7 +138,7 @@ class FramingTest : public CppUnit::TestCase std::string a = "hostA"; std::string b = "hostB"; AMQFrame in(version, 999, - new ConnectionRedirectBody(version, a, b)); + ConnectionRedirectBody(version, a, b)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -148,7 +149,7 @@ class FramingTest : public CppUnit::TestCase void testBasicConsumeOkBodyFrame() { std::string s = "hostA"; - AMQFrame in(version, 999, new BasicConsumeOkBody(version, s)); + AMQFrame in(version, 999, BasicConsumeOkBody(version, s)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -211,46 +212,6 @@ class FramingTest : public CppUnit::TestCase } - // expect may contain null chars so use string(ptr,size) constructor - // Use sizeof(expect)-1 to strip the trailing null. -#define ASSERT_FRAME(expect, frame) \ - CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame)) - - void testRequestResponseRoundtrip() { - boost::shared_ptr<broker::InProcessBroker> ibroker(new broker::InProcessBroker(version)); - client::Connection clientConnection(boost::static_pointer_cast<client::Connector>(ibroker)); - clientConnection.open(""); - client::Channel c; - clientConnection.openChannel(c); - - client::Exchange exchange( - "MyExchange", client::Exchange::TOPIC_EXCHANGE); - client::Queue queue("MyQueue", true); - c.declareExchange(exchange); - c.declareQueue(queue); - c.bind(exchange, queue, "MyTopic", framing::FieldTable()); - c.close(); - clientConnection.close(); - broker::InProcessBroker::Conversation::const_iterator i = ibroker->conversation.begin(); - ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionOpenOk: knownHosts=]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; ChannelOpen: outOfBand=]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=1; rangedExecutionSet={}]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=3; rangedExecutionSet={}]", *i++); - } }; diff --git a/cpp/src/tests/InMemoryContentTest.cpp b/cpp/src/tests/InMemoryContentTest.cpp index cb729f9930..bc95548d45 100644 --- a/cpp/src/tests/InMemoryContentTest.cpp +++ b/cpp/src/tests/InMemoryContentTest.cpp @@ -65,9 +65,8 @@ public: CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); for (unsigned int i = 0; i < outCount; i++) { - AMQContentBody::shared_ptr chunk( - dynamic_pointer_cast<AMQContentBody>( - channel.out.frames[i].getBody())); + AMQContentBody* chunk = dynamic_cast<AMQContentBody*>( + channel.out.frames[i].getBody()); CPPUNIT_ASSERT(chunk); CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); CPPUNIT_ASSERT_EQUAL( @@ -78,8 +77,8 @@ public: void addframes(InMemoryContent& content, size_t frameCount, string* frameData) { for (unsigned int i = 0; i < frameCount; i++) { - AMQContentBody::shared_ptr frame(new AMQContentBody(frameData[i])); - content.add(frame); + AMQContentBody frame(frameData[i]); + content.add(&frame); } } diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h index 9f30ee584d..0e5f3895f9 100644 --- a/cpp/src/tests/InProcessBroker.h +++ b/cpp/src/tests/InProcessBroker.h @@ -54,7 +54,7 @@ class InProcessBroker : public client::Connector { template <class MethodType> MethodType* asMethod() { - return dynamic_cast<MethodType*>(frame.getBody().get()); + return dynamic_cast<MethodType*>(frame.getBody()); } framing::AMQFrame frame; Sender sender; diff --git a/cpp/src/tests/LazyLoadedContentTest.cpp b/cpp/src/tests/LazyLoadedContentTest.cpp index 7bac3613ad..df46f6b48e 100644 --- a/cpp/src/tests/LazyLoadedContentTest.cpp +++ b/cpp/src/tests/LazyLoadedContentTest.cpp @@ -97,7 +97,8 @@ public: CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); for (unsigned int i = 0; i < outCount; i++) { - AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[i].getBody())); + AMQContentBody* chunk(dynamic_cast<AMQContentBody*>( + channel.out.frames[i].getBody())); CPPUNIT_ASSERT(chunk); CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); CPPUNIT_ASSERT_EQUAL( diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 67b173af33..c9a08e6075 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -43,7 +43,6 @@ check_PROGRAMS+=Shlib Shlib_SOURCES=Shlib.cpp Shlib_LDADD=-lboost_unit_test_framework $(lib_common) - # TODO aconway 2007-08-07: Why aren't these tests run automatically? check_PROGRAMS+=ConcurrentQueue @@ -54,10 +53,6 @@ check_PROGRAMS+=Serializer Serializer_SOURCES=Serializer.cpp Serializer_LDADD=-lboost_unit_test_framework $(lib_common) -check_PROGRAMS+=Visitor -Visitor_SOURCES=Visitor.cpp -Visitor_LDADD=-lboost_unit_test_framework - include cluster.mk # diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp index 3d2ee1aaea..a12fc603ce 100644 --- a/cpp/src/tests/MessageBuilderTest.cpp +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -119,12 +119,12 @@ class MessageBuilderTest : public CppUnit::TestCase Message::shared_ptr message( new BasicMessage( 0, "test", "my_routing_key", false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(0); + AMQHeaderBody header(BASIC); + header.setContentSize(0); builder.initialise(message); CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(header); + builder.setHeader(&header); CPPUNIT_ASSERT(handler.msg); CPPUNIT_ASSERT_EQUAL(message, handler.msg); } @@ -137,15 +137,15 @@ class MessageBuilderTest : public CppUnit::TestCase Message::shared_ptr message( new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(7); - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + AMQHeaderBody header(BASIC); + header.setContentSize(7); + AMQContentBody part1(data1); builder.initialise(message); CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(header); + builder.setHeader(&header); CPPUNIT_ASSERT(!handler.msg); - builder.addContent(part1); + builder.addContent(&part1); CPPUNIT_ASSERT(handler.msg); CPPUNIT_ASSERT_EQUAL(message, handler.msg); } @@ -159,18 +159,18 @@ class MessageBuilderTest : public CppUnit::TestCase Message::shared_ptr message( new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); - AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); + AMQHeaderBody header(BASIC); + header.setContentSize(14); + AMQContentBody part1(data1); + AMQContentBody part2(data2); builder.initialise(message); CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(header); + builder.setHeader(&header); CPPUNIT_ASSERT(!handler.msg); - builder.addContent(part1); + builder.addContent(&part1); CPPUNIT_ASSERT(!handler.msg); - builder.addContent(part2); + builder.addContent(&part2); CPPUNIT_ASSERT(handler.msg); CPPUNIT_ASSERT_EQUAL(message, handler.msg); } @@ -189,19 +189,19 @@ class MessageBuilderTest : public CppUnit::TestCase Message::shared_ptr message( new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); + AMQHeaderBody header(BASIC); + header.setContentSize(14); + BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header.getProperties()); properties->setMessageId("MyMessage"); properties->getHeaders().setString("abc", "xyz"); - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); - AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); + AMQContentBody part1(data1); + AMQContentBody part2(data2); builder.initialise(message); - builder.setHeader(header); - builder.addContent(part1); - builder.addContent(part2); + builder.setHeader(&header); + builder.addContent(&part1); + builder.addContent(&part2); CPPUNIT_ASSERT(handler.msg); CPPUNIT_ASSERT_EQUAL(message, handler.msg); diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp index 1e976312be..1fbb18b7d3 100644 --- a/cpp/src/tests/MessageTest.cpp +++ b/cpp/src/tests/MessageTest.cpp @@ -47,13 +47,13 @@ class MessageTest : public CppUnit::TestCase BasicMessage::shared_ptr msg( new BasicMessage(0, exchange, routingKey, false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); - AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); - msg->setHeader(header); - msg->addContent(part1); - msg->addContent(part2); + AMQHeaderBody header(BASIC); + header.setContentSize(14); + AMQContentBody part1(data1); + AMQContentBody part2(data2); + msg->setHeader(&header); + msg->addContent(&part1); + msg->addContent(&part2); msg->getHeaderProperties()->setMessageId(messageId); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); @@ -75,7 +75,8 @@ class MessageTest : public CppUnit::TestCase MockChannel channel(1); msg->deliver(channel, "ignore", 0, 100); CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size()); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[2].getBody())); + AMQContentBody* contentBody( + dynamic_cast<AMQContentBody*>(channel.out.frames[2].getBody())); CPPUNIT_ASSERT(contentBody); CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData()); } diff --git a/cpp/src/tests/MockChannel.h b/cpp/src/tests/MockChannel.h index fb2de98d2a..b9a7c0a2a2 100644 --- a/cpp/src/tests/MockChannel.h +++ b/cpp/src/tests/MockChannel.h @@ -51,13 +51,10 @@ struct MockChannel : public qpid::framing::ChannelAdapter bool isOpen() const { return true; } - void handleHeader( - boost::shared_ptr<qpid::framing::AMQHeaderBody> b) { send(b); } - void handleContent( - boost::shared_ptr<qpid::framing::AMQContentBody> b) { send(b); } - void handleHeartbeat( - boost::shared_ptr<qpid::framing::AMQHeartbeatBody> b) { send(b); } - void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method) { send(method); }; + void handleHeader(qpid::framing::AMQHeaderBody* b) { send(*b); } + void handleContent(qpid::framing::AMQContentBody* b) { send(*b); } + void handleHeartbeat(qpid::framing::AMQHeartbeatBody* b) { send(*b); } + void handleMethod(qpid::framing::AMQMethodBody* b) { send(*b); }; }; diff --git a/cpp/src/tests/ReferenceTest.cpp b/cpp/src/tests/ReferenceTest.cpp index b3dd44bf7d..411462564a 100644 --- a/cpp/src/tests/ReferenceTest.cpp +++ b/cpp/src/tests/ReferenceTest.cpp @@ -67,17 +67,17 @@ class ReferenceTest : public CppUnit::TestCase Reference::shared_ptr r1(registry.open("bar")); - MessageTransferBody::shared_ptr t1(new MessageTransferBody(v)); + MessageTransferBody t1(v); // TODO aconway 2007-04-03: hack around lack of generated setters. Clean this up. - const_cast<framing::Content&>(t1->getBody()) = framing::Content(REFERENCE,"bar"); - MessageMessage::shared_ptr m1(new MessageMessage(0, t1, r1)); + const_cast<framing::Content&>(t1.getBody()) = framing::Content(REFERENCE,"bar"); + MessageMessage::shared_ptr m1(new MessageMessage(0, &t1, r1)); - MessageTransferBody::shared_ptr t2(new MessageTransferBody(v)); - const_cast<framing::Content&>(t2->getBody()) = framing::Content(REFERENCE,"bar"); - MessageMessage::shared_ptr m2(new MessageMessage(0, t2, r1)); + MessageTransferBody t2(v); + const_cast<framing::Content&>(t2.getBody()) = framing::Content(REFERENCE,"bar"); + MessageMessage::shared_ptr m2(new MessageMessage(0, &t2, r1)); - MessageAppendBody::shared_ptr a1(new MessageAppendBody(v)); - MessageAppendBody::shared_ptr a2(new MessageAppendBody(v)); + MessageAppendBody a1(v); + MessageAppendBody a2(v); r1->addMessage(m1); r1->addMessage(m2); @@ -86,12 +86,6 @@ class ReferenceTest : public CppUnit::TestCase r1->append(a2); CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size()); r1->close(); - - CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[0], a1); - CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[1], a2); - - CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[0], a1); - CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[1], a2); } }; diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp index a5d9eb69a5..24e8aac701 100644 --- a/cpp/src/tests/TxAckTest.cpp +++ b/cpp/src/tests/TxAckTest.cpp @@ -70,7 +70,8 @@ public: for(int i = 0; i < 10; i++){ Message::shared_ptr msg( new BasicMessage(0, "exchange", "routing_key", false, false)); - msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); + AMQHeaderBody body(BASIC); + msg->setHeader(&body); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); messages.push_back(msg); deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp index 05c03754f9..3391be5128 100644 --- a/cpp/src/tests/TxPublishTest.cpp +++ b/cpp/src/tests/TxPublishTest.cpp @@ -72,7 +72,8 @@ public: msg(new BasicMessage(0, "exchange", "routing_key", false, false)), op(msg) { - msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); + AMQHeaderBody body(BASIC); + msg->setHeader(&body); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); op.deliverTo(queue1); op.deliverTo(queue2); diff --git a/cpp/src/tests/Visitor.cpp b/cpp/src/tests/Visitor.cpp deleted file mode 100644 index 0cb3cf15bb..0000000000 --- a/cpp/src/tests/Visitor.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/Visitor.h" - -#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*> -#include <boost/test/auto_unit_test.hpp> -#include <boost/tuple/tuple.hpp> - -using namespace std; -using namespace qpid::framing; - -struct DummyA; -struct DummyB; -struct DummyC; - -QPID_VISITOR(DummyVisitor, (DummyA)(DummyB)(DummyC)); - -struct DummyFrame : public VisitableRoot<DummyVisitor> {}; - -struct DummyA : public Visitable<DummyA, DummyFrame> {}; -struct DummyB : public Visitable<DummyB, DummyFrame> {}; -struct DummyC : public Visitable<DummyC, DummyFrame> {}; - -struct TestDummyVisitor : public DummyVisitor { - boost::tuple<DummyA*, DummyB*, DummyC*> dummies; - void visit(DummyA& a) { dummies.get<0>() = &a; } - void visit(DummyB& b) { dummies.get<1>() = &b; } - void visit(DummyC& c) { dummies.get<2>() = &c; } -}; - -BOOST_AUTO_TEST_CASE(Visitor_accept) { - TestDummyVisitor v; - DummyA a; - DummyB b; - DummyC c; - a.accept(v); - BOOST_CHECK_EQUAL(&a, v.dummies.get<0>()); - b.accept(v); - BOOST_CHECK_EQUAL(&b, v.dummies.get<1>()); - c.accept(v); - BOOST_CHECK_EQUAL(&c, v.dummies.get<2>()); -} |